Wiring up the printer control machinery
This commit is contained in:
parent
f1ea2cb645
commit
304d72cd83
8 changed files with 257 additions and 47 deletions
|
@ -7,6 +7,7 @@ py_project(
|
|||
py_requirement("click"),
|
||||
py_requirement("flask"),
|
||||
py_requirement("jinja2"),
|
||||
py_requirement("octorest"),
|
||||
],
|
||||
main_data = [
|
||||
"//projects/tentacles/src/python/tentacles/static/css",
|
||||
|
|
|
@ -1,18 +1,17 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
|
||||
"""
|
||||
"""The core app entrypoint."""
|
||||
|
||||
from pathlib import Path
|
||||
import click
|
||||
from flask import Flask, request, current_app
|
||||
from flask import Flask, request
|
||||
import tomllib
|
||||
from datetime import datetime
|
||||
|
||||
from tentacles.blueprints import user_ui, printer_ui, api
|
||||
from tentacles.store import Store
|
||||
from tentacles.globals import _ctx, Ctx, ctx
|
||||
from tentacles.workers import create_workers
|
||||
|
||||
|
||||
@click.group()
|
||||
|
@ -20,10 +19,15 @@ def cli():
|
|||
pass
|
||||
|
||||
|
||||
def store_factory(app):
|
||||
store = Store(app.config.get("db", {}).get("uri"))
|
||||
store.connect()
|
||||
return store
|
||||
|
||||
|
||||
def custom_ctx(app, wsgi_app):
|
||||
def helper(environ, start_response):
|
||||
store = Store(app.config.get("db", {}).get("uri"))
|
||||
store.connect()
|
||||
store = store_factory(app)
|
||||
token = _ctx.set(Ctx(store))
|
||||
try:
|
||||
return wsgi_app(environ, start_response)
|
||||
|
@ -34,10 +38,10 @@ def custom_ctx(app, wsgi_app):
|
|||
return helper
|
||||
|
||||
|
||||
def create_j2_request_global():
|
||||
current_app.jinja_env.globals["ctx"] = ctx
|
||||
current_app.jinja_env.globals["request"] = request
|
||||
current_app.jinja_env.globals["datetime"] = datetime
|
||||
def create_j2_request_global(app):
|
||||
app.jinja_env.globals["ctx"] = ctx
|
||||
app.jinja_env.globals["request"] = request
|
||||
app.jinja_env.globals["datetime"] = datetime
|
||||
|
||||
|
||||
def user_session():
|
||||
|
@ -65,8 +69,8 @@ def serve(hostname: str, port: int, config: Path):
|
|||
print(app.config)
|
||||
|
||||
# Before first request
|
||||
with app.app_context():
|
||||
create_j2_request_global()
|
||||
create_j2_request_global(app)
|
||||
shutdown_event = create_workers(lambda: store_factory(app))
|
||||
|
||||
# Before request
|
||||
app.before_request(user_session)
|
||||
|
@ -80,7 +84,10 @@ def serve(hostname: str, port: int, config: Path):
|
|||
app.wsgi_app = custom_ctx(app, app.wsgi_app)
|
||||
|
||||
# And run the blame thing
|
||||
app.run(host=hostname, port=port)
|
||||
try:
|
||||
app.run(host=hostname, port=port)
|
||||
finally:
|
||||
shutdown_event.set()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -14,9 +14,9 @@ CREATE TABLE IF NOT EXISTS user_statuses (
|
|||
, UNIQUE(name)
|
||||
);
|
||||
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) values (-1, 'disabled');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) values (-2, 'unverified');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) values (1, 'enabled');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-1, 'disabled');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unverified');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (1, 'enabled');
|
||||
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT
|
||||
|
|
|
@ -135,13 +135,14 @@ $secondary_dark_grey: #9A9A9A;
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// A timer animation
|
||||
|
||||
.timer {
|
||||
background: -webkit-linear-gradient(left, skyBlue 50%, #eee 50%);
|
||||
border-radius: 100%;
|
||||
height: calc(var(--size) * 1px);
|
||||
width: calc(var(--size) * 1px);
|
||||
position: relative;
|
||||
-webkit-animation: time calc(var(--duration) * 1s) steps(1000, start) infinite;
|
||||
-webkit-animation: time calc(var(--duration) * 1s) steps(1000, start);
|
||||
-webkit-mask: radial-gradient(transparent 50%,#000 50%);
|
||||
mask: radial-gradient(transparent 50%,#000 50%);
|
||||
}
|
||||
|
@ -152,7 +153,7 @@ $secondary_dark_grey: #9A9A9A;
|
|||
position: absolute;
|
||||
top: 0;
|
||||
width: 50%;
|
||||
-webkit-animation: mask calc(var(--duration) * 1s) steps(500, start) infinite;
|
||||
-webkit-animation: mask calc(var(--duration) * 1s) steps(500, start);
|
||||
-webkit-transform-origin: 100% 50%;
|
||||
}
|
||||
@-webkit-keyframes time {
|
||||
|
@ -178,3 +179,10 @@ $secondary_dark_grey: #9A9A9A;
|
|||
-webkit-transform: rotate(-180deg);
|
||||
}
|
||||
}
|
||||
|
||||
.alert .timer {
|
||||
--size: 10;
|
||||
--duration: 5;
|
||||
padding: 6px;
|
||||
margin: 6px;
|
||||
}
|
||||
|
|
|
@ -242,15 +242,53 @@ class Store(object):
|
|||
[name, url, api_key],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def fetch_printer(self, printer_id: int):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
p.id
|
||||
, p.name
|
||||
, p.url
|
||||
, p.api_key
|
||||
, p.last_poll_date
|
||||
, s.name as status
|
||||
FROM printers p
|
||||
INNER JOIN printer_statuses s
|
||||
ON p.status_id = s.id
|
||||
WHERE p.id = ?1
|
||||
""",
|
||||
[printer_id],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def list_printers(self):
|
||||
return self._conn.execute(
|
||||
"SELECT p.id, p.name, p.url, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
|
||||
"SELECT p.id, p.name, p.url, p.api_key, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
|
||||
).fetchall()
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def list_idle_printers(self):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT p.id
|
||||
FROM printers p
|
||||
LEFT JOIN jobs j ON p.id = j.printer_id
|
||||
WHERE j.id IS NULL
|
||||
"""
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def update_printer_status(self, printer_id, status_id):
|
||||
pass
|
||||
def update_printer_status(self, printer_id, state: str):
|
||||
(status_id,) = self._conn.execute(
|
||||
"SELECT id FROM printer_statuses WHERE name = ?1", [state]
|
||||
).fetchone()
|
||||
|
||||
self._conn.execute(
|
||||
"UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1",
|
||||
[printer_id, status_id],
|
||||
)
|
||||
|
||||
################################################################################
|
||||
# Files
|
||||
|
@ -274,6 +312,10 @@ class Store(object):
|
|||
def delete_file(self, uid: int, fid: int):
|
||||
self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid])
|
||||
|
||||
@requires_conn
|
||||
def fetch_file(self, fid: int):
|
||||
return self._conn.execute("SELECT * FROM files WHERE id = ?", [fid]).fetchone()
|
||||
|
||||
################################################################################
|
||||
# Job
|
||||
#
|
||||
|
@ -301,10 +343,40 @@ class Store(object):
|
|||
"""Enumerate jobs in priority order."""
|
||||
cond = f"AND user_id = {uid}" if uid else ""
|
||||
return self._conn.execute(
|
||||
f"SELECT * FROM jobs WHERE started_at IS NULL {cond} ORDER BY priority DESC",
|
||||
f"""
|
||||
SELECT * FROM jobs
|
||||
WHERE started_at IS NULL AND printer_id IS NULL {cond}
|
||||
ORDER BY priority DESC
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_mapped_jobs(self):
|
||||
"""Scheduler detail. List mapped but not started jobs."""
|
||||
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT * FROM jobs
|
||||
WHERE started_at IS NULL AND printer_id IS NOT NULL
|
||||
ORDER BY priority DESC
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def poll_job_queue(self):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT id
|
||||
FROM jobs
|
||||
WHERE started_at IS NULL
|
||||
AND printer_id IS NULL
|
||||
ORDER BY priority DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def fetch_job(self, uid: int, jid: int) -> Optional[tuple]:
|
||||
return self._conn.execute(
|
||||
|
@ -312,21 +384,15 @@ class Store(object):
|
|||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def alter_job(self, job: tuple):
|
||||
fields = [
|
||||
"id",
|
||||
"user_id",
|
||||
"file_id",
|
||||
"priority",
|
||||
"started_at",
|
||||
"finished_at",
|
||||
"state",
|
||||
"printer_id",
|
||||
]
|
||||
assert len(job) == len(fields)
|
||||
def assign_job(self, job_id: int, printer_id: int):
|
||||
return self._conn.execute(
|
||||
f"INSERT OR REPLACE INTO jobs ({', '.join(fields)}) VALUES ({', '.join(['?'] * len(fields))})",
|
||||
job,
|
||||
"UPDATE jobs SET printer_id = ?2 WHERE id = ?1", [job_id, printer_id]
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def start_job(self, job_id: int):
|
||||
return self._conn.execute(
|
||||
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
|
|
|
@ -44,9 +44,9 @@
|
|||
<div class="flashes">
|
||||
{% for category, message in messages %}
|
||||
<div class="alert {{ category }}">
|
||||
<input type="checkbox" id="alert{{ loop.index }}"/>
|
||||
<input type="checkbox" id="alert{{ loop.index }}" />
|
||||
<label class="close" title="close" for="alert{{loop.index}}">
|
||||
<div class="paused-timer" style="--duration: 2; --size: 10;">
|
||||
<div class="timer">
|
||||
<div class="mask">
|
||||
</div>
|
||||
</div>
|
||||
|
@ -68,15 +68,12 @@
|
|||
</body>
|
||||
<footer>
|
||||
<script type="text/javascript">
|
||||
document.querySelectorAll('.alert input[type=checkbox]').forEach(function($el) {
|
||||
console.log("Setting timeout on", $el);
|
||||
document.querySelectorAll(`.alert label[for=${$el.id}] .paused-timer`).forEach(function($it) {
|
||||
$it.setAttribute("class", "timer");
|
||||
document.querySelectorAll('.alert input[type=checkbox]').forEach(($el) => {
|
||||
document.querySelectorAll(`.alert label[for=${$el.id}] .timer`).forEach(($it) => {
|
||||
$it.addEventListener("animationend", () => {
|
||||
$el.click();
|
||||
}, {once: true});
|
||||
});
|
||||
setTimeout(() => {
|
||||
console.log("Timeout firing...")
|
||||
$el.click();
|
||||
}, 2000);
|
||||
});
|
||||
</script>
|
||||
</footer>
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
{% if printers %}
|
||||
<ul>
|
||||
{% for printer in printers %}
|
||||
{% with id, name, url, last_poll, status = printer %}
|
||||
{% with id, name, url, _api_key, last_poll, status = printer %}
|
||||
<li class="printer row">
|
||||
<span class="printer-name">{{name}}</span>
|
||||
<span class="printer-url"><code>{{url}}</code></span>
|
||||
|
|
131
projects/tentacles/src/python/tentacles/workers.py
Normal file
131
projects/tentacles/src/python/tentacles/workers.py
Normal file
|
@ -0,0 +1,131 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
"""Corn jobs 🌽
|
||||
|
||||
Supporting the core app with asynchronous maintenance tasks.
|
||||
|
||||
Mostly related to monitoring and managing Printer state.
|
||||
"""
|
||||
|
||||
from time import sleep
|
||||
from threading import Thread, Event
|
||||
from typing import Callable
|
||||
from datetime import timedelta, datetime
|
||||
import logging
|
||||
from contextlib import closing
|
||||
from urllib import parse as urlparse
|
||||
from tentacles.store import Store
|
||||
|
||||
from octorest import OctoRest as _OR
|
||||
from requests import Response
|
||||
from requests.exceptions import HTTPError, ConnectionError, Timeout
|
||||
|
||||
|
||||
class OctoRest(_OR):
|
||||
def _get(self, path, params=None):
|
||||
url = urlparse.urljoin(self.url, path)
|
||||
response = self.session.get(url, params=params, timeout=(0.1, 1.0))
|
||||
self._check_response(response)
|
||||
|
||||
return response.json()
|
||||
|
||||
def _check_response(self, response: Response):
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
SHUTDOWN = Event()
|
||||
|
||||
|
||||
def corn_job(every: timedelta):
|
||||
def _decorator(f):
|
||||
def _helper(*args, **kwargs):
|
||||
last = None
|
||||
while not SHUTDOWN.is_set():
|
||||
if not last or (datetime.now() - last) > every:
|
||||
log.debug(f"Ticking job {f.__name__}")
|
||||
try:
|
||||
last = datetime.now()
|
||||
f(*args, **kwargs)
|
||||
except Exception:
|
||||
log.exception(f"Error while procesing task {f.__name__}")
|
||||
else:
|
||||
sleep(1)
|
||||
|
||||
return _helper
|
||||
|
||||
return _decorator
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def poll_printers(db_factory: Callable[[], Store]) -> None:
|
||||
"""Poll printers for their status."""
|
||||
|
||||
with closing(db_factory()) as db:
|
||||
for printer in db.list_printers():
|
||||
id, name, url, api_key, last_poll, status = printer
|
||||
try:
|
||||
client = OctoRest(url=url, apikey=api_key)
|
||||
job = client.job_info()
|
||||
state = client.printer().get("state").get("flags", {})
|
||||
if state.get("error"):
|
||||
db.update_printer_status(id, "error")
|
||||
elif state.get("ready"):
|
||||
db.update_printer_status(id, "idle")
|
||||
elif state.get("printing"):
|
||||
db.update_printer_status(id, "running")
|
||||
else:
|
||||
raise Exception(f"Indeterminate state {state!r}")
|
||||
|
||||
except (ConnectionError, Timeout):
|
||||
db.update_printer_status(id, "error")
|
||||
|
||||
except HTTPError as e:
|
||||
assert isinstance(e.response, Response)
|
||||
if e.response.status_code in [403, 401] or "error" in job:
|
||||
db.update_printer_status(id, "error")
|
||||
elif e.response.json().get("error") == "Printer is not operational":
|
||||
db.update_printer_status(id, "disconnected")
|
||||
else:
|
||||
raise Exception(
|
||||
f"Indeterminate state {e.response.status_code}, {e.response.json()!r}"
|
||||
)
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def assign_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Assign jobs to printers. Uploading files and job state management is handled separately."""
|
||||
|
||||
with closing(db_factory()) as db:
|
||||
for printer_id in db.list_idle_printers():
|
||||
printer = db.fetch_printer(printer_id)
|
||||
if printer.status != "idle":
|
||||
continue
|
||||
|
||||
if next_job_id := db.poll_job_queue():
|
||||
db.assign_job(next_job_id, printer_id)
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def push_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Ensure that job files are uploaded and started to the assigned printer."""
|
||||
|
||||
with closing(db_factory()) as db:
|
||||
for job in db.list_mapped_jobs():
|
||||
printer = db.fetch_printer(job.printer_id)
|
||||
file = db.fetch_file(job.file_id)
|
||||
try:
|
||||
client = OctoRest(printer.url, printer.api_key)
|
||||
client.upload(file.filename, select=True, print=True)
|
||||
db.start_job(job.id)
|
||||
except Exception:
|
||||
log.exception("Oop")
|
||||
|
||||
|
||||
def create_workers(db_factory: Callable[[], Store]) -> Event:
|
||||
Thread(target=poll_printers, args=[db_factory]).start()
|
||||
Thread(target=assign_jobs, args=[db_factory]).start()
|
||||
Thread(target=push_jobs, args=[db_factory]).start()
|
||||
return SHUTDOWN
|
Loading…
Reference in a new issue