Compare commits

...

2 commits

8 changed files with 291 additions and 47 deletions

View file

@ -7,6 +7,7 @@ py_project(
py_requirement("click"), py_requirement("click"),
py_requirement("flask"), py_requirement("flask"),
py_requirement("jinja2"), py_requirement("jinja2"),
py_requirement("octorest"),
], ],
main_data = [ main_data = [
"//projects/tentacles/src/python/tentacles/static/css", "//projects/tentacles/src/python/tentacles/static/css",

View file

@ -1,18 +1,17 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """The core app entrypoint."""
"""
from pathlib import Path from pathlib import Path
import click import click
from flask import Flask, request, current_app from flask import Flask, request
import tomllib import tomllib
from datetime import datetime from datetime import datetime
from tentacles.blueprints import user_ui, printer_ui, api from tentacles.blueprints import user_ui, printer_ui, api
from tentacles.store import Store from tentacles.store import Store
from tentacles.globals import _ctx, Ctx, ctx from tentacles.globals import _ctx, Ctx, ctx
from tentacles.workers import create_workers
@click.group() @click.group()
@ -20,10 +19,15 @@ def cli():
pass pass
def custom_ctx(app, wsgi_app): def store_factory(app):
def helper(environ, start_response):
store = Store(app.config.get("db", {}).get("uri")) store = Store(app.config.get("db", {}).get("uri"))
store.connect() store.connect()
return store
def custom_ctx(app, wsgi_app):
def helper(environ, start_response):
store = store_factory(app)
token = _ctx.set(Ctx(store)) token = _ctx.set(Ctx(store))
try: try:
return wsgi_app(environ, start_response) return wsgi_app(environ, start_response)
@ -34,10 +38,10 @@ def custom_ctx(app, wsgi_app):
return helper return helper
def create_j2_request_global(): def create_j2_request_global(app):
current_app.jinja_env.globals["ctx"] = ctx app.jinja_env.globals["ctx"] = ctx
current_app.jinja_env.globals["request"] = request app.jinja_env.globals["request"] = request
current_app.jinja_env.globals["datetime"] = datetime app.jinja_env.globals["datetime"] = datetime
def user_session(): def user_session():
@ -65,8 +69,8 @@ def serve(hostname: str, port: int, config: Path):
print(app.config) print(app.config)
# Before first request # Before first request
with app.app_context(): create_j2_request_global(app)
create_j2_request_global() shutdown_event = create_workers(lambda: store_factory(app))
# Before request # Before request
app.before_request(user_session) app.before_request(user_session)
@ -80,7 +84,10 @@ def serve(hostname: str, port: int, config: Path):
app.wsgi_app = custom_ctx(app, app.wsgi_app) app.wsgi_app = custom_ctx(app, app.wsgi_app)
# And run the blame thing # And run the blame thing
try:
app.run(host=hostname, port=port) app.run(host=hostname, port=port)
finally:
shutdown_event.set()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -14,9 +14,9 @@ CREATE TABLE IF NOT EXISTS user_statuses (
, UNIQUE(name) , UNIQUE(name)
); );
INSERT OR IGNORE INTO user_statuses (id, name) values (-1, 'disabled'); INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-1, 'disabled');
INSERT OR IGNORE INTO user_statuses (id, name) values (-2, 'unverified'); INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unverified');
INSERT OR IGNORE INTO user_statuses (id, name) values (1, 'enabled'); INSERT OR IGNORE INTO user_statuses (id, name) VALUES (1, 'enabled');
CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT id INTEGER PRIMARY KEY AUTOINCREMENT

View file

@ -135,13 +135,14 @@ $secondary_dark_grey: #9A9A9A;
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
// A timer animation // A timer animation
.timer { .timer {
background: -webkit-linear-gradient(left, skyBlue 50%, #eee 50%); background: -webkit-linear-gradient(left, skyBlue 50%, #eee 50%);
border-radius: 100%; border-radius: 100%;
height: calc(var(--size) * 1px); height: calc(var(--size) * 1px);
width: calc(var(--size) * 1px); width: calc(var(--size) * 1px);
position: relative; position: relative;
-webkit-animation: time calc(var(--duration) * 1s) steps(1000, start) infinite; -webkit-animation: time calc(var(--duration) * 1s) steps(1000, start);
-webkit-mask: radial-gradient(transparent 50%,#000 50%); -webkit-mask: radial-gradient(transparent 50%,#000 50%);
mask: radial-gradient(transparent 50%,#000 50%); mask: radial-gradient(transparent 50%,#000 50%);
} }
@ -152,7 +153,7 @@ $secondary_dark_grey: #9A9A9A;
position: absolute; position: absolute;
top: 0; top: 0;
width: 50%; width: 50%;
-webkit-animation: mask calc(var(--duration) * 1s) steps(500, start) infinite; -webkit-animation: mask calc(var(--duration) * 1s) steps(500, start);
-webkit-transform-origin: 100% 50%; -webkit-transform-origin: 100% 50%;
} }
@-webkit-keyframes time { @-webkit-keyframes time {
@ -178,3 +179,10 @@ $secondary_dark_grey: #9A9A9A;
-webkit-transform: rotate(-180deg); -webkit-transform: rotate(-180deg);
} }
} }
.alert .timer {
--size: 10;
--duration: 5;
padding: 6px;
margin: 6px;
}

View file

@ -242,15 +242,53 @@ class Store(object):
[name, url, api_key], [name, url, api_key],
).fetchone() ).fetchone()
@requires_conn
def fetch_printer(self, printer_id: int):
return self._conn.execute(
"""
SELECT
p.id
, p.name
, p.url
, p.api_key
, p.last_poll_date
, s.name as status
FROM printers p
INNER JOIN printer_statuses s
ON p.status_id = s.id
WHERE p.id = ?1
""",
[printer_id],
).fetchone()
@requires_conn @requires_conn
def list_printers(self): def list_printers(self):
return self._conn.execute( return self._conn.execute(
"SELECT p.id, p.name, p.url, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id" "SELECT p.id, p.name, p.url, p.api_key, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
).fetchall()
@fmap(one)
@requires_conn
def list_idle_printers(self):
return self._conn.execute(
"""
SELECT p.id
FROM printers p
LEFT JOIN jobs j ON p.id = j.printer_id
WHERE j.id IS NULL
"""
).fetchall() ).fetchall()
@requires_conn @requires_conn
def update_printer_status(self, printer_id, status_id): def update_printer_status(self, printer_id, state: str):
pass (status_id,) = self._conn.execute(
"SELECT id FROM printer_statuses WHERE name = ?1", [state]
).fetchone()
self._conn.execute(
"UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1",
[printer_id, status_id],
)
################################################################################ ################################################################################
# Files # Files
@ -274,6 +312,10 @@ class Store(object):
def delete_file(self, uid: int, fid: int): def delete_file(self, uid: int, fid: int):
self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid]) self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid])
@requires_conn
def fetch_file(self, fid: int):
return self._conn.execute("SELECT * FROM files WHERE id = ?", [fid]).fetchone()
################################################################################ ################################################################################
# Job # Job
# #
@ -301,10 +343,54 @@ class Store(object):
"""Enumerate jobs in priority order.""" """Enumerate jobs in priority order."""
cond = f"AND user_id = {uid}" if uid else "" cond = f"AND user_id = {uid}" if uid else ""
return self._conn.execute( return self._conn.execute(
f"SELECT * FROM jobs WHERE started_at IS NULL {cond} ORDER BY priority DESC", f"""
SELECT * FROM jobs
WHERE started_at IS NULL AND printer_id IS NULL {cond}
ORDER BY priority DESC
""",
[], [],
).fetchall() ).fetchall()
@requires_conn
def list_mapped_jobs(self):
"""Scheduler detail. List mapped but not started jobs."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NULL AND printer_id IS NOT NULL
""",
[],
).fetchall()
@requires_conn
def list_running_jobs(self):
"""Scheduler detail. List running jobs."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NOT NULL
AND printer_id IS NOT NULL
AND finished_at IS NULL
""",
[],
).fetchall()
@requires_conn
def poll_job_queue(self):
return self._conn.execute(
"""
SELECT id
FROM jobs
WHERE started_at IS NULL
AND finished_at IS NULL
AND printer_id IS NULL
ORDER BY priority DESC
LIMIT 1
"""
).fetchone()
@requires_conn @requires_conn
def fetch_job(self, uid: int, jid: int) -> Optional[tuple]: def fetch_job(self, uid: int, jid: int) -> Optional[tuple]:
return self._conn.execute( return self._conn.execute(
@ -312,21 +398,22 @@ class Store(object):
).fetchone() ).fetchone()
@requires_conn @requires_conn
def alter_job(self, job: tuple): def assign_job(self, job_id: int, printer_id: int):
fields = [
"id",
"user_id",
"file_id",
"priority",
"started_at",
"finished_at",
"state",
"printer_id",
]
assert len(job) == len(fields)
return self._conn.execute( return self._conn.execute(
f"INSERT OR REPLACE INTO jobs ({', '.join(fields)}) VALUES ({', '.join(['?'] * len(fields))})", "UPDATE jobs SET printer_id = ?2 WHERE id = ?1", [job_id, printer_id]
job, )
@requires_conn
def start_job(self, job_id: int):
return self._conn.execute(
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
)
@requires_conn
def finish_job(self, job_id: int, state: str):
return self._conn.execute(
"UPDATE jobs SET finished_at = datetime('now'), state = ?2 WHERE id = ?1",
[job_id, state],
) )
@requires_conn @requires_conn

View file

@ -46,7 +46,7 @@
<div class="alert {{ category }}"> <div class="alert {{ category }}">
<input type="checkbox" id="alert{{ loop.index }}" /> <input type="checkbox" id="alert{{ loop.index }}" />
<label class="close" title="close" for="alert{{loop.index}}"> <label class="close" title="close" for="alert{{loop.index}}">
<div class="paused-timer" style="--duration: 2; --size: 10;"> <div class="timer">
<div class="mask"> <div class="mask">
</div> </div>
</div> </div>
@ -68,15 +68,12 @@
</body> </body>
<footer> <footer>
<script type="text/javascript"> <script type="text/javascript">
document.querySelectorAll('.alert input[type=checkbox]').forEach(function($el) { document.querySelectorAll('.alert input[type=checkbox]').forEach(($el) => {
console.log("Setting timeout on", $el); document.querySelectorAll(`.alert label[for=${$el.id}] .timer`).forEach(($it) => {
document.querySelectorAll(`.alert label[for=${$el.id}] .paused-timer`).forEach(function($it) { $it.addEventListener("animationend", () => {
$it.setAttribute("class", "timer");
});
setTimeout(() => {
console.log("Timeout firing...")
$el.click(); $el.click();
}, 2000); }, {once: true});
});
}); });
</script> </script>
</footer> </footer>

View file

@ -6,7 +6,7 @@
{% if printers %} {% if printers %}
<ul> <ul>
{% for printer in printers %} {% for printer in printers %}
{% with id, name, url, last_poll, status = printer %} {% with id, name, url, _api_key, last_poll, status = printer %}
<li class="printer row"> <li class="printer row">
<span class="printer-name">{{name}}</span> <span class="printer-name">{{name}}</span>
<span class="printer-url"><code>{{url}}</code></span> <span class="printer-url"><code>{{url}}</code></span>

View file

@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Corn jobs 🌽
Supporting the core app with asynchronous maintenance tasks.
Mostly related to monitoring and managing Printer state.
"""
from time import sleep
from threading import Thread, Event
from typing import Callable
from datetime import timedelta, datetime
import logging
from contextlib import closing
from urllib import parse as urlparse
from tentacles.store import Store
from octorest import OctoRest as _OR
from requests import Response
from requests.exceptions import HTTPError, ConnectionError, Timeout
class OctoRest(_OR):
def _get(self, path, params=None):
url = urlparse.urljoin(self.url, path)
response = self.session.get(url, params=params, timeout=(0.1, 1.0))
self._check_response(response)
return response.json()
def _check_response(self, response: Response):
response.raise_for_status()
return response
log = logging.getLogger(__name__)
SHUTDOWN = Event()
def corn_job(every: timedelta):
def _decorator(f):
def _helper(*args, **kwargs):
last = None
while not SHUTDOWN.is_set():
if not last or (datetime.now() - last) > every:
log.debug(f"Ticking job {f.__name__}")
try:
last = datetime.now()
f(*args, **kwargs)
except Exception:
log.exception(f"Error while procesing task {f.__name__}")
else:
sleep(1)
return _helper
return _decorator
@corn_job(timedelta(seconds=5))
def poll_printers(db_factory: Callable[[], Store]) -> None:
"""Poll printers for their status."""
with closing(db_factory()) as db:
for printer in db.list_printers():
id, name, url, api_key, last_poll, status = printer
try:
client = OctoRest(url=url, apikey=api_key)
job = client.job_info()
state = client.printer().get("state").get("flags", {})
if state.get("error"):
db.update_printer_status(id, "error")
elif state.get("ready"):
db.update_printer_status(id, "idle")
elif state.get("printing"):
db.update_printer_status(id, "running")
else:
raise Exception(f"Indeterminate state {state!r}")
except (ConnectionError, Timeout):
db.update_printer_status(id, "error")
except HTTPError as e:
assert isinstance(e.response, Response)
if e.response.status_code in [403, 401] or "error" in job:
db.update_printer_status(id, "error")
elif e.response.json().get("error") == "Printer is not operational":
db.update_printer_status(id, "disconnected")
else:
raise Exception(
f"Indeterminate state {e.response.status_code}, {e.response.json()!r}"
)
@corn_job(timedelta(seconds=5))
def assign_jobs(db_factory: Callable[[], Store]) -> None:
"""Assign jobs to printers. Uploading files and job state management is handled separately."""
with closing(db_factory()) as db:
for printer_id in db.list_idle_printers():
printer = db.fetch_printer(printer_id)
if printer.status != "idle":
continue
if next_job_id := db.poll_job_queue():
db.assign_job(next_job_id, printer_id)
@corn_job(timedelta(seconds=5))
def push_jobs(db_factory: Callable[[], Store]) -> None:
"""Ensure that job files are uploaded and started to the assigned printer."""
with closing(db_factory()) as db:
for job in db.list_mapped_jobs():
printer = db.fetch_printer(job.printer_id)
file = db.fetch_file(job.file_id)
try:
client = OctoRest(printer.url, printer.api_key)
client.upload(file.filename, select=True, print=True)
db.start_job(job.id)
except Exception:
log.exception("Oop")
@corn_job(timedelta(seconds=5))
def pull_jobs(db_factory: Callable[[], Store]) -> None:
"""Poll the state of mapped printers to control jobs."""
with closing(db_factory()) as db:
for job in db.list_running_jobs():
printer = db.fetch_printer(job.printer_id)
if printer.status != "running":
db.finish_job(job.id, printer.status)
def create_workers(db_factory: Callable[[], Store]) -> Event:
Thread(target=poll_printers, args=[db_factory]).start()
Thread(target=assign_jobs, args=[db_factory]).start()
Thread(target=push_jobs, args=[db_factory]).start()
Thread(target=pull_jobs, args=[db_factory]).start()
return SHUTDOWN