A whoooooole lot of refactoring

This commit is contained in:
Reid 'arrdem' McKenzie 2023-06-03 15:09:50 -06:00
parent 25f3c087c4
commit 1124f9a75b
14 changed files with 215 additions and 167 deletions

View file

@ -62,18 +62,18 @@ load("@arrdem_source_pypi//:requirements.bzl", "install_deps")
# Call it to define repos for your requirements. # Call it to define repos for your requirements.
install_deps() install_deps()
git_repository( # git_repository(
name = "rules_zapp",
remote = "https://git.arrdem.com/arrdem/rules_zapp.git",
commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b",
# tag = "0.1.2",
)
# local_repository(
# name = "rules_zapp", # name = "rules_zapp",
# path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp", # remote = "https://git.arrdem.com/arrdem/rules_zapp.git",
# commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b",
# # tag = "0.1.2",
# ) # )
local_repository(
name = "rules_zapp",
path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp",
)
#################################################################################################### ####################################################################################################
# Docker support # Docker support
#################################################################################################### ####################################################################################################

View file

@ -17,9 +17,10 @@ from tentacles.blueprints import (
printer_ui, printer_ui,
user_ui, user_ui,
) )
from tentacles.db import Db
from tentacles.globals import _ctx, Ctx, ctx from tentacles.globals import _ctx, Ctx, ctx
from tentacles.store import Store from tentacles.workers import *
from tentacles.workers import Worker from tentacles.workers import assign_jobs, Worker
@click.group() @click.group()
@ -28,7 +29,7 @@ def cli():
def db_factory(app): def db_factory(app):
store = Store(app.config.get("db", {}).get("uri")) store = Db(app.config.get("db", {}).get("uri"))
store.connect() store.connect()
return store return store
@ -38,10 +39,11 @@ def custom_ctx(app, wsgi_app):
store = db_factory(app) store = db_factory(app)
token = _ctx.set(Ctx(store)) token = _ctx.set(Ctx(store))
try: try:
with store.savepoint():
return wsgi_app(environ, start_response) return wsgi_app(environ, start_response)
finally: finally:
_ctx.reset(token)
store.close() store.close()
_ctx.reset(token)
return helper return helper
@ -56,21 +58,21 @@ def user_session():
if ( if (
( (
(session_id := request.cookies.get("sid", "")) (session_id := request.cookies.get("sid", ""))
and (uid := ctx.db.try_key(session_id)) and (row := ctx.db.try_key(kid=session_id))
) )
or ( or (
request.authorization request.authorization
and request.authorization.token and request.authorization.token
and (uid := ctx.db.try_key(request.authorization.token)) and (row := ctx.db.try_key(kid=request.authorization.token))
) )
or ( or (
(api_key := request.headers.get("x-api-key")) (api_key := request.headers.get("x-api-key"))
and (uid := ctx.db.try_key(api_key)) and (row := ctx.db.try_key(kid=api_key))
) )
): ):
ctx.sid = session_id ctx.sid = row.id
ctx.uid = uid ctx.uid = row.user_id
user = ctx.db.fetch_user(uid) user = ctx.db.fetch_user(row.user_id)
ctx.gid = user.group_id ctx.gid = user.group_id
ctx.username = user.name ctx.username = user.name
ctx.is_admin = user.group_id == 0 ctx.is_admin = user.group_id == 0
@ -104,11 +106,12 @@ def make_app():
@click.option("--config", type=Path) @click.option("--config", type=Path)
def serve(hostname: str, port: int, config: Path): def serve(hostname: str, port: int, config: Path):
logging.basicConfig( logging.basicConfig(
format="%(asctime)s %(relativeCreated)6d %(threadName)s - %(name)s - %(levelname)s - %(message)s", format="%(asctime)s %(threadName)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO, level=logging.INFO,
) )
logging.getLogger("tentacles").setLevel(logging.DEBUG) logging.getLogger("tentacles").setLevel(logging.DEBUG)
logging.getLogger("tentacles.db").setLevel(logging.DEBUG - 1)
app = make_app() app = make_app()
@ -133,8 +136,14 @@ def serve(hostname: str, port: int, config: Path):
server.shutdown_timeout = 1 server.shutdown_timeout = 1
server.subscribe() server.subscribe()
# Spawn the worker thread # Spawn the worker thread(s)
Worker(cherrypy.engine, app, db_factory, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, poll_printers, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, assign_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, push_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, revoke_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, pull_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, send_emails, frequency=5).start()
# Run the server # Run the server
cherrypy.engine.start() cherrypy.engine.start()

View file

@ -98,10 +98,12 @@ def create_file(location: Optional[str] = None):
return {"error": "file exists already"}, 409 return {"error": "file exists already"}, 409
file.save(sanitized_path) file.save(sanitized_path)
fid = ctx.db.create_file(ctx.uid, file.filename, sanitized_path) fid = ctx.db.create_file(
uid=ctx.uid, filename=file.filename, path=sanitized_path
)
if request.form.get("print", "").lower() == "true": if request.form.get("print", "").lower() == "true":
ctx.db.create_job(ctx.uid, fid) ctx.db.create_job(uid=ctx.uid, fid=fid)
return {"status": "ok"}, 202 return {"status": "ok"}, 202
@ -119,7 +121,7 @@ def get_files():
"owner": ctx.uid, "owner": ctx.uid,
"upload_date": f.upload_date, "upload_date": f.upload_date,
} }
for f in ctx.db.list_files(ctx.uid) for f in ctx.db.list_files(uid=ctx.uid)
] ]
}, 200 }, 200
@ -150,7 +152,7 @@ def get_jobs():
"finished_at": j.finished_at, "finished_at": j.finished_at,
"printer_id": j.printer_id, "printer_id": j.printer_id,
} }
for j in ctx.db.list_jobs() for j in ctx.db.list_jobs(uid=ctx.uid)
] ]
}, 200 }, 200

View file

@ -42,9 +42,10 @@ def manipulate_files():
return render_template("files.html.j2"), code return render_template("files.html.j2"), code
case "delete": case "delete":
file = ctx.db.fetch_file(ctx.uid, int(request.form.get("file_id"))) file = ctx.db.fetch_file(uid=ctx.uid, fid=int(request.form.get("file_id")))
if any( if any(
job.finished_at is None for job in ctx.db.list_jobs_by_file(file.id) job.finished_at is None
for job in ctx.db.list_jobs_by_file(uid=ctx.uid, fid=file.id)
): ):
flash("File is in use", category="error") flash("File is in use", category="error")
return render_template("files.html.j2"), 400 return render_template("files.html.j2"), 400
@ -52,7 +53,7 @@ def manipulate_files():
if os.path.exists(file.path): if os.path.exists(file.path):
os.unlink(file.path) os.unlink(file.path)
ctx.db.delete_file(ctx.uid, file.id) ctx.db.delete_file(uid=ctx.uid, fid=file.id)
flash("File deleted", category="info") flash("File deleted", category="info")
case _: case _:

View file

@ -29,22 +29,24 @@ def list_jobs():
def manipulate_jobs(): def manipulate_jobs():
match request.form.get("action"): match request.form.get("action"):
case "enqueue": case "enqueue":
ctx.db.create_job(ctx.uid, int(request.form.get("file_id"))) ctx.db.create_job(uid=ctx.uid, fid=int(request.form.get("file_id")))
flash("Job created!", category="info") flash("Job created!", category="info")
case "duplicate": case "duplicate":
if job := ctx.db.fetch_job(ctx.uid, int(request.form.get("job_id"))): if job := ctx.db.fetch_job(
ctx.db.create_job(ctx.uid, job.file_id) uid=ctx.uid, jid=int(request.form.get("job_id"))
):
ctx.db.create_job(uid=ctx.uid, fid=job.file_id)
flash("Job created!", category="info") flash("Job created!", category="info")
else: else:
flash("Could not duplicate", category="error") flash("Could not duplicate", category="error")
case "cancel": case "cancel":
ctx.db.cancel_job(ctx.uid, int(request.form.get("job_id"))) ctx.db.cancel_job(uid=ctx.uid, jid=int(request.form.get("job_id")))
flash("Cancellation reqested", category="info") flash("Cancellation reqested", category="info")
case "delete": case "delete":
ctx.db.delete_job(ctx.uid, int(request.form.get("job_id"))) ctx.db.delete_job(uid=ctx.uid, jid=int(request.form.get("job_id")))
flash("Job deleted", category="info") flash("Job deleted", category="info")
case _: case _:

View file

@ -36,9 +36,10 @@ def add_printer():
assert request.form["url"] assert request.form["url"]
assert request.form["api_key"] assert request.form["api_key"]
ctx.db.try_create_printer( ctx.db.try_create_printer(
request.form["name"], name=request.form["name"],
request.form["url"], url=request.form["url"],
request.form["api_key"], api_key=request.form["api_key"],
sid=0, # Disconnected
) )
flash("Printer created") flash("Printer created")
return redirect("/printers") return redirect("/printers")

View file

@ -42,13 +42,13 @@ def get_login():
@BLUEPRINT.route("/user/login", methods=["POST"]) @BLUEPRINT.route("/user/login", methods=["POST"])
def post_login(): def post_login():
if sid := ctx.db.try_login( if row := ctx.db.try_login(
username := request.form["username"], username=(username := request.form["username"]),
salt(request.form["password"]), password=salt(request.form["password"]),
timedelta(days=1), ttl=timedelta(days=1),
): ):
resp = redirect("/") resp = redirect("/")
resp.set_cookie("sid", sid) resp.set_cookie("sid", row.id)
flash(f"Welcome, {username}", category="success") flash(f"Welcome, {username}", category="success")
return resp return resp
@ -72,7 +72,7 @@ def post_register():
username = request.form["username"] username = request.form["username"]
email = request.form["email"] email = request.form["email"]
group_id = 1 # Normal users group_id = 1 # Normal users
status_id = -2 # Unverified status_id = -3 # Unverified
for user_config in current_app.config.get("users", []): for user_config in current_app.config.get("users", []):
if user_config["email"] == email: if user_config["email"] == email:
@ -85,13 +85,17 @@ def post_register():
break break
if user := ctx.db.try_create_user( if user := ctx.db.try_create_user(
username, email, salt(request.form["password"]), group_id, status_id username=username,
email=email,
password=salt(request.form["password"]),
gid=group_id,
sid=status_id,
): ):
if user.status_id == -2: if user.status_id == -3:
ctx.db.create_email( ctx.db.create_email(
user.id, uid=user.id,
"Tentacles email confirmation", subject="Tentacles email confirmation",
render_template( body=render_template(
"verification_email.html.j2", "verification_email.html.j2",
username=user.name, username=user.name,
token_id=user.verification_token, token_id=user.verification_token,
@ -103,6 +107,10 @@ def post_register():
"Please check your email for a verification request", "Please check your email for a verification request",
category="success", category="success",
) )
elif user.status_id == 1:
flash("Welcome, please log in", category="success")
return render_template("register.html.j2") return render_template("register.html.j2")
except Exception as e: except Exception as e:
@ -115,7 +123,7 @@ def post_register():
@BLUEPRINT.route("/user/logout") @BLUEPRINT.route("/user/logout")
def logout(): def logout():
# Invalidate the user's authorization # Invalidate the user's authorization
ctx.db.delete_key(ctx.uid, ctx.sid) ctx.db.delete_key(uid=ctx.uid, kid=ctx.sid)
resp = redirect("/") resp = redirect("/")
resp.set_cookie("sid", "") resp.set_cookie("sid", "")
return resp return resp
@ -132,7 +140,7 @@ def get_settings():
@BLUEPRINT.route("/user", methods=["POST"]) @BLUEPRINT.route("/user", methods=["POST"])
def post_settings(): def post_settings():
if request.form["action"] == "add": if request.form["action"] == "add":
ttl_spec = request.form.get("ttl") ttl_spec = request.form.get("ttl", "")
if ttl_spec == "forever": if ttl_spec == "forever":
ttl = None ttl = None
elif m := re.fullmatch(r"(\d+)d", ttl_spec): elif m := re.fullmatch(r"(\d+)d", ttl_spec):
@ -141,7 +149,9 @@ def post_settings():
flash("Bad request", category="error") flash("Bad request", category="error")
return render_template("user.html.j2"), 400 return render_template("user.html.j2"), 400
ctx.db.create_key(ctx.sid, ttl, request.form.get("name")) ctx.db.create_key(
uid=ctx.uid, ttl=ttl, name=request.form.get("name", "anonymous")
)
flash("Key created", category="success") flash("Key created", category="success")
elif request.form["action"] == "revoke": elif request.form["action"] == "revoke":

View file

@ -8,6 +8,7 @@ from importlib.resources import files
from inspect import signature from inspect import signature
import logging import logging
import sqlite3 import sqlite3
from time import sleep
from types import GeneratorType, new_class from types import GeneratorType, new_class
from typing import Optional from typing import Optional
@ -27,7 +28,7 @@ def qfn(name, f):
# Force lazy values for convenience # Force lazy values for convenience
if isinstance(res, GeneratorType): if isinstance(res, GeneratorType):
res = list(res) res = list(res)
print("%s -> %r" % (name, res)) log.log(logging.DEBUG - 1, "%s (%r) -> %r", name, kwargs, res)
return res return res
_helper.__name__ = f.__name__ _helper.__name__ = f.__name__
@ -64,7 +65,7 @@ class LoginError(StoreError):
pass pass
class Store(Queries): class Db(Queries):
def __init__(self, path): def __init__(self, path):
self._path = path self._path = path
self._conn: sqlite3.Connection = None self._conn: sqlite3.Connection = None
@ -106,9 +107,24 @@ class Store(Queries):
try: try:
self.begin() self.begin()
yield self yield self
exc = None
for attempt in range(5):
try:
self.commit() self.commit()
break
except sqlite3.OperationalError as e:
exc = e
if e.sqlite_errorcode == 6:
sleep(0.1 * attempt)
continue
else:
raise e
else:
raise exc
except sqlite3.Error: except sqlite3.Error:
self.rollback() self.rollback()
log.exception("Forced to roll back!")
return _helper() return _helper()
@ -122,9 +138,11 @@ class Store(Queries):
################################################################################ ################################################################################
# Wrappers for doing Python type mapping # Wrappers for doing Python type mapping
def create_key(self, *, uid: int, name: str, ttl: timedelta): def create_key(self, *, uid: int, name: str, ttl: Optional[timedelta]):
return super().create_key( return super().create_key(
uid=uid, name=name, expiration=(datetime.now() + ttl).isoformat() uid=uid,
name=name,
expiration=((datetime.now() + ttl).isoformat() if ttl else None),
) )
def try_login( def try_login(
@ -171,3 +189,6 @@ class Store(Queries):
""" """
super().refresh_key(kid=kid, expiration=(datetime.now() + ttl).isoformat()) super().refresh_key(kid=kid, expiration=(datetime.now() + ttl).isoformat())
def finish_job(self, *, jid: int, state: str, message: Optional[str] = None):
super().finish_job(jid=jid, state=state, message=message)

View file

@ -3,13 +3,13 @@
from contextvars import ContextVar from contextvars import ContextVar
from attrs import define from attrs import define
from tentacles.store import Store from tentacles.db import Db
from werkzeug.local import LocalProxy from werkzeug.local import LocalProxy
@define @define
class Ctx: class Ctx:
db: Store db: Db
uid: int = None uid: int = None
gid: int = None gid: int = None
sid: str = None sid: str = None

View file

@ -93,7 +93,6 @@ CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT id INTEGER PRIMARY KEY AUTOINCREMENT
, user_id INTEGER NOT NULL , user_id INTEGER NOT NULL
, file_id INTEGER NOT NULL , file_id INTEGER NOT NULL
, priority INTEGER CHECK(priority IS NOT NULL AND 0 <= priority)
, started_at TEXT , started_at TEXT
, cancelled_at TEXT , cancelled_at TEXT
, finished_at TEXT , finished_at TEXT
@ -199,6 +198,7 @@ INSERT INTO user_keys (
VALUES (:uid, :name, :expiration) VALUES (:uid, :name, :expiration)
RETURNING RETURNING
id id
, user_id
; ;
-- name: try-login^ -- name: try-login^
@ -232,7 +232,8 @@ WHERE
-- name: try-key^ -- name: try-key^
SELECT SELECT
user_id id
, user_id
FROM user_keys FROM user_keys
WHERE WHERE
(expiration IS NULL OR unixepoch(expiration) > unixepoch('now')) (expiration IS NULL OR unixepoch(expiration) > unixepoch('now'))
@ -265,7 +266,7 @@ INSERT INTO printers (
, api_key , api_key
, status_id , status_id
) )
VALUES (:name, :url, :api_key, :status_id) VALUES (:name, :url, :api_key, :sid)
RETURNING RETURNING
id id
; ;
@ -310,10 +311,10 @@ WHERE
-- name: update-printer-status! -- name: update-printer-status!
UPDATE printers UPDATE printers
SET SET
status_id = (SELECT id FROM printer_statuses WHERE name = :status) status_id = (SELECT id FROM printer_statuses WHERE name = :status or id = :status)
, last_poll_date = datetime('now') , last_poll_date = datetime('now')
WHERE WHERE
id = :uid id = :pid
; ;
---------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------
@ -364,16 +365,10 @@ WHERE
INSERT INTO jobs ( INSERT INTO jobs (
user_id user_id
, file_id , file_id
, priority
) )
VALUES ( VALUES (
:uid :uid
, :fid, , :fid
, (
SELECT priority + :priority
FROM users
WHERE uid = :uid
)
) )
RETURNING RETURNING
id id
@ -384,7 +379,7 @@ SELECT
FROM jobs FROM jobs
WHERE WHERE
user_id = :uid user_id = :uid
AND id = :fid AND id = :jid
; ;
-- name: list-jobs -- name: list-jobs
@ -401,6 +396,7 @@ SELECT
FROM jobs FROM jobs
WHERE WHERE
file_id = :fid file_id = :fid
, uid = :uid
; ;
-- name: list-job-queue -- name: list-job-queue
@ -410,11 +406,9 @@ FROM jobs
WHERE WHERE
finished_at IS NULL finished_at IS NULL
AND (:uid IS NULL OR user_id = :uid) AND (:uid IS NULL OR user_id = :uid)
ORDER BY
priority DESC
; ;
-- name: poll-job-queue -- name: poll-job-queue^
SELECT SELECT
* *
FROM jobs FROM jobs
@ -422,8 +416,6 @@ WHERE
started_at IS NULL started_at IS NULL
AND finished_at IS NULL AND finished_at IS NULL
AND printer_id IS NULL AND printer_id IS NULL
ORDER BY
priority DESC
LIMIT 1 LIMIT 1
; ;

View file

@ -35,7 +35,7 @@
{% macro job_state(job) %} {% macro job_state(job) %}
{{ 'queued' if (not job.finished_at and not job.printer_id and not job.cancelled_at) else {{ 'queued' if (not job.finished_at and not job.printer_id and not job.cancelled_at) else
'running' if (not job.finished_at and job.printer_id and not job.cancelled_at) else 'running' if (not job.finished_at and job.printer_id and not job.cancelled_at) else
'cancelling' if (not job.finished_at and job.printer_id and job.cancelled_at) else 'cancelling' if (not job.finished_at and job.cancelled_at) else
job.state }} job.state }}
{% endmacro %} {% endmacro %}

View file

@ -8,11 +8,10 @@ Mostly related to monitoring and managing Printer state.
""" """
from contextlib import closing from contextlib import closing
from datetime import datetime, timedelta
import logging import logging
from pathlib import Path from pathlib import Path
from threading import Event from threading import Event
from time import sleep from typing import Callable
from urllib import parse as urlparse from urllib import parse as urlparse
from cherrypy.process.plugins import Monitor from cherrypy.process.plugins import Monitor
@ -25,7 +24,7 @@ from requests.exceptions import (
HTTPError, HTTPError,
Timeout, Timeout,
) )
from tentacles.store import Store from tentacles.db import Db
class OctoRest(_OR): class OctoRest(_OR):
@ -46,36 +45,16 @@ log = logging.getLogger(__name__)
SHUTDOWN = Event() SHUTDOWN = Event()
def corn_job(every: timedelta): def poll_printers(app: App, db: Db) -> None:
def _decorator(f):
def _helper(*args, **kwargs):
last = None
while not SHUTDOWN.is_set():
if not last or (datetime.now() - last) > every:
log.debug(f"Ticking job {f.__name__}")
try:
last = datetime.now()
f(*args, **kwargs)
except Exception:
log.exception(f"Error while procesing task {f.__name__}")
else:
sleep(1)
return _helper
return _decorator
def poll_printers(app: App, store: Store) -> None:
"""Poll printers for their status.""" """Poll printers for their status."""
for printer in store.list_printers(): for printer in db.list_printers():
mapped_job = store.fetch_job_by_printer(printer.id) mapped_job = db.fetch_job_by_printer(pid=printer.id)
def _set_status(status: str): def _set_status(status: str):
if printer.status != status: if printer.status != status:
print(f"Printer {printer.id} {printer.status} -> {status}") log.info(f"Printer {printer.id} {printer.status} -> {status}")
store.update_printer_status(printer.id, status) db.update_printer_status(pid=printer.id, status=status)
try: try:
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
@ -91,7 +70,7 @@ def poll_printers(app: App, store: Store) -> None:
# polling tasks. This violates separation of concerns a bit, # polling tasks. This violates separation of concerns a bit,
# but appears required for correctness. # but appears required for correctness.
if mapped_job: if mapped_job:
store.finish_job(mapped_job.id, "error") db.finish_job(jid=mapped_job.id, state="error")
_set_status("error") _set_status("error")
@ -129,24 +108,24 @@ def poll_printers(app: App, store: Store) -> None:
) )
def assign_jobs(app: App, store: Store) -> None: def assign_jobs(app: App, db: Db) -> None:
"""Assign jobs to printers. Uploading files and job state management is handled separately.""" """Assign jobs to printers. Uploading files and job state management is handled separately."""
for printer_id in store.list_idle_printers(): for printer in db.list_idle_printers():
if job_id := store.poll_job_queue(): if job := db.poll_job_queue():
store.assign_job(job_id, printer_id) db.assign_job(jid=job.id, pid=printer.id)
print(f"Mapped job {job_id} to printer {printer_id}") log.info(f"Mapped job {job.id} to printer {printer.id}")
def push_jobs(app: App, store: Store) -> None: def push_jobs(app: App, db: Db) -> None:
"""Ensure that job files are uploaded and started to the assigned printer.""" """Ensure that job files are uploaded and started to the assigned printer."""
for job in store.list_mapped_jobs(): for job in db.list_mapped_jobs():
printer = store.fetch_printer(job.printer_id) printer = db.fetch_printer(pid=job.printer_id)
file = store.fetch_file(job.user_id, job.file_id) file = db.fetch_file(uid=job.user_id, fid=job.file_id)
if not file: if not file:
log.error(f"Job {job.id} no longer maps to a file") log.error(f"Job {job.id} no longer maps to a file")
store.delete_job(job.user_id, job.id) db.delete_job(job.user_id, job.id)
try: try:
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
@ -157,11 +136,15 @@ def push_jobs(app: App, store: Store) -> None:
printer_state = {"error": printer_job.get("error")} printer_state = {"error": printer_job.get("error")}
if printer_state.get("error"): if printer_state.get("error"):
print(f"Printer {printer.id} is in error, can't push") log.warn(f"Printer {printer.id} is in error, can't push")
continue continue
try: try:
if not client.files_info("local", Path(file.path).name):
client.upload(file.path) client.upload(file.path)
else:
log.info("Don't need to upload the job!")
except HTTPError as e: except HTTPError as e:
if e.response.status_code == 409: if e.response.status_code == 409:
pass pass
@ -170,7 +153,7 @@ def push_jobs(app: App, store: Store) -> None:
client.select(Path(file.path).name) client.select(Path(file.path).name)
client.start() client.start()
store.start_job(job.id) db.start_job(job.id)
except TimeoutError: except TimeoutError:
pass pass
@ -178,19 +161,19 @@ def push_jobs(app: App, store: Store) -> None:
log.exception("Oop") log.exception("Oop")
def revoke_jobs(app: App, store: Store) -> None: def revoke_jobs(app: App, db: Db) -> None:
"""Ensure that job files are uploaded and started to the assigned printer. """Ensure that job files are uploaded and started to the assigned printer.
Note that this will ALSO cancel jobs out of the print queue. Note that this will ALSO cancel jobs out of the print queue.
""" """
for job in store.list_cancelled_jobs(): for job in db.list_canceling_jobs():
if job.printer_id: if job.printer_id:
printer = store.fetch_printer(job.printer_id) printer = db.fetch_printer(pid=job.printer_id)
try: try:
print(f"Cancelling running job {job.id}") log.info(f"Cancelling running job {job.id}")
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
try: try:
client.cancel() client.cancel()
@ -200,8 +183,8 @@ def revoke_jobs(app: App, store: Store) -> None:
else: else:
raise raise
print(f"Job {job.id} -> cancelled") log.info(f"Job {job.id} -> cancelled")
store.finish_job(job.id, "cancelled") db.finish_job(jid=job.id, state="cancelled")
except TimeoutError: except TimeoutError:
pass pass
@ -210,15 +193,15 @@ def revoke_jobs(app: App, store: Store) -> None:
log.exception("Oop") log.exception("Oop")
else: else:
print(f"Unmapped job {job.id} became cancelled") log.info(f"Unmapped job {job.id} became cancelled")
store.finish_job(job.id, "cancelled") db.finish_job(jid=job.id, state="cancelled")
def pull_jobs(app: App, store: Store) -> None: def pull_jobs(app: App, db: Db) -> None:
"""Poll the state of mapped printers to control jobs.""" """Poll the state of mapped printers to control jobs."""
for job in store.list_running_jobs(): for job in db.list_running_jobs():
printer = store.fetch_printer(job.printer_id) printer = db.fetch_printer(pid=job.printer_id)
try: try:
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
job_state = client.job_info() job_state = client.job_info()
@ -231,19 +214,19 @@ def pull_jobs(app: App, store: Store) -> None:
pass pass
elif job_state.get("progress", {}).get("completion", 0.0) == 100.0: elif job_state.get("progress", {}).get("completion", 0.0) == 100.0:
print(f"Job {job.id} has succeeded") log.info(f"Job {job.id} has succeeded")
store.finish_job(job.id, "success") db.finish_job(jid=job.id, state="success")
elif printer_state.get("error"): elif printer_state.get("error"):
print(f"Job {job.id} has failed") log.warn(f"Job {job.id} has failed")
store.finish_job(job.id, "failed") db.finish_job(jid=job.id, state="failed")
elif printer_state.get("cancelling"): elif printer_state.get("cancelling"):
print(f"Job {job.id} has been acknowledged as cancelled") log.info(f"Job {job.id} has been acknowledged as cancelled")
store.finish_job(job.id, "cancelled") db.finish_job(jid=job.id, state="cancelled")
else: else:
print( log.warn(
f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}" f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}"
) )
@ -254,35 +237,60 @@ def pull_jobs(app: App, store: Store) -> None:
log.exception("Oop") log.exception("Oop")
def send_emails(app, store: Store): def send_emails(app, db: Db):
with closing( with closing(
FastMailSMTP( FastMailSMTP(
app.config.get("fastmail", {}).get("username"), app.config.get("fastmail", {}).get("username"),
app.config.get("fastmail", {}).get("key"), app.config.get("fastmail", {}).get("key"),
) )
) as fm: ) as fm:
for message in store.poll_spool(): for message in db.poll_email_queue():
fm.send_message( fm.send_message(
from_addr="root@tirefireind.us", from_addr="root@tirefireind.us",
to_addrs=[message.to], to_addrs=[message.to],
subject=message.subject, subject=message.subject,
msg=message.body, msg=message.body,
) )
store.send_email(message.id) db.send_email(message.id)
def once(f):
val = uninitialized = object()
def _helper(*args, **kwargs):
nonlocal val
if val is uninitialized:
val = f(*args, **kwargs)
return val
return _helper
def toil(*fs):
def _helper(*args, **kwargs):
for f in fs:
f(*args, **kwargs)
_helper.__name__ = "toil"
return _helper
class Worker(Monitor): class Worker(Monitor):
def __init__(self, bus, app, db_factory, **kwargs): def __init__(
self,
bus,
app: App,
db_factory: Callable[[App], Db],
callback: Callable[[App, Db], None],
**kwargs,
):
self._app = app self._app = app
self._db_factory = db_factory self._db_factory = db_factory
super().__init__(bus, self.callback, **kwargs) self._callback = callback
super().__init__(
bus, self.callback, **kwargs, name=f"Async {callback.__name__}"
)
def callback(self): def callback(self):
log.debug("Tick") with closing(self._db_factory(self._app)) as db:
with self._app.app_context(), closing(self._db_factory(self._app)) as store: self._callback(self._app, db)
poll_printers(self._app, store)
assign_jobs(self._app, store)
push_jobs(self._app, store)
revoke_jobs(self._app, store)
pull_jobs(self._app, store)
send_emails(self._app, store)

View file

@ -3,12 +3,12 @@
from datetime import timedelta from datetime import timedelta
import pytest import pytest
import tentacles.store as s from tentacles.db import Db
@pytest.yield_fixture @pytest.yield_fixture
def store(): def db():
conn = s.Store(":memory:") conn = Db(":memory:")
conn.connect() conn.connect()
yield conn yield conn
conn.close() conn.close()
@ -25,9 +25,9 @@ def password_testy():
@pytest.fixture @pytest.fixture
def uid_testy(store: s.Store, username_testy, password_testy): def uid_testy(db: Db, username_testy, password_testy):
with store.savepoint(): with db.savepoint():
return store.try_create_user( return db.try_create_user(
username=username_testy, username=username_testy,
email=username_testy, email=username_testy,
password=password_testy, password=password_testy,
@ -41,8 +41,10 @@ def login_ttl():
@pytest.fixture @pytest.fixture
def sid_testy(store, uid_testy, username_testy, password_testy, login_ttl): def sid_testy(db: Db, uid_testy, username_testy, password_testy, login_ttl):
with store.savepoint(): with db.savepoint():
return store.try_login( res = db.try_login(
username=username_testy, password=password_testy, ttl=login_ttl username=username_testy, password=password_testy, ttl=login_ttl
).id )
assert res.user_id == uid_testy
return res.id

View file

@ -1,13 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from tentacles.store import Store from tentacles.db import Db
def test_store_initializes(store: Store): def test_db_initializes(store: Db):
assert isinstance(store, Store) assert isinstance(store, Db)
def test_store_savepoint(store: Store): def test_db_savepoint(store: Db):
obj = store.savepoint() obj = store.savepoint()
assert hasattr(obj, "__enter__") assert hasattr(obj, "__enter__")