From 5009cf2826bef9a02a7c279a2cd46fd05423e5ab Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Sat, 27 May 2023 22:13:19 -0600 Subject: [PATCH] And deal with job termination --- .../tentacles/src/python/tentacles/store.py | 23 ++++++++++++++++++- .../tentacles/src/python/tentacles/workers.py | 13 +++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/projects/tentacles/src/python/tentacles/store.py b/projects/tentacles/src/python/tentacles/store.py index 305b54c..f67151a 100644 --- a/projects/tentacles/src/python/tentacles/store.py +++ b/projects/tentacles/src/python/tentacles/store.py @@ -359,7 +359,20 @@ class Store(object): """ SELECT * FROM jobs WHERE started_at IS NULL AND printer_id IS NOT NULL - ORDER BY priority DESC + """, + [], + ).fetchall() + + @requires_conn + def list_running_jobs(self): + """Scheduler detail. List running jobs.""" + + return self._conn.execute( + """ + SELECT * FROM jobs + WHERE started_at IS NOT NULL + AND printer_id IS NOT NULL + AND finished_at IS NULL """, [], ).fetchall() @@ -371,6 +384,7 @@ class Store(object): SELECT id FROM jobs WHERE started_at IS NULL + AND finished_at IS NULL AND printer_id IS NULL ORDER BY priority DESC LIMIT 1 @@ -395,6 +409,13 @@ class Store(object): "UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id] ) + @requires_conn + def finish_job(self, job_id: int, state: str): + return self._conn.execute( + "UPDATE jobs SET finished_at = datetime('now'), state = ?2 WHERE id = ?1", + [job_id, state], + ) + @requires_conn def delete_job(self, uid: int, jid: int): self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid]) diff --git a/projects/tentacles/src/python/tentacles/workers.py b/projects/tentacles/src/python/tentacles/workers.py index 715bcc1..243398a 100644 --- a/projects/tentacles/src/python/tentacles/workers.py +++ b/projects/tentacles/src/python/tentacles/workers.py @@ -124,8 +124,21 @@ def push_jobs(db_factory: Callable[[], Store]) -> None: log.exception("Oop") +@corn_job(timedelta(seconds=5)) +def pull_jobs(db_factory: Callable[[], Store]) -> None: + """Poll the state of mapped printers to control jobs.""" + + with closing(db_factory()) as db: + for job in db.list_running_jobs(): + printer = db.fetch_printer(job.printer_id) + if printer.status != "running": + db.finish_job(job.id, printer.status) + + def create_workers(db_factory: Callable[[], Store]) -> Event: Thread(target=poll_printers, args=[db_factory]).start() Thread(target=assign_jobs, args=[db_factory]).start() Thread(target=push_jobs, args=[db_factory]).start() + Thread(target=pull_jobs, args=[db_factory]).start() + return SHUTDOWN