And deal with job termination
This commit is contained in:
parent
304d72cd83
commit
77143fb0d6
2 changed files with 35 additions and 1 deletions
|
@ -359,7 +359,20 @@ class Store(object):
|
||||||
"""
|
"""
|
||||||
SELECT * FROM jobs
|
SELECT * FROM jobs
|
||||||
WHERE started_at IS NULL AND printer_id IS NOT NULL
|
WHERE started_at IS NULL AND printer_id IS NOT NULL
|
||||||
ORDER BY priority DESC
|
""",
|
||||||
|
[],
|
||||||
|
).fetchall()
|
||||||
|
|
||||||
|
@requires_conn
|
||||||
|
def list_running_jobs(self):
|
||||||
|
"""Scheduler detail. List running jobs."""
|
||||||
|
|
||||||
|
return self._conn.execute(
|
||||||
|
"""
|
||||||
|
SELECT * FROM jobs
|
||||||
|
WHERE started_at IS NOT NULL
|
||||||
|
AND printer_id IS NOT NULL
|
||||||
|
AND finished_at IS NULL
|
||||||
""",
|
""",
|
||||||
[],
|
[],
|
||||||
).fetchall()
|
).fetchall()
|
||||||
|
@ -371,6 +384,7 @@ class Store(object):
|
||||||
SELECT id
|
SELECT id
|
||||||
FROM jobs
|
FROM jobs
|
||||||
WHERE started_at IS NULL
|
WHERE started_at IS NULL
|
||||||
|
AND finished_at IS NULL
|
||||||
AND printer_id IS NULL
|
AND printer_id IS NULL
|
||||||
ORDER BY priority DESC
|
ORDER BY priority DESC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
|
@ -395,6 +409,13 @@ class Store(object):
|
||||||
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
|
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@requires_conn
|
||||||
|
def finish_job(self, job_id: int, state: str):
|
||||||
|
return self._conn.execute(
|
||||||
|
"UPDATE jobs SET finished_at = datetime('now'), state = ?2 WHERE id = ?1",
|
||||||
|
[job_id, state],
|
||||||
|
)
|
||||||
|
|
||||||
@requires_conn
|
@requires_conn
|
||||||
def delete_job(self, uid: int, jid: int):
|
def delete_job(self, uid: int, jid: int):
|
||||||
self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid])
|
self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid])
|
||||||
|
|
|
@ -124,8 +124,21 @@ def push_jobs(db_factory: Callable[[], Store]) -> None:
|
||||||
log.exception("Oop")
|
log.exception("Oop")
|
||||||
|
|
||||||
|
|
||||||
|
@corn_job(timedelta(seconds=5))
|
||||||
|
def pull_jobs(db_factory: Callable[[], Store]) -> None:
|
||||||
|
"""Poll the state of mapped printers to control jobs."""
|
||||||
|
|
||||||
|
with closing(db_factory()) as db:
|
||||||
|
for job in db.list_running_jobs():
|
||||||
|
printer = db.fetch_printer(job.printer_id)
|
||||||
|
if printer.status != "running":
|
||||||
|
db.finish_job(job.id, printer.status)
|
||||||
|
|
||||||
|
|
||||||
def create_workers(db_factory: Callable[[], Store]) -> Event:
|
def create_workers(db_factory: Callable[[], Store]) -> Event:
|
||||||
Thread(target=poll_printers, args=[db_factory]).start()
|
Thread(target=poll_printers, args=[db_factory]).start()
|
||||||
Thread(target=assign_jobs, args=[db_factory]).start()
|
Thread(target=assign_jobs, args=[db_factory]).start()
|
||||||
Thread(target=push_jobs, args=[db_factory]).start()
|
Thread(target=push_jobs, args=[db_factory]).start()
|
||||||
|
Thread(target=pull_jobs, args=[db_factory]).start()
|
||||||
|
|
||||||
return SHUTDOWN
|
return SHUTDOWN
|
||||||
|
|
Loading…
Reference in a new issue