diff --git a/projects/tentacles/BUILD b/projects/tentacles/BUILD index 88703b5..83b47df 100644 --- a/projects/tentacles/BUILD +++ b/projects/tentacles/BUILD @@ -7,6 +7,7 @@ py_project( py_requirement("click"), py_requirement("flask"), py_requirement("jinja2"), + py_requirement("octorest"), ], main_data = [ "//projects/tentacles/src/python/tentacles/static/css", diff --git a/projects/tentacles/src/python/tentacles/__main__.py b/projects/tentacles/src/python/tentacles/__main__.py index 7999a88..f334e63 100644 --- a/projects/tentacles/src/python/tentacles/__main__.py +++ b/projects/tentacles/src/python/tentacles/__main__.py @@ -1,18 +1,17 @@ #!/usr/bin/env python3 -""" - -""" +"""The core app entrypoint.""" from pathlib import Path import click -from flask import Flask, request, current_app +from flask import Flask, request import tomllib from datetime import datetime from tentacles.blueprints import user_ui, printer_ui, api from tentacles.store import Store from tentacles.globals import _ctx, Ctx, ctx +from tentacles.workers import create_workers @click.group() @@ -20,10 +19,15 @@ def cli(): pass +def store_factory(app): + store = Store(app.config.get("db", {}).get("uri")) + store.connect() + return store + + def custom_ctx(app, wsgi_app): def helper(environ, start_response): - store = Store(app.config.get("db", {}).get("uri")) - store.connect() + store = store_factory(app) token = _ctx.set(Ctx(store)) try: return wsgi_app(environ, start_response) @@ -34,10 +38,10 @@ def custom_ctx(app, wsgi_app): return helper -def create_j2_request_global(): - current_app.jinja_env.globals["ctx"] = ctx - current_app.jinja_env.globals["request"] = request - current_app.jinja_env.globals["datetime"] = datetime +def create_j2_request_global(app): + app.jinja_env.globals["ctx"] = ctx + app.jinja_env.globals["request"] = request + app.jinja_env.globals["datetime"] = datetime def user_session(): @@ -65,8 +69,8 @@ def serve(hostname: str, port: int, config: Path): print(app.config) # Before first request - with app.app_context(): - create_j2_request_global() + create_j2_request_global(app) + shutdown_event = create_workers(lambda: store_factory(app)) # Before request app.before_request(user_session) @@ -80,7 +84,10 @@ def serve(hostname: str, port: int, config: Path): app.wsgi_app = custom_ctx(app, app.wsgi_app) # And run the blame thing - app.run(host=hostname, port=port) + try: + app.run(host=hostname, port=port) + finally: + shutdown_event.set() if __name__ == "__main__": diff --git a/projects/tentacles/src/python/tentacles/schema.sql b/projects/tentacles/src/python/tentacles/schema.sql index a808341..b32e9a7 100644 --- a/projects/tentacles/src/python/tentacles/schema.sql +++ b/projects/tentacles/src/python/tentacles/schema.sql @@ -14,9 +14,9 @@ CREATE TABLE IF NOT EXISTS user_statuses ( , UNIQUE(name) ); -INSERT OR IGNORE INTO user_statuses (id, name) values (-1, 'disabled'); -INSERT OR IGNORE INTO user_statuses (id, name) values (-2, 'unverified'); -INSERT OR IGNORE INTO user_statuses (id, name) values (1, 'enabled'); +INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-1, 'disabled'); +INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unverified'); +INSERT OR IGNORE INTO user_statuses (id, name) VALUES (1, 'enabled'); CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT diff --git a/projects/tentacles/src/python/tentacles/static/css/_tirefire.scss b/projects/tentacles/src/python/tentacles/static/css/_tirefire.scss index 6b69451..b596b4e 100644 --- a/projects/tentacles/src/python/tentacles/static/css/_tirefire.scss +++ b/projects/tentacles/src/python/tentacles/static/css/_tirefire.scss @@ -135,13 +135,14 @@ $secondary_dark_grey: #9A9A9A; //////////////////////////////////////////////////////////////////////////////////////////////////// // A timer animation + .timer { background: -webkit-linear-gradient(left, skyBlue 50%, #eee 50%); border-radius: 100%; height: calc(var(--size) * 1px); width: calc(var(--size) * 1px); position: relative; - -webkit-animation: time calc(var(--duration) * 1s) steps(1000, start) infinite; + -webkit-animation: time calc(var(--duration) * 1s) steps(1000, start); -webkit-mask: radial-gradient(transparent 50%,#000 50%); mask: radial-gradient(transparent 50%,#000 50%); } @@ -152,7 +153,7 @@ $secondary_dark_grey: #9A9A9A; position: absolute; top: 0; width: 50%; - -webkit-animation: mask calc(var(--duration) * 1s) steps(500, start) infinite; + -webkit-animation: mask calc(var(--duration) * 1s) steps(500, start); -webkit-transform-origin: 100% 50%; } @-webkit-keyframes time { @@ -178,3 +179,10 @@ $secondary_dark_grey: #9A9A9A; -webkit-transform: rotate(-180deg); } } + +.alert .timer { + --size: 10; + --duration: 5; + padding: 6px; + margin: 6px; +} diff --git a/projects/tentacles/src/python/tentacles/store.py b/projects/tentacles/src/python/tentacles/store.py index 5f75b54..305b54c 100644 --- a/projects/tentacles/src/python/tentacles/store.py +++ b/projects/tentacles/src/python/tentacles/store.py @@ -242,15 +242,53 @@ class Store(object): [name, url, api_key], ).fetchone() + @requires_conn + def fetch_printer(self, printer_id: int): + return self._conn.execute( + """ + SELECT + p.id + , p.name + , p.url + , p.api_key + , p.last_poll_date + , s.name as status + FROM printers p + INNER JOIN printer_statuses s + ON p.status_id = s.id + WHERE p.id = ?1 + """, + [printer_id], + ).fetchone() + @requires_conn def list_printers(self): return self._conn.execute( - "SELECT p.id, p.name, p.url, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id" + "SELECT p.id, p.name, p.url, p.api_key, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id" + ).fetchall() + + @fmap(one) + @requires_conn + def list_idle_printers(self): + return self._conn.execute( + """ + SELECT p.id + FROM printers p + LEFT JOIN jobs j ON p.id = j.printer_id + WHERE j.id IS NULL + """ ).fetchall() @requires_conn - def update_printer_status(self, printer_id, status_id): - pass + def update_printer_status(self, printer_id, state: str): + (status_id,) = self._conn.execute( + "SELECT id FROM printer_statuses WHERE name = ?1", [state] + ).fetchone() + + self._conn.execute( + "UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1", + [printer_id, status_id], + ) ################################################################################ # Files @@ -274,6 +312,10 @@ class Store(object): def delete_file(self, uid: int, fid: int): self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid]) + @requires_conn + def fetch_file(self, fid: int): + return self._conn.execute("SELECT * FROM files WHERE id = ?", [fid]).fetchone() + ################################################################################ # Job # @@ -301,10 +343,40 @@ class Store(object): """Enumerate jobs in priority order.""" cond = f"AND user_id = {uid}" if uid else "" return self._conn.execute( - f"SELECT * FROM jobs WHERE started_at IS NULL {cond} ORDER BY priority DESC", + f""" + SELECT * FROM jobs + WHERE started_at IS NULL AND printer_id IS NULL {cond} + ORDER BY priority DESC + """, [], ).fetchall() + @requires_conn + def list_mapped_jobs(self): + """Scheduler detail. List mapped but not started jobs.""" + + return self._conn.execute( + """ + SELECT * FROM jobs + WHERE started_at IS NULL AND printer_id IS NOT NULL + ORDER BY priority DESC + """, + [], + ).fetchall() + + @requires_conn + def poll_job_queue(self): + return self._conn.execute( + """ + SELECT id + FROM jobs + WHERE started_at IS NULL + AND printer_id IS NULL + ORDER BY priority DESC + LIMIT 1 + """ + ).fetchone() + @requires_conn def fetch_job(self, uid: int, jid: int) -> Optional[tuple]: return self._conn.execute( @@ -312,21 +384,15 @@ class Store(object): ).fetchone() @requires_conn - def alter_job(self, job: tuple): - fields = [ - "id", - "user_id", - "file_id", - "priority", - "started_at", - "finished_at", - "state", - "printer_id", - ] - assert len(job) == len(fields) + def assign_job(self, job_id: int, printer_id: int): return self._conn.execute( - f"INSERT OR REPLACE INTO jobs ({', '.join(fields)}) VALUES ({', '.join(['?'] * len(fields))})", - job, + "UPDATE jobs SET printer_id = ?2 WHERE id = ?1", [job_id, printer_id] + ) + + @requires_conn + def start_job(self, job_id: int): + return self._conn.execute( + "UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id] ) @requires_conn diff --git a/projects/tentacles/src/python/tentacles/templates/base.html.j2 b/projects/tentacles/src/python/tentacles/templates/base.html.j2 index 70962ee..197d8ee 100644 --- a/projects/tentacles/src/python/tentacles/templates/base.html.j2 +++ b/projects/tentacles/src/python/tentacles/templates/base.html.j2 @@ -44,9 +44,9 @@ <div class="flashes"> {% for category, message in messages %} <div class="alert {{ category }}"> - <input type="checkbox" id="alert{{ loop.index }}"/> + <input type="checkbox" id="alert{{ loop.index }}" /> <label class="close" title="close" for="alert{{loop.index}}"> - <div class="paused-timer" style="--duration: 2; --size: 10;"> + <div class="timer"> <div class="mask"> </div> </div> @@ -68,15 +68,12 @@ </body> <footer> <script type="text/javascript"> - document.querySelectorAll('.alert input[type=checkbox]').forEach(function($el) { - console.log("Setting timeout on", $el); - document.querySelectorAll(`.alert label[for=${$el.id}] .paused-timer`).forEach(function($it) { - $it.setAttribute("class", "timer"); + document.querySelectorAll('.alert input[type=checkbox]').forEach(($el) => { + document.querySelectorAll(`.alert label[for=${$el.id}] .timer`).forEach(($it) => { + $it.addEventListener("animationend", () => { + $el.click(); + }, {once: true}); }); - setTimeout(() => { - console.log("Timeout firing...") - $el.click(); - }, 2000); }); </script> </footer> diff --git a/projects/tentacles/src/python/tentacles/templates/printers.html.j2 b/projects/tentacles/src/python/tentacles/templates/printers.html.j2 index e4443d3..60a136f 100644 --- a/projects/tentacles/src/python/tentacles/templates/printers.html.j2 +++ b/projects/tentacles/src/python/tentacles/templates/printers.html.j2 @@ -6,7 +6,7 @@ {% if printers %} <ul> {% for printer in printers %} - {% with id, name, url, last_poll, status = printer %} + {% with id, name, url, _api_key, last_poll, status = printer %} <li class="printer row"> <span class="printer-name">{{name}}</span> <span class="printer-url"><code>{{url}}</code></span> diff --git a/projects/tentacles/src/python/tentacles/workers.py b/projects/tentacles/src/python/tentacles/workers.py new file mode 100644 index 0000000..715bcc1 --- /dev/null +++ b/projects/tentacles/src/python/tentacles/workers.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 + +"""Corn jobs 🌽 + +Supporting the core app with asynchronous maintenance tasks. + +Mostly related to monitoring and managing Printer state. +""" + +from time import sleep +from threading import Thread, Event +from typing import Callable +from datetime import timedelta, datetime +import logging +from contextlib import closing +from urllib import parse as urlparse +from tentacles.store import Store + +from octorest import OctoRest as _OR +from requests import Response +from requests.exceptions import HTTPError, ConnectionError, Timeout + + +class OctoRest(_OR): + def _get(self, path, params=None): + url = urlparse.urljoin(self.url, path) + response = self.session.get(url, params=params, timeout=(0.1, 1.0)) + self._check_response(response) + + return response.json() + + def _check_response(self, response: Response): + response.raise_for_status() + return response + + +log = logging.getLogger(__name__) + +SHUTDOWN = Event() + + +def corn_job(every: timedelta): + def _decorator(f): + def _helper(*args, **kwargs): + last = None + while not SHUTDOWN.is_set(): + if not last or (datetime.now() - last) > every: + log.debug(f"Ticking job {f.__name__}") + try: + last = datetime.now() + f(*args, **kwargs) + except Exception: + log.exception(f"Error while procesing task {f.__name__}") + else: + sleep(1) + + return _helper + + return _decorator + + +@corn_job(timedelta(seconds=5)) +def poll_printers(db_factory: Callable[[], Store]) -> None: + """Poll printers for their status.""" + + with closing(db_factory()) as db: + for printer in db.list_printers(): + id, name, url, api_key, last_poll, status = printer + try: + client = OctoRest(url=url, apikey=api_key) + job = client.job_info() + state = client.printer().get("state").get("flags", {}) + if state.get("error"): + db.update_printer_status(id, "error") + elif state.get("ready"): + db.update_printer_status(id, "idle") + elif state.get("printing"): + db.update_printer_status(id, "running") + else: + raise Exception(f"Indeterminate state {state!r}") + + except (ConnectionError, Timeout): + db.update_printer_status(id, "error") + + except HTTPError as e: + assert isinstance(e.response, Response) + if e.response.status_code in [403, 401] or "error" in job: + db.update_printer_status(id, "error") + elif e.response.json().get("error") == "Printer is not operational": + db.update_printer_status(id, "disconnected") + else: + raise Exception( + f"Indeterminate state {e.response.status_code}, {e.response.json()!r}" + ) + + +@corn_job(timedelta(seconds=5)) +def assign_jobs(db_factory: Callable[[], Store]) -> None: + """Assign jobs to printers. Uploading files and job state management is handled separately.""" + + with closing(db_factory()) as db: + for printer_id in db.list_idle_printers(): + printer = db.fetch_printer(printer_id) + if printer.status != "idle": + continue + + if next_job_id := db.poll_job_queue(): + db.assign_job(next_job_id, printer_id) + + +@corn_job(timedelta(seconds=5)) +def push_jobs(db_factory: Callable[[], Store]) -> None: + """Ensure that job files are uploaded and started to the assigned printer.""" + + with closing(db_factory()) as db: + for job in db.list_mapped_jobs(): + printer = db.fetch_printer(job.printer_id) + file = db.fetch_file(job.file_id) + try: + client = OctoRest(printer.url, printer.api_key) + client.upload(file.filename, select=True, print=True) + db.start_job(job.id) + except Exception: + log.exception("Oop") + + +def create_workers(db_factory: Callable[[], Store]) -> Event: + Thread(target=poll_printers, args=[db_factory]).start() + Thread(target=assign_jobs, args=[db_factory]).start() + Thread(target=push_jobs, args=[db_factory]).start() + return SHUTDOWN