And deal with job termination

This commit is contained in:
Reid 'arrdem' McKenzie 2023-05-27 22:13:19 -06:00
parent f873e6d36a
commit 5009cf2826
2 changed files with 35 additions and 1 deletions

View file

@ -359,7 +359,20 @@ class Store(object):
""" """
SELECT * FROM jobs SELECT * FROM jobs
WHERE started_at IS NULL AND printer_id IS NOT NULL WHERE started_at IS NULL AND printer_id IS NOT NULL
ORDER BY priority DESC """,
[],
).fetchall()
@requires_conn
def list_running_jobs(self):
"""Scheduler detail. List running jobs."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NOT NULL
AND printer_id IS NOT NULL
AND finished_at IS NULL
""", """,
[], [],
).fetchall() ).fetchall()
@ -371,6 +384,7 @@ class Store(object):
SELECT id SELECT id
FROM jobs FROM jobs
WHERE started_at IS NULL WHERE started_at IS NULL
AND finished_at IS NULL
AND printer_id IS NULL AND printer_id IS NULL
ORDER BY priority DESC ORDER BY priority DESC
LIMIT 1 LIMIT 1
@ -395,6 +409,13 @@ class Store(object):
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id] "UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
) )
@requires_conn
def finish_job(self, job_id: int, state: str):
return self._conn.execute(
"UPDATE jobs SET finished_at = datetime('now'), state = ?2 WHERE id = ?1",
[job_id, state],
)
@requires_conn @requires_conn
def delete_job(self, uid: int, jid: int): def delete_job(self, uid: int, jid: int):
self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid]) self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid])

View file

@ -124,8 +124,21 @@ def push_jobs(db_factory: Callable[[], Store]) -> None:
log.exception("Oop") log.exception("Oop")
@corn_job(timedelta(seconds=5))
def pull_jobs(db_factory: Callable[[], Store]) -> None:
"""Poll the state of mapped printers to control jobs."""
with closing(db_factory()) as db:
for job in db.list_running_jobs():
printer = db.fetch_printer(job.printer_id)
if printer.status != "running":
db.finish_job(job.id, printer.status)
def create_workers(db_factory: Callable[[], Store]) -> Event: def create_workers(db_factory: Callable[[], Store]) -> Event:
Thread(target=poll_printers, args=[db_factory]).start() Thread(target=poll_printers, args=[db_factory]).start()
Thread(target=assign_jobs, args=[db_factory]).start() Thread(target=assign_jobs, args=[db_factory]).start()
Thread(target=push_jobs, args=[db_factory]).start() Thread(target=push_jobs, args=[db_factory]).start()
Thread(target=pull_jobs, args=[db_factory]).start()
return SHUTDOWN return SHUTDOWN