Compare commits

..

2 commits

8 changed files with 291 additions and 47 deletions

View file

@ -7,6 +7,7 @@ py_project(
py_requirement("click"),
py_requirement("flask"),
py_requirement("jinja2"),
py_requirement("octorest"),
],
main_data = [
"//projects/tentacles/src/python/tentacles/static/css",

View file

@ -1,18 +1,17 @@
#!/usr/bin/env python3
"""
"""
"""The core app entrypoint."""
from pathlib import Path
import click
from flask import Flask, request, current_app
from flask import Flask, request
import tomllib
from datetime import datetime
from tentacles.blueprints import user_ui, printer_ui, api
from tentacles.store import Store
from tentacles.globals import _ctx, Ctx, ctx
from tentacles.workers import create_workers
@click.group()
@ -20,10 +19,15 @@ def cli():
pass
def store_factory(app):
store = Store(app.config.get("db", {}).get("uri"))
store.connect()
return store
def custom_ctx(app, wsgi_app):
def helper(environ, start_response):
store = Store(app.config.get("db", {}).get("uri"))
store.connect()
store = store_factory(app)
token = _ctx.set(Ctx(store))
try:
return wsgi_app(environ, start_response)
@ -34,10 +38,10 @@ def custom_ctx(app, wsgi_app):
return helper
def create_j2_request_global():
current_app.jinja_env.globals["ctx"] = ctx
current_app.jinja_env.globals["request"] = request
current_app.jinja_env.globals["datetime"] = datetime
def create_j2_request_global(app):
app.jinja_env.globals["ctx"] = ctx
app.jinja_env.globals["request"] = request
app.jinja_env.globals["datetime"] = datetime
def user_session():
@ -65,8 +69,8 @@ def serve(hostname: str, port: int, config: Path):
print(app.config)
# Before first request
with app.app_context():
create_j2_request_global()
create_j2_request_global(app)
shutdown_event = create_workers(lambda: store_factory(app))
# Before request
app.before_request(user_session)
@ -80,7 +84,10 @@ def serve(hostname: str, port: int, config: Path):
app.wsgi_app = custom_ctx(app, app.wsgi_app)
# And run the blame thing
app.run(host=hostname, port=port)
try:
app.run(host=hostname, port=port)
finally:
shutdown_event.set()
if __name__ == "__main__":

View file

@ -14,9 +14,9 @@ CREATE TABLE IF NOT EXISTS user_statuses (
, UNIQUE(name)
);
INSERT OR IGNORE INTO user_statuses (id, name) values (-1, 'disabled');
INSERT OR IGNORE INTO user_statuses (id, name) values (-2, 'unverified');
INSERT OR IGNORE INTO user_statuses (id, name) values (1, 'enabled');
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-1, 'disabled');
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unverified');
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (1, 'enabled');
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT

View file

@ -135,13 +135,14 @@ $secondary_dark_grey: #9A9A9A;
////////////////////////////////////////////////////////////////////////////////////////////////////
// A timer animation
.timer {
background: -webkit-linear-gradient(left, skyBlue 50%, #eee 50%);
border-radius: 100%;
height: calc(var(--size) * 1px);
width: calc(var(--size) * 1px);
position: relative;
-webkit-animation: time calc(var(--duration) * 1s) steps(1000, start) infinite;
-webkit-animation: time calc(var(--duration) * 1s) steps(1000, start);
-webkit-mask: radial-gradient(transparent 50%,#000 50%);
mask: radial-gradient(transparent 50%,#000 50%);
}
@ -152,7 +153,7 @@ $secondary_dark_grey: #9A9A9A;
position: absolute;
top: 0;
width: 50%;
-webkit-animation: mask calc(var(--duration) * 1s) steps(500, start) infinite;
-webkit-animation: mask calc(var(--duration) * 1s) steps(500, start);
-webkit-transform-origin: 100% 50%;
}
@-webkit-keyframes time {
@ -178,3 +179,10 @@ $secondary_dark_grey: #9A9A9A;
-webkit-transform: rotate(-180deg);
}
}
.alert .timer {
--size: 10;
--duration: 5;
padding: 6px;
margin: 6px;
}

View file

@ -242,15 +242,53 @@ class Store(object):
[name, url, api_key],
).fetchone()
@requires_conn
def fetch_printer(self, printer_id: int):
return self._conn.execute(
"""
SELECT
p.id
, p.name
, p.url
, p.api_key
, p.last_poll_date
, s.name as status
FROM printers p
INNER JOIN printer_statuses s
ON p.status_id = s.id
WHERE p.id = ?1
""",
[printer_id],
).fetchone()
@requires_conn
def list_printers(self):
return self._conn.execute(
"SELECT p.id, p.name, p.url, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
"SELECT p.id, p.name, p.url, p.api_key, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
).fetchall()
@fmap(one)
@requires_conn
def list_idle_printers(self):
return self._conn.execute(
"""
SELECT p.id
FROM printers p
LEFT JOIN jobs j ON p.id = j.printer_id
WHERE j.id IS NULL
"""
).fetchall()
@requires_conn
def update_printer_status(self, printer_id, status_id):
pass
def update_printer_status(self, printer_id, state: str):
(status_id,) = self._conn.execute(
"SELECT id FROM printer_statuses WHERE name = ?1", [state]
).fetchone()
self._conn.execute(
"UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1",
[printer_id, status_id],
)
################################################################################
# Files
@ -274,6 +312,10 @@ class Store(object):
def delete_file(self, uid: int, fid: int):
self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid])
@requires_conn
def fetch_file(self, fid: int):
return self._conn.execute("SELECT * FROM files WHERE id = ?", [fid]).fetchone()
################################################################################
# Job
#
@ -301,10 +343,54 @@ class Store(object):
"""Enumerate jobs in priority order."""
cond = f"AND user_id = {uid}" if uid else ""
return self._conn.execute(
f"SELECT * FROM jobs WHERE started_at IS NULL {cond} ORDER BY priority DESC",
f"""
SELECT * FROM jobs
WHERE started_at IS NULL AND printer_id IS NULL {cond}
ORDER BY priority DESC
""",
[],
).fetchall()
@requires_conn
def list_mapped_jobs(self):
"""Scheduler detail. List mapped but not started jobs."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NULL AND printer_id IS NOT NULL
""",
[],
).fetchall()
@requires_conn
def list_running_jobs(self):
"""Scheduler detail. List running jobs."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NOT NULL
AND printer_id IS NOT NULL
AND finished_at IS NULL
""",
[],
).fetchall()
@requires_conn
def poll_job_queue(self):
return self._conn.execute(
"""
SELECT id
FROM jobs
WHERE started_at IS NULL
AND finished_at IS NULL
AND printer_id IS NULL
ORDER BY priority DESC
LIMIT 1
"""
).fetchone()
@requires_conn
def fetch_job(self, uid: int, jid: int) -> Optional[tuple]:
return self._conn.execute(
@ -312,21 +398,22 @@ class Store(object):
).fetchone()
@requires_conn
def alter_job(self, job: tuple):
fields = [
"id",
"user_id",
"file_id",
"priority",
"started_at",
"finished_at",
"state",
"printer_id",
]
assert len(job) == len(fields)
def assign_job(self, job_id: int, printer_id: int):
return self._conn.execute(
f"INSERT OR REPLACE INTO jobs ({', '.join(fields)}) VALUES ({', '.join(['?'] * len(fields))})",
job,
"UPDATE jobs SET printer_id = ?2 WHERE id = ?1", [job_id, printer_id]
)
@requires_conn
def start_job(self, job_id: int):
return self._conn.execute(
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
)
@requires_conn
def finish_job(self, job_id: int, state: str):
return self._conn.execute(
"UPDATE jobs SET finished_at = datetime('now'), state = ?2 WHERE id = ?1",
[job_id, state],
)
@requires_conn

View file

@ -44,9 +44,9 @@
<div class="flashes">
{% for category, message in messages %}
<div class="alert {{ category }}">
<input type="checkbox" id="alert{{ loop.index }}"/>
<input type="checkbox" id="alert{{ loop.index }}" />
<label class="close" title="close" for="alert{{loop.index}}">
<div class="paused-timer" style="--duration: 2; --size: 10;">
<div class="timer">
<div class="mask">
</div>
</div>
@ -68,15 +68,12 @@
</body>
<footer>
<script type="text/javascript">
document.querySelectorAll('.alert input[type=checkbox]').forEach(function($el) {
console.log("Setting timeout on", $el);
document.querySelectorAll(`.alert label[for=${$el.id}] .paused-timer`).forEach(function($it) {
$it.setAttribute("class", "timer");
document.querySelectorAll('.alert input[type=checkbox]').forEach(($el) => {
document.querySelectorAll(`.alert label[for=${$el.id}] .timer`).forEach(($it) => {
$it.addEventListener("animationend", () => {
$el.click();
}, {once: true});
});
setTimeout(() => {
console.log("Timeout firing...")
$el.click();
}, 2000);
});
</script>
</footer>

View file

@ -6,7 +6,7 @@
{% if printers %}
<ul>
{% for printer in printers %}
{% with id, name, url, last_poll, status = printer %}
{% with id, name, url, _api_key, last_poll, status = printer %}
<li class="printer row">
<span class="printer-name">{{name}}</span>
<span class="printer-url"><code>{{url}}</code></span>

View file

@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Corn jobs 🌽
Supporting the core app with asynchronous maintenance tasks.
Mostly related to monitoring and managing Printer state.
"""
from time import sleep
from threading import Thread, Event
from typing import Callable
from datetime import timedelta, datetime
import logging
from contextlib import closing
from urllib import parse as urlparse
from tentacles.store import Store
from octorest import OctoRest as _OR
from requests import Response
from requests.exceptions import HTTPError, ConnectionError, Timeout
class OctoRest(_OR):
def _get(self, path, params=None):
url = urlparse.urljoin(self.url, path)
response = self.session.get(url, params=params, timeout=(0.1, 1.0))
self._check_response(response)
return response.json()
def _check_response(self, response: Response):
response.raise_for_status()
return response
log = logging.getLogger(__name__)
SHUTDOWN = Event()
def corn_job(every: timedelta):
def _decorator(f):
def _helper(*args, **kwargs):
last = None
while not SHUTDOWN.is_set():
if not last or (datetime.now() - last) > every:
log.debug(f"Ticking job {f.__name__}")
try:
last = datetime.now()
f(*args, **kwargs)
except Exception:
log.exception(f"Error while procesing task {f.__name__}")
else:
sleep(1)
return _helper
return _decorator
@corn_job(timedelta(seconds=5))
def poll_printers(db_factory: Callable[[], Store]) -> None:
"""Poll printers for their status."""
with closing(db_factory()) as db:
for printer in db.list_printers():
id, name, url, api_key, last_poll, status = printer
try:
client = OctoRest(url=url, apikey=api_key)
job = client.job_info()
state = client.printer().get("state").get("flags", {})
if state.get("error"):
db.update_printer_status(id, "error")
elif state.get("ready"):
db.update_printer_status(id, "idle")
elif state.get("printing"):
db.update_printer_status(id, "running")
else:
raise Exception(f"Indeterminate state {state!r}")
except (ConnectionError, Timeout):
db.update_printer_status(id, "error")
except HTTPError as e:
assert isinstance(e.response, Response)
if e.response.status_code in [403, 401] or "error" in job:
db.update_printer_status(id, "error")
elif e.response.json().get("error") == "Printer is not operational":
db.update_printer_status(id, "disconnected")
else:
raise Exception(
f"Indeterminate state {e.response.status_code}, {e.response.json()!r}"
)
@corn_job(timedelta(seconds=5))
def assign_jobs(db_factory: Callable[[], Store]) -> None:
"""Assign jobs to printers. Uploading files and job state management is handled separately."""
with closing(db_factory()) as db:
for printer_id in db.list_idle_printers():
printer = db.fetch_printer(printer_id)
if printer.status != "idle":
continue
if next_job_id := db.poll_job_queue():
db.assign_job(next_job_id, printer_id)
@corn_job(timedelta(seconds=5))
def push_jobs(db_factory: Callable[[], Store]) -> None:
"""Ensure that job files are uploaded and started to the assigned printer."""
with closing(db_factory()) as db:
for job in db.list_mapped_jobs():
printer = db.fetch_printer(job.printer_id)
file = db.fetch_file(job.file_id)
try:
client = OctoRest(printer.url, printer.api_key)
client.upload(file.filename, select=True, print=True)
db.start_job(job.id)
except Exception:
log.exception("Oop")
@corn_job(timedelta(seconds=5))
def pull_jobs(db_factory: Callable[[], Store]) -> None:
"""Poll the state of mapped printers to control jobs."""
with closing(db_factory()) as db:
for job in db.list_running_jobs():
printer = db.fetch_printer(job.printer_id)
if printer.status != "running":
db.finish_job(job.id, printer.status)
def create_workers(db_factory: Callable[[], Store]) -> Event:
Thread(target=poll_printers, args=[db_factory]).start()
Thread(target=assign_jobs, args=[db_factory]).start()
Thread(target=push_jobs, args=[db_factory]).start()
Thread(target=pull_jobs, args=[db_factory]).start()
return SHUTDOWN