diff --git a/WORKSPACE b/WORKSPACE index 94470f9..6731735 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -62,18 +62,18 @@ load("@arrdem_source_pypi//:requirements.bzl", "install_deps") # Call it to define repos for your requirements. install_deps() -git_repository( - name = "rules_zapp", - remote = "https://git.arrdem.com/arrdem/rules_zapp.git", - commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b", - # tag = "0.1.2", -) - -# local_repository( -# name = "rules_zapp", -# path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp", +# git_repository( +# name = "rules_zapp", +# remote = "https://git.arrdem.com/arrdem/rules_zapp.git", +# commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b", +# # tag = "0.1.2", # ) +local_repository( + name = "rules_zapp", + path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp", +) + #################################################################################################### # Docker support #################################################################################################### diff --git a/projects/tentacles/src/python/tentacles/__main__.py b/projects/tentacles/src/python/tentacles/__main__.py index 370915a..0e95cea 100644 --- a/projects/tentacles/src/python/tentacles/__main__.py +++ b/projects/tentacles/src/python/tentacles/__main__.py @@ -17,9 +17,10 @@ from tentacles.blueprints import ( printer_ui, user_ui, ) +from tentacles.db import Db from tentacles.globals import _ctx, Ctx, ctx -from tentacles.store import Store -from tentacles.workers import Worker +from tentacles.workers import * +from tentacles.workers import assign_jobs, Worker @click.group() @@ -28,7 +29,7 @@ def cli(): def db_factory(app): - store = Store(app.config.get("db", {}).get("uri")) + store = Db(app.config.get("db", {}).get("uri")) store.connect() return store @@ -38,10 +39,11 @@ def custom_ctx(app, wsgi_app): store = db_factory(app) token = _ctx.set(Ctx(store)) try: - return wsgi_app(environ, start_response) + with store.savepoint(): + return wsgi_app(environ, start_response) finally: - _ctx.reset(token) store.close() + _ctx.reset(token) return helper @@ -56,21 +58,21 @@ def user_session(): if ( ( (session_id := request.cookies.get("sid", "")) - and (uid := ctx.db.try_key(session_id)) + and (row := ctx.db.try_key(kid=session_id)) ) or ( request.authorization and request.authorization.token - and (uid := ctx.db.try_key(request.authorization.token)) + and (row := ctx.db.try_key(kid=request.authorization.token)) ) or ( (api_key := request.headers.get("x-api-key")) - and (uid := ctx.db.try_key(api_key)) + and (row := ctx.db.try_key(kid=api_key)) ) ): - ctx.sid = session_id - ctx.uid = uid - user = ctx.db.fetch_user(uid) + ctx.sid = row.id + ctx.uid = row.user_id + user = ctx.db.fetch_user(row.user_id) ctx.gid = user.group_id ctx.username = user.name ctx.is_admin = user.group_id == 0 @@ -104,11 +106,12 @@ def make_app(): @click.option("--config", type=Path) def serve(hostname: str, port: int, config: Path): logging.basicConfig( - format="%(asctime)s %(relativeCreated)6d %(threadName)s - %(name)s - %(levelname)s - %(message)s", + format="%(asctime)s %(threadName)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logging.getLogger("tentacles").setLevel(logging.DEBUG) + logging.getLogger("tentacles.db").setLevel(logging.DEBUG - 1) app = make_app() @@ -133,8 +136,14 @@ def serve(hostname: str, port: int, config: Path): server.shutdown_timeout = 1 server.subscribe() - # Spawn the worker thread - Worker(cherrypy.engine, app, db_factory, frequency=5).start() + # Spawn the worker thread(s) + + Worker(cherrypy.engine, app, db_factory, poll_printers, frequency=5).start() + Worker(cherrypy.engine, app, db_factory, assign_jobs, frequency=5).start() + Worker(cherrypy.engine, app, db_factory, push_jobs, frequency=5).start() + Worker(cherrypy.engine, app, db_factory, revoke_jobs, frequency=5).start() + Worker(cherrypy.engine, app, db_factory, pull_jobs, frequency=5).start() + Worker(cherrypy.engine, app, db_factory, send_emails, frequency=5).start() # Run the server cherrypy.engine.start() diff --git a/projects/tentacles/src/python/tentacles/blueprints/api.py b/projects/tentacles/src/python/tentacles/blueprints/api.py index 5d312db..8a89935 100644 --- a/projects/tentacles/src/python/tentacles/blueprints/api.py +++ b/projects/tentacles/src/python/tentacles/blueprints/api.py @@ -98,10 +98,12 @@ def create_file(location: Optional[str] = None): return {"error": "file exists already"}, 409 file.save(sanitized_path) - fid = ctx.db.create_file(ctx.uid, file.filename, sanitized_path) + fid = ctx.db.create_file( + uid=ctx.uid, filename=file.filename, path=sanitized_path + ) if request.form.get("print", "").lower() == "true": - ctx.db.create_job(ctx.uid, fid) + ctx.db.create_job(uid=ctx.uid, fid=fid) return {"status": "ok"}, 202 @@ -119,7 +121,7 @@ def get_files(): "owner": ctx.uid, "upload_date": f.upload_date, } - for f in ctx.db.list_files(ctx.uid) + for f in ctx.db.list_files(uid=ctx.uid) ] }, 200 @@ -150,7 +152,7 @@ def get_jobs(): "finished_at": j.finished_at, "printer_id": j.printer_id, } - for j in ctx.db.list_jobs() + for j in ctx.db.list_jobs(uid=ctx.uid) ] }, 200 diff --git a/projects/tentacles/src/python/tentacles/blueprints/file_ui.py b/projects/tentacles/src/python/tentacles/blueprints/file_ui.py index 4555b09..3fa087a 100644 --- a/projects/tentacles/src/python/tentacles/blueprints/file_ui.py +++ b/projects/tentacles/src/python/tentacles/blueprints/file_ui.py @@ -42,9 +42,10 @@ def manipulate_files(): return render_template("files.html.j2"), code case "delete": - file = ctx.db.fetch_file(ctx.uid, int(request.form.get("file_id"))) + file = ctx.db.fetch_file(uid=ctx.uid, fid=int(request.form.get("file_id"))) if any( - job.finished_at is None for job in ctx.db.list_jobs_by_file(file.id) + job.finished_at is None + for job in ctx.db.list_jobs_by_file(uid=ctx.uid, fid=file.id) ): flash("File is in use", category="error") return render_template("files.html.j2"), 400 @@ -52,7 +53,7 @@ def manipulate_files(): if os.path.exists(file.path): os.unlink(file.path) - ctx.db.delete_file(ctx.uid, file.id) + ctx.db.delete_file(uid=ctx.uid, fid=file.id) flash("File deleted", category="info") case _: diff --git a/projects/tentacles/src/python/tentacles/blueprints/job_ui.py b/projects/tentacles/src/python/tentacles/blueprints/job_ui.py index ead52da..7e9bc4a 100644 --- a/projects/tentacles/src/python/tentacles/blueprints/job_ui.py +++ b/projects/tentacles/src/python/tentacles/blueprints/job_ui.py @@ -29,22 +29,24 @@ def list_jobs(): def manipulate_jobs(): match request.form.get("action"): case "enqueue": - ctx.db.create_job(ctx.uid, int(request.form.get("file_id"))) + ctx.db.create_job(uid=ctx.uid, fid=int(request.form.get("file_id"))) flash("Job created!", category="info") case "duplicate": - if job := ctx.db.fetch_job(ctx.uid, int(request.form.get("job_id"))): - ctx.db.create_job(ctx.uid, job.file_id) + if job := ctx.db.fetch_job( + uid=ctx.uid, jid=int(request.form.get("job_id")) + ): + ctx.db.create_job(uid=ctx.uid, fid=job.file_id) flash("Job created!", category="info") else: flash("Could not duplicate", category="error") case "cancel": - ctx.db.cancel_job(ctx.uid, int(request.form.get("job_id"))) + ctx.db.cancel_job(uid=ctx.uid, jid=int(request.form.get("job_id"))) flash("Cancellation reqested", category="info") case "delete": - ctx.db.delete_job(ctx.uid, int(request.form.get("job_id"))) + ctx.db.delete_job(uid=ctx.uid, jid=int(request.form.get("job_id"))) flash("Job deleted", category="info") case _: diff --git a/projects/tentacles/src/python/tentacles/blueprints/printer_ui.py b/projects/tentacles/src/python/tentacles/blueprints/printer_ui.py index 3c71ebc..f005f67 100644 --- a/projects/tentacles/src/python/tentacles/blueprints/printer_ui.py +++ b/projects/tentacles/src/python/tentacles/blueprints/printer_ui.py @@ -36,9 +36,10 @@ def add_printer(): assert request.form["url"] assert request.form["api_key"] ctx.db.try_create_printer( - request.form["name"], - request.form["url"], - request.form["api_key"], + name=request.form["name"], + url=request.form["url"], + api_key=request.form["api_key"], + sid=0, # Disconnected ) flash("Printer created") return redirect("/printers") diff --git a/projects/tentacles/src/python/tentacles/blueprints/user_ui.py b/projects/tentacles/src/python/tentacles/blueprints/user_ui.py index c9bfeae..2a7ffb3 100644 --- a/projects/tentacles/src/python/tentacles/blueprints/user_ui.py +++ b/projects/tentacles/src/python/tentacles/blueprints/user_ui.py @@ -42,13 +42,13 @@ def get_login(): @BLUEPRINT.route("/user/login", methods=["POST"]) def post_login(): - if sid := ctx.db.try_login( - username := request.form["username"], - salt(request.form["password"]), - timedelta(days=1), + if row := ctx.db.try_login( + username=(username := request.form["username"]), + password=salt(request.form["password"]), + ttl=timedelta(days=1), ): resp = redirect("/") - resp.set_cookie("sid", sid) + resp.set_cookie("sid", row.id) flash(f"Welcome, {username}", category="success") return resp @@ -72,7 +72,7 @@ def post_register(): username = request.form["username"] email = request.form["email"] group_id = 1 # Normal users - status_id = -2 # Unverified + status_id = -3 # Unverified for user_config in current_app.config.get("users", []): if user_config["email"] == email: @@ -85,13 +85,17 @@ def post_register(): break if user := ctx.db.try_create_user( - username, email, salt(request.form["password"]), group_id, status_id + username=username, + email=email, + password=salt(request.form["password"]), + gid=group_id, + sid=status_id, ): - if user.status_id == -2: + if user.status_id == -3: ctx.db.create_email( - user.id, - "Tentacles email confirmation", - render_template( + uid=user.id, + subject="Tentacles email confirmation", + body=render_template( "verification_email.html.j2", username=user.name, token_id=user.verification_token, @@ -103,6 +107,10 @@ def post_register(): "Please check your email for a verification request", category="success", ) + + elif user.status_id == 1: + flash("Welcome, please log in", category="success") + return render_template("register.html.j2") except Exception as e: @@ -115,7 +123,7 @@ def post_register(): @BLUEPRINT.route("/user/logout") def logout(): # Invalidate the user's authorization - ctx.db.delete_key(ctx.uid, ctx.sid) + ctx.db.delete_key(uid=ctx.uid, kid=ctx.sid) resp = redirect("/") resp.set_cookie("sid", "") return resp @@ -132,7 +140,7 @@ def get_settings(): @BLUEPRINT.route("/user", methods=["POST"]) def post_settings(): if request.form["action"] == "add": - ttl_spec = request.form.get("ttl") + ttl_spec = request.form.get("ttl", "") if ttl_spec == "forever": ttl = None elif m := re.fullmatch(r"(\d+)d", ttl_spec): @@ -141,7 +149,9 @@ def post_settings(): flash("Bad request", category="error") return render_template("user.html.j2"), 400 - ctx.db.create_key(ctx.sid, ttl, request.form.get("name")) + ctx.db.create_key( + uid=ctx.uid, ttl=ttl, name=request.form.get("name", "anonymous") + ) flash("Key created", category="success") elif request.form["action"] == "revoke": diff --git a/projects/tentacles/src/python/tentacles/store.py b/projects/tentacles/src/python/tentacles/db.py similarity index 81% rename from projects/tentacles/src/python/tentacles/store.py rename to projects/tentacles/src/python/tentacles/db.py index 4b92b11..dbbdbc7 100644 --- a/projects/tentacles/src/python/tentacles/store.py +++ b/projects/tentacles/src/python/tentacles/db.py @@ -8,6 +8,7 @@ from importlib.resources import files from inspect import signature import logging import sqlite3 +from time import sleep from types import GeneratorType, new_class from typing import Optional @@ -27,7 +28,7 @@ def qfn(name, f): # Force lazy values for convenience if isinstance(res, GeneratorType): res = list(res) - print("%s -> %r" % (name, res)) + log.log(logging.DEBUG - 1, "%s (%r) -> %r", name, kwargs, res) return res _helper.__name__ = f.__name__ @@ -64,7 +65,7 @@ class LoginError(StoreError): pass -class Store(Queries): +class Db(Queries): def __init__(self, path): self._path = path self._conn: sqlite3.Connection = None @@ -106,9 +107,24 @@ class Store(Queries): try: self.begin() yield self - self.commit() + exc = None + for attempt in range(5): + try: + self.commit() + break + except sqlite3.OperationalError as e: + exc = e + if e.sqlite_errorcode == 6: + sleep(0.1 * attempt) + continue + else: + raise e + else: + raise exc + except sqlite3.Error: self.rollback() + log.exception("Forced to roll back!") return _helper() @@ -122,9 +138,11 @@ class Store(Queries): ################################################################################ # Wrappers for doing Python type mapping - def create_key(self, *, uid: int, name: str, ttl: timedelta): + def create_key(self, *, uid: int, name: str, ttl: Optional[timedelta]): return super().create_key( - uid=uid, name=name, expiration=(datetime.now() + ttl).isoformat() + uid=uid, + name=name, + expiration=((datetime.now() + ttl).isoformat() if ttl else None), ) def try_login( @@ -171,3 +189,6 @@ class Store(Queries): """ super().refresh_key(kid=kid, expiration=(datetime.now() + ttl).isoformat()) + + def finish_job(self, *, jid: int, state: str, message: Optional[str] = None): + super().finish_job(jid=jid, state=state, message=message) diff --git a/projects/tentacles/src/python/tentacles/globals.py b/projects/tentacles/src/python/tentacles/globals.py index 5470d7c..80c2c3c 100644 --- a/projects/tentacles/src/python/tentacles/globals.py +++ b/projects/tentacles/src/python/tentacles/globals.py @@ -3,13 +3,13 @@ from contextvars import ContextVar from attrs import define -from tentacles.store import Store +from tentacles.db import Db from werkzeug.local import LocalProxy @define class Ctx: - db: Store + db: Db uid: int = None gid: int = None sid: str = None diff --git a/projects/tentacles/src/python/tentacles/schema.sql b/projects/tentacles/src/python/tentacles/schema.sql index 84cffef..373aaa3 100644 --- a/projects/tentacles/src/python/tentacles/schema.sql +++ b/projects/tentacles/src/python/tentacles/schema.sql @@ -93,7 +93,6 @@ CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT , user_id INTEGER NOT NULL , file_id INTEGER NOT NULL - , priority INTEGER CHECK(priority IS NOT NULL AND 0 <= priority) , started_at TEXT , cancelled_at TEXT , finished_at TEXT @@ -198,7 +197,8 @@ INSERT INTO user_keys ( ) VALUES (:uid, :name, :expiration) RETURNING - id + id + , user_id ; -- name: try-login^ @@ -232,7 +232,8 @@ WHERE -- name: try-key^ SELECT - user_id + id + , user_id FROM user_keys WHERE (expiration IS NULL OR unixepoch(expiration) > unixepoch('now')) @@ -265,7 +266,7 @@ INSERT INTO printers ( , api_key , status_id ) -VALUES (:name, :url, :api_key, :status_id) +VALUES (:name, :url, :api_key, :sid) RETURNING id ; @@ -310,10 +311,10 @@ WHERE -- name: update-printer-status! UPDATE printers SET - status_id = (SELECT id FROM printer_statuses WHERE name = :status) + status_id = (SELECT id FROM printer_statuses WHERE name = :status or id = :status) , last_poll_date = datetime('now') WHERE - id = :uid + id = :pid ; ---------------------------------------------------------------------------------------------------- @@ -364,16 +365,10 @@ WHERE INSERT INTO jobs ( user_id , file_id - , priority ) VALUES ( :uid - , :fid, - , ( - SELECT priority + :priority - FROM users - WHERE uid = :uid - ) + , :fid ) RETURNING id @@ -384,7 +379,7 @@ SELECT FROM jobs WHERE user_id = :uid - AND id = :fid + AND id = :jid ; -- name: list-jobs @@ -401,6 +396,7 @@ SELECT FROM jobs WHERE file_id = :fid + , uid = :uid ; -- name: list-job-queue @@ -410,11 +406,9 @@ FROM jobs WHERE finished_at IS NULL AND (:uid IS NULL OR user_id = :uid) -ORDER BY - priority DESC ; --- name: poll-job-queue +-- name: poll-job-queue^ SELECT * FROM jobs @@ -422,8 +416,6 @@ WHERE started_at IS NULL AND finished_at IS NULL AND printer_id IS NULL -ORDER BY - priority DESC LIMIT 1 ; diff --git a/projects/tentacles/src/python/tentacles/templates/macros.html.j2 b/projects/tentacles/src/python/tentacles/templates/macros.html.j2 index 7e76d8f..0b6794a 100644 --- a/projects/tentacles/src/python/tentacles/templates/macros.html.j2 +++ b/projects/tentacles/src/python/tentacles/templates/macros.html.j2 @@ -35,7 +35,7 @@ {% macro job_state(job) %} {{ 'queued' if (not job.finished_at and not job.printer_id and not job.cancelled_at) else 'running' if (not job.finished_at and job.printer_id and not job.cancelled_at) else - 'cancelling' if (not job.finished_at and job.printer_id and job.cancelled_at) else + 'cancelling' if (not job.finished_at and job.cancelled_at) else job.state }} {% endmacro %} diff --git a/projects/tentacles/src/python/tentacles/workers.py b/projects/tentacles/src/python/tentacles/workers.py index 6dafcdd..cdf0da4 100644 --- a/projects/tentacles/src/python/tentacles/workers.py +++ b/projects/tentacles/src/python/tentacles/workers.py @@ -8,11 +8,10 @@ Mostly related to monitoring and managing Printer state. """ from contextlib import closing -from datetime import datetime, timedelta import logging from pathlib import Path from threading import Event -from time import sleep +from typing import Callable from urllib import parse as urlparse from cherrypy.process.plugins import Monitor @@ -25,7 +24,7 @@ from requests.exceptions import ( HTTPError, Timeout, ) -from tentacles.store import Store +from tentacles.db import Db class OctoRest(_OR): @@ -46,36 +45,16 @@ log = logging.getLogger(__name__) SHUTDOWN = Event() -def corn_job(every: timedelta): - def _decorator(f): - def _helper(*args, **kwargs): - last = None - while not SHUTDOWN.is_set(): - if not last or (datetime.now() - last) > every: - log.debug(f"Ticking job {f.__name__}") - try: - last = datetime.now() - f(*args, **kwargs) - except Exception: - log.exception(f"Error while procesing task {f.__name__}") - else: - sleep(1) - - return _helper - - return _decorator - - -def poll_printers(app: App, store: Store) -> None: +def poll_printers(app: App, db: Db) -> None: """Poll printers for their status.""" - for printer in store.list_printers(): - mapped_job = store.fetch_job_by_printer(printer.id) + for printer in db.list_printers(): + mapped_job = db.fetch_job_by_printer(pid=printer.id) def _set_status(status: str): if printer.status != status: - print(f"Printer {printer.id} {printer.status} -> {status}") - store.update_printer_status(printer.id, status) + log.info(f"Printer {printer.id} {printer.status} -> {status}") + db.update_printer_status(pid=printer.id, status=status) try: client = OctoRest(url=printer.url, apikey=printer.api_key) @@ -91,7 +70,7 @@ def poll_printers(app: App, store: Store) -> None: # polling tasks. This violates separation of concerns a bit, # but appears required for correctness. if mapped_job: - store.finish_job(mapped_job.id, "error") + db.finish_job(jid=mapped_job.id, state="error") _set_status("error") @@ -129,24 +108,24 @@ def poll_printers(app: App, store: Store) -> None: ) -def assign_jobs(app: App, store: Store) -> None: +def assign_jobs(app: App, db: Db) -> None: """Assign jobs to printers. Uploading files and job state management is handled separately.""" - for printer_id in store.list_idle_printers(): - if job_id := store.poll_job_queue(): - store.assign_job(job_id, printer_id) - print(f"Mapped job {job_id} to printer {printer_id}") + for printer in db.list_idle_printers(): + if job := db.poll_job_queue(): + db.assign_job(jid=job.id, pid=printer.id) + log.info(f"Mapped job {job.id} to printer {printer.id}") -def push_jobs(app: App, store: Store) -> None: +def push_jobs(app: App, db: Db) -> None: """Ensure that job files are uploaded and started to the assigned printer.""" - for job in store.list_mapped_jobs(): - printer = store.fetch_printer(job.printer_id) - file = store.fetch_file(job.user_id, job.file_id) + for job in db.list_mapped_jobs(): + printer = db.fetch_printer(pid=job.printer_id) + file = db.fetch_file(uid=job.user_id, fid=job.file_id) if not file: log.error(f"Job {job.id} no longer maps to a file") - store.delete_job(job.user_id, job.id) + db.delete_job(job.user_id, job.id) try: client = OctoRest(url=printer.url, apikey=printer.api_key) @@ -157,11 +136,15 @@ def push_jobs(app: App, store: Store) -> None: printer_state = {"error": printer_job.get("error")} if printer_state.get("error"): - print(f"Printer {printer.id} is in error, can't push") + log.warn(f"Printer {printer.id} is in error, can't push") continue try: - client.upload(file.path) + if not client.files_info("local", Path(file.path).name): + client.upload(file.path) + else: + log.info("Don't need to upload the job!") + except HTTPError as e: if e.response.status_code == 409: pass @@ -170,7 +153,7 @@ def push_jobs(app: App, store: Store) -> None: client.select(Path(file.path).name) client.start() - store.start_job(job.id) + db.start_job(job.id) except TimeoutError: pass @@ -178,19 +161,19 @@ def push_jobs(app: App, store: Store) -> None: log.exception("Oop") -def revoke_jobs(app: App, store: Store) -> None: +def revoke_jobs(app: App, db: Db) -> None: """Ensure that job files are uploaded and started to the assigned printer. Note that this will ALSO cancel jobs out of the print queue. """ - for job in store.list_cancelled_jobs(): + for job in db.list_canceling_jobs(): if job.printer_id: - printer = store.fetch_printer(job.printer_id) + printer = db.fetch_printer(pid=job.printer_id) try: - print(f"Cancelling running job {job.id}") + log.info(f"Cancelling running job {job.id}") client = OctoRest(url=printer.url, apikey=printer.api_key) try: client.cancel() @@ -200,8 +183,8 @@ def revoke_jobs(app: App, store: Store) -> None: else: raise - print(f"Job {job.id} -> cancelled") - store.finish_job(job.id, "cancelled") + log.info(f"Job {job.id} -> cancelled") + db.finish_job(jid=job.id, state="cancelled") except TimeoutError: pass @@ -210,15 +193,15 @@ def revoke_jobs(app: App, store: Store) -> None: log.exception("Oop") else: - print(f"Unmapped job {job.id} became cancelled") - store.finish_job(job.id, "cancelled") + log.info(f"Unmapped job {job.id} became cancelled") + db.finish_job(jid=job.id, state="cancelled") -def pull_jobs(app: App, store: Store) -> None: +def pull_jobs(app: App, db: Db) -> None: """Poll the state of mapped printers to control jobs.""" - for job in store.list_running_jobs(): - printer = store.fetch_printer(job.printer_id) + for job in db.list_running_jobs(): + printer = db.fetch_printer(pid=job.printer_id) try: client = OctoRest(url=printer.url, apikey=printer.api_key) job_state = client.job_info() @@ -231,19 +214,19 @@ def pull_jobs(app: App, store: Store) -> None: pass elif job_state.get("progress", {}).get("completion", 0.0) == 100.0: - print(f"Job {job.id} has succeeded") - store.finish_job(job.id, "success") + log.info(f"Job {job.id} has succeeded") + db.finish_job(jid=job.id, state="success") elif printer_state.get("error"): - print(f"Job {job.id} has failed") - store.finish_job(job.id, "failed") + log.warn(f"Job {job.id} has failed") + db.finish_job(jid=job.id, state="failed") elif printer_state.get("cancelling"): - print(f"Job {job.id} has been acknowledged as cancelled") - store.finish_job(job.id, "cancelled") + log.info(f"Job {job.id} has been acknowledged as cancelled") + db.finish_job(jid=job.id, state="cancelled") else: - print( + log.warn( f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}" ) @@ -254,35 +237,60 @@ def pull_jobs(app: App, store: Store) -> None: log.exception("Oop") -def send_emails(app, store: Store): +def send_emails(app, db: Db): with closing( FastMailSMTP( app.config.get("fastmail", {}).get("username"), app.config.get("fastmail", {}).get("key"), ) ) as fm: - for message in store.poll_spool(): + for message in db.poll_email_queue(): fm.send_message( from_addr="root@tirefireind.us", to_addrs=[message.to], subject=message.subject, msg=message.body, ) - store.send_email(message.id) + db.send_email(message.id) + + +def once(f): + val = uninitialized = object() + + def _helper(*args, **kwargs): + nonlocal val + if val is uninitialized: + val = f(*args, **kwargs) + return val + + return _helper + + +def toil(*fs): + def _helper(*args, **kwargs): + for f in fs: + f(*args, **kwargs) + + _helper.__name__ = "toil" + return _helper class Worker(Monitor): - def __init__(self, bus, app, db_factory, **kwargs): + def __init__( + self, + bus, + app: App, + db_factory: Callable[[App], Db], + callback: Callable[[App, Db], None], + **kwargs, + ): self._app = app self._db_factory = db_factory - super().__init__(bus, self.callback, **kwargs) + self._callback = callback + super().__init__( + bus, self.callback, **kwargs, name=f"Async {callback.__name__}" + ) def callback(self): - log.debug("Tick") - with self._app.app_context(), closing(self._db_factory(self._app)) as store: - poll_printers(self._app, store) - assign_jobs(self._app, store) - push_jobs(self._app, store) - revoke_jobs(self._app, store) - pull_jobs(self._app, store) - send_emails(self._app, store) + with closing(self._db_factory(self._app)) as db: + self._callback(self._app, db) diff --git a/projects/tentacles/test/python/conftest.py b/projects/tentacles/test/python/conftest.py index 551f1c3..a44ce2a 100644 --- a/projects/tentacles/test/python/conftest.py +++ b/projects/tentacles/test/python/conftest.py @@ -3,12 +3,12 @@ from datetime import timedelta import pytest -import tentacles.store as s +from tentacles.db import Db @pytest.yield_fixture -def store(): - conn = s.Store(":memory:") +def db(): + conn = Db(":memory:") conn.connect() yield conn conn.close() @@ -25,9 +25,9 @@ def password_testy(): @pytest.fixture -def uid_testy(store: s.Store, username_testy, password_testy): - with store.savepoint(): - return store.try_create_user( +def uid_testy(db: Db, username_testy, password_testy): + with db.savepoint(): + return db.try_create_user( username=username_testy, email=username_testy, password=password_testy, @@ -41,8 +41,10 @@ def login_ttl(): @pytest.fixture -def sid_testy(store, uid_testy, username_testy, password_testy, login_ttl): - with store.savepoint(): - return store.try_login( +def sid_testy(db: Db, uid_testy, username_testy, password_testy, login_ttl): + with db.savepoint(): + res = db.try_login( username=username_testy, password=password_testy, ttl=login_ttl - ).id + ) + assert res.user_id == uid_testy + return res.id diff --git a/projects/tentacles/test/python/test_store.py b/projects/tentacles/test/python/test_store.py index 8702d81..3493188 100644 --- a/projects/tentacles/test/python/test_store.py +++ b/projects/tentacles/test/python/test_store.py @@ -1,13 +1,13 @@ #!/usr/bin/env python3 -from tentacles.store import Store +from tentacles.db import Db -def test_store_initializes(store: Store): - assert isinstance(store, Store) +def test_db_initializes(store: Db): + assert isinstance(store, Db) -def test_store_savepoint(store: Store): +def test_db_savepoint(store: Db): obj = store.savepoint() assert hasattr(obj, "__enter__")