From cb67bdb0515c54552a3c0b5776cceeb9f19fd800 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Tue, 30 May 2023 22:21:51 -0600 Subject: [PATCH] Tapping towards a real user flow --- projects/tentacles/src/python/fastmail.py | 47 +++ .../src/python/tentacles/__main__.py | 10 +- .../python/tentacles/blueprints/user_ui.py | 150 ++++---- .../tentacles/src/python/tentacles/schema.sql | 5 +- .../tentacles/src/python/tentacles/store.py | 79 ++-- .../tentacles/src/python/tentacles/workers.py | 337 ++++++++++-------- 6 files changed, 373 insertions(+), 255 deletions(-) create mode 100644 projects/tentacles/src/python/fastmail.py diff --git a/projects/tentacles/src/python/fastmail.py b/projects/tentacles/src/python/fastmail.py new file mode 100644 index 0000000..3a3a329 --- /dev/null +++ b/projects/tentacles/src/python/fastmail.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +from email import encoders +from email.mime.base import MIMEBase +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +import smtplib + + +class FastMailSMTP(smtplib.SMTP_SSL): + """A wrapper for handling SMTP connections to FastMail.""" + + def __init__(self, username, password): + super().__init__("mail.messagingengine.com", port=465) + self._username = username + self._password = password + self._has_logged_in = False + + def login(self): + if not self._has_logged_in: + super().login(self._username, self._password) + self._has_logged_in = True + + def send_message(self, *, from_addr, to_addrs, msg, subject, attachments=None): + self.login() + + msg_root = MIMEMultipart() + msg_root["Subject"] = subject + msg_root["From"] = from_addr + msg_root["To"] = ", ".join(to_addrs) + + msg_alternative = MIMEMultipart("alternative") + msg_root.attach(msg_alternative) + msg_alternative.attach(MIMEText(msg, "html")) + + if attachments: + for attachment in attachments: + prt = MIMEBase("application", "octet-stream") + prt.set_payload(open(attachment, "rb").read()) + encoders.encode_base64(prt) + prt.add_header( + "Content-Disposition", + 'attachment; filename="%s"' % attachment.replace('"', ""), + ) + msg_root.attach(prt) + + self.sendmail(from_addr, to_addrs, msg_root.as_string()) diff --git a/projects/tentacles/src/python/tentacles/__main__.py b/projects/tentacles/src/python/tentacles/__main__.py index e61ad32..30c6bf3 100644 --- a/projects/tentacles/src/python/tentacles/__main__.py +++ b/projects/tentacles/src/python/tentacles/__main__.py @@ -68,10 +68,10 @@ def user_session(): ): ctx.sid = session_id ctx.uid = uid - _id, gid, name, _email, _hash, _status, _verification = ctx.db.fetch_user(uid) - ctx.gid = gid - ctx.username = name - ctx.is_admin = gid == 0 + user = ctx.db.fetch_user(uid) + ctx.gid = user.group_id + ctx.username = user.name + ctx.is_admin = user.group_id == 0 @cli.command() @@ -88,7 +88,7 @@ def serve(hostname: str, port: int, config: Path): # Before first request create_j2_request_global(app) - shutdown_event = create_workers(lambda: store_factory(app)) + shutdown_event = create_workers(app, store_factory) # Before request app.before_request(user_session) diff --git a/projects/tentacles/src/python/tentacles/blueprints/user_ui.py b/projects/tentacles/src/python/tentacles/blueprints/user_ui.py index 2d06204..3280e02 100644 --- a/projects/tentacles/src/python/tentacles/blueprints/user_ui.py +++ b/projects/tentacles/src/python/tentacles/blueprints/user_ui.py @@ -31,73 +31,77 @@ def root(): ) -@BLUEPRINT.route("/user/login", methods=["GET", "POST"]) -def login(): +@BLUEPRINT.route("/user/login", methods=["GET"]) +def get_login(): if is_logged_in(): return redirect("/") - elif request.method == "POST": - if sid := ctx.db.try_login( - username := request.form["username"], - salt(request.form["password"]), - timedelta(days=1), - ): - resp = redirect("/") - resp.set_cookie("sid", sid) - flash(f"Welcome, {username}", category="success") - return resp - - else: - flash("Incorrect username/password", category="error") - return render_template("login.html.j2") - else: return render_template("login.html.j2") -@BLUEPRINT.route("/user/register", methods=["GET", "POST"]) -def register(): +@BLUEPRINT.route("/user/login", methods=["POST"]) +def post_login(): + if sid := ctx.db.try_login( + username := request.form["username"], + salt(request.form["password"]), + timedelta(days=1), + ): + resp = redirect("/") + resp.set_cookie("sid", sid) + flash(f"Welcome, {username}", category="success") + return resp + + else: + flash("Incorrect username/password", category="error") + return render_template("login.html.j2") + + +@BLUEPRINT.route("/user/register", methods=["GET"]) +def get_register(): if is_logged_in(): return redirect("/") - elif request.method == "POST": - try: - username = request.form["username"] - email = request.form["email"] - group_id = None - status_id = None - - for user_config in current_app.config.get("users", []): - if user_config["email"] == email: - if "group_id" in user_config: - group_id = user_config["group_id"] - - if "status_id" in user_config: - status_id = user_config["status_id"] - - break - - if res := ctx.db.try_create_user( - username, email, salt(request.form["password"]), group_id, status_id - ): - id, status = res - if status == -1: - flash( - "Please check your email for a verification request", - category="success", - ) - return render_template("register.html.j2") - - except Exception as e: - log.exception("Error encountered while registering a user...") - - flash("Unable to register that username", category="error") - return render_template("register.html.j2") - else: return render_template("register.html.j2") +@BLUEPRINT.route("/user/register", methods=["POST"]) +def post_register(): + try: + username = request.form["username"] + email = request.form["email"] + group_id = 1 # Normal users + status_id = -2 # Unverified + + for user_config in current_app.config.get("users", []): + if user_config["email"] == email: + if "group_id" in user_config: + group_id = user_config["group_id"] + + if "status_id" in user_config: + status_id = user_config["status_id"] + + break + + if res := ctx.db.try_create_user( + username, email, salt(request.form["password"]), group_id, status_id + ): + id, status = res + if status == -1: + flash( + "Please check your email for a verification request", + category="success", + ) + return render_template("register.html.j2") + + except Exception as e: + log.exception("Error encountered while registering a user...") + + flash("Unable to register that username", category="error") + return render_template("register.html.j2") + + @BLUEPRINT.route("/user/logout") def logout(): # Invalidate the user's authorization @@ -107,31 +111,35 @@ def logout(): return resp -@BLUEPRINT.route("/user", methods=["GET", "POST"]) -def settings(): +@BLUEPRINT.route("/user", methods=["GET"]) +def get_settings(): if not is_logged_in(): return redirect("/") - elif request.method == "POST": - if request.form["action"] == "add": - ttl_spec = request.form.get("ttl") - if ttl_spec == "forever": - ttl = None - elif m := re.fullmatch(r"(\d+)d", ttl_spec): - ttl = timedelta(days=int(m.group(1))) - else: - flash("Bad request", category="error") - return render_template("user.html.j2"), 400 + return render_template("user.html.j2") - ctx.db.create_key(ctx.sid, ttl, request.form.get("name")) - flash("Key created", category="success") - - elif request.form["action"] == "revoke": - ctx.db.delete_key(ctx.uid, request.form.get("id")) - flash("Key revoked", category="success") +@BLUEPRINT.route("/user", methods=["POST"]) +def post_settings(): + if request.form["action"] == "add": + ttl_spec = request.form.get("ttl") + if ttl_spec == "forever": + ttl = None + elif m := re.fullmatch(r"(\d+)d", ttl_spec): + ttl = timedelta(days=int(m.group(1))) else: flash("Bad request", category="error") return render_template("user.html.j2"), 400 + ctx.db.create_key(ctx.sid, ttl, request.form.get("name")) + flash("Key created", category="success") + + elif request.form["action"] == "revoke": + ctx.db.delete_key(ctx.uid, request.form.get("id")) + flash("Key revoked", category="success") + + else: + flash("Bad request", category="error") + return render_template("user.html.j2"), 400 + return render_template("user.html.j2") diff --git a/projects/tentacles/src/python/tentacles/schema.sql b/projects/tentacles/src/python/tentacles/schema.sql index c6c8e9c..c6275f5 100644 --- a/projects/tentacles/src/python/tentacles/schema.sql +++ b/projects/tentacles/src/python/tentacles/schema.sql @@ -25,7 +25,8 @@ CREATE TABLE IF NOT EXISTS users ( , email TEXT , hash TEXT , status_id INTEGER - , verification_id INTEGER + , verified_at TEXT + , enabled_at TEXT , FOREIGN KEY(group_id) REFERENCES groups(id) , FOREIGN KEY(status_id) REFERENCES user_statuses(id) , UNIQUE(name) @@ -35,7 +36,7 @@ CREATE TABLE IF NOT EXISTS users ( ---------------------------------------------------------------------------------------------------- -- Keys represent API keys and auth sessions CREATE TABLE IF NOT EXISTS user_keys ( - id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(48)))) + id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(32)))) , user_id INTEGER , name TEXT , expiration TEXT diff --git a/projects/tentacles/src/python/tentacles/store.py b/projects/tentacles/src/python/tentacles/store.py index d062ae2..ade7a30 100644 --- a/projects/tentacles/src/python/tentacles/store.py +++ b/projects/tentacles/src/python/tentacles/store.py @@ -111,17 +111,27 @@ class Store(object): def fetch_user(self, uid: int): return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone() - @fmap(one) - @requires_conn - def fetch_user_priority(self, uid: int): - return self._conn.execute( - "SELECT priority FROM groups g INNER JOIN users u ON g.id = u.group_id WHERE u.id = ?", - [uid], - ).fetchone() - @requires_conn def list_users(self): - return self._conn.execute("SELECT id, name FROM users").fetchall() + return self._conn.execute("SELECT * FROM users").fetchall() + + @requires_conn + def list_unverified_users(self): + return self._conn.execute( + "SELECT * FROM users WHERE status_id = -2 AND NOT verified_at" + ).fetchall() + + @requires_conn + def verify_user(self, uid: int): + self._conn.execute( + "UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid] + ) + + @requires_conn + def set_user_status(self, uid: int, status): + self._conn.execute( + "UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid] + ) @fmap(one) @requires_conn @@ -184,10 +194,13 @@ class Store(object): @requires_conn def list_keys(self, uid: int): - return [(id, name, datetime.fromisoformat(exp) if exp else None) - for id, name, exp in self._conn.execute( - "SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'", [uid] - ).fetchall()] + return [ + (id, name, datetime.fromisoformat(exp) if exp else None) + for id, name, exp in self._conn.execute( + "SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'", + [uid], + ).fetchall() + ] @requires_conn def fetch_key(self, kid) -> tuple: @@ -279,13 +292,16 @@ class Store(object): @requires_conn def update_printer_status(self, printer_id, state: str): - (status_id,) = self._conn.execute( - "SELECT id FROM printer_statuses WHERE name = ?1", [state] - ).fetchone() - self._conn.execute( - "UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1", - [printer_id, status_id], + """ + UPDATE printers + SET + status_id = (SELECT id FROM printer_statuses WHERE name = ?2) + , last_poll_date = datetime('now') + WHERE + id = ?1 + """, + [printer_id, state], ) ################################################################################ @@ -312,7 +328,9 @@ class Store(object): @requires_conn def fetch_file(self, uid: int, fid: int): - return self._conn.execute("SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]).fetchone() + return self._conn.execute( + "SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid] + ).fetchone() ################################################################################ # Job @@ -332,8 +350,22 @@ class Store(object): """ assert 0 <= relative_priority <= 9 return self._conn.execute( - "INSERT INTO jobs (user_id, file_id, priority) VALUES (?, ?, ?) RETURNING (id)", - [uid, fid, self.fetch_user_priority(uid) + relative_priority], + """ + INSERT INTO jobs ( + user_id + , file_id + , priority + ) VALUES ( + ?1 + , ?2 + , ( + SELECT priority + ?3 + FROM users + WHERE uid = ?1 + ) + ) RETURNING (id) + """, + [uid, fid, relative_priority], ).fetchone() @requires_conn @@ -469,7 +501,8 @@ class Store(object): @requires_conn def cancel_job(self, uid: int, job_id: int): return self._conn.execute( - "UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2", [uid, job_id] + "UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2", + [uid, job_id], ) @requires_conn diff --git a/projects/tentacles/src/python/tentacles/workers.py b/projects/tentacles/src/python/tentacles/workers.py index e6426b7..94cfb82 100644 --- a/projects/tentacles/src/python/tentacles/workers.py +++ b/projects/tentacles/src/python/tentacles/workers.py @@ -23,7 +23,10 @@ from requests.exceptions import ( HTTPError, Timeout, ) +from flask import Flask as App, current_app, render_template + from tentacles.store import Store +from fastmail import FastMailSMTP class OctoRest(_OR): @@ -64,189 +67,142 @@ def corn_job(every: timedelta): return _decorator -def poll_printers(db_factory: Callable[[], Store]) -> None: +def poll_printers(app: App, store: Store) -> None: """Poll printers for their status.""" - with closing(db_factory()) as db: - for printer in db.list_printers(): - id, name, url, api_key, last_poll, status = printer - mapped_job = db.fetch_job_by_printer(id) + for printer in store.list_printers(): + mapped_job = store.fetch_job_by_printer(printer.id) + + def _set_status(status: str): + if printer.status != status: + print(f"Printer {printer.id} {printer.status} -> {status}") + store.update_printer_status(printer.id, status) + + try: + client = OctoRest(url=printer.url, apikey=printer.api_key) + printer_job = client.job_info() try: - client = OctoRest(url=url, apikey=api_key) - printer_job = client.job_info() - try: - printer_state = client.printer().get("state").get("flags", {}) - except HTTPError: - printer_state = {"disconnected": True} + printer_state = client.printer().get("state").get("flags", {}) + except HTTPError: + printer_state = {"disconnected": True} - if printer_state.get("error"): - # If there's a mapped job, we manually fail it so that - # there's no possibility of a sync problem between the - # polling tasks. This violates separation of concerns a bit, - # but appears required for correctness. - if mapped_job: - db.finish_job(mapped_job.id, "error") + if printer_state.get("error"): + # If there's a mapped job, we manually fail it so that + # there's no possibility of a sync problem between the + # polling tasks. This violates separation of concerns a bit, + # but appears required for correctness. + if mapped_job: + store.finish_job(mapped_job.id, "error") - if status != "error": - print(f"Printer {printer.id} -> error") - db.update_printer_status(id, "error") + _set_status("error") - elif printer_state.get("disconnected"): - if status != "disconnected": - print(f"Printer {printer.id} -> disconnected") - db.update_printer_status(id, "disconnected") + elif printer_state.get("disconnected"): + _set_status("disconnected") - elif printer_state.get("printing"): - if status != "running": - print(f"Printer {printer.id} -> running") - db.update_printer_status(id, "running") + elif printer_state.get("printing"): + _set_status("running") - elif printer_job.get("state").lower() == "connecting": - if status != "connecting": - print(f"Printer {printer.id} -> connecting") - db.update_printer_status(id, "connecting") + elif printer_job.get("state").lower() == "connecting": + _set_status("connecting") - elif printer_state.get("ready"): - if status != "idle": - print(f"Printer {printer.id} -> idle") - db.update_printer_status(id, "idle") + elif printer_state.get("ready"): + _set_status("idle") - else: - raise Exception(f"Indeterminate state {printer_job!r} {printer_state!r}") + else: + raise Exception( + f"Indeterminate state {printer_job!r} {printer_state!r}" + ) - except (ConnectionError, Timeout): - db.update_printer_status(id, "error") + except (ConnectionError, Timeout): + _set_status("error") - except HTTPError as e: - assert isinstance(e.response, Response) - if e.response.status_code in [403, 401] or "error" in printer_job: - db.update_printer_status(id, "error") - elif e.response.json().get("error") == "Printer is not operational": - db.update_printer_status(id, "disconnected") - else: - raise Exception( - f"Indeterminate state {e.response.status_code}, {e.response.json()!r}" - ) + except HTTPError as e: + assert isinstance(e.response, Response) + if e.response.status_code in [403, 401] or "error" in printer_job: + _set_status("error") + + elif e.response.json().get("error") == "Printer is not operational": + _set_status("disconnected") + + else: + raise Exception( + f"Indeterminate state {e.response.status_code}, {e.response.json()!r}" + ) -def assign_jobs(db_factory: Callable[[], Store]) -> None: +def assign_jobs(app: App, store: Store) -> None: """Assign jobs to printers. Uploading files and job state management is handled separately.""" - with closing(db_factory()) as db: - for printer_id in db.list_idle_printers(): - if job_id := db.poll_job_queue(): - db.assign_job(job_id, printer_id) - print(f"Mapped job {job_id} to printer {printer_id}") + for printer_id in store.list_idle_printers(): + if job_id := store.poll_job_queue(): + store.assign_job(job_id, printer_id) + print(f"Mapped job {job_id} to printer {printer_id}") -def push_jobs(db_factory: Callable[[], Store]) -> None: +def push_jobs(app: App, store: Store) -> None: """Ensure that job files are uploaded and started to the assigned printer.""" - with closing(db_factory()) as db: - for job in db.list_mapped_jobs(): - printer = db.fetch_printer(job.printer_id) - file = db.fetch_file(job.user_id, job.file_id) - if not file: - log.error(f"Job {job.id} no longer maps to a file") - db.delete_job(job.user_id, job.id) + for job in store.list_mapped_jobs(): + printer = store.fetch_printer(job.printer_id) + file = store.fetch_file(job.user_id, job.file_id) + if not file: + log.error(f"Job {job.id} no longer maps to a file") + store.delete_job(job.user_id, job.id) + + try: + client = OctoRest(url=printer.url, apikey=printer.api_key) + printer_job = client.job_info() + try: + printer_state = client.printer().get("state").get("flags", {}) + except HTTPError: + printer_state = {"error": printer_job.get("error")} + + if printer_state.get("error"): + print(f"Printer {printer.id} is in error, can't push") + continue try: - client = OctoRest(url=printer.url, apikey=printer.api_key) - printer_job = client.job_info() - try: - printer_state = client.printer().get("state").get("flags", {}) - except HTTPError: - printer_state = {"error": printer_job.get("error")} + client.upload(file.path) + except HTTPError as e: + if e.response.status_code == 409: + pass + else: + raise - if printer_state.get("error"): - print(f"Printer {printer.id} is in error, can't push") - continue + client.select(Path(file.path).name) + client.start() + store.start_job(job.id) + except TimeoutError: + pass - try: - client.upload(file.path) - except HTTPError as e: - if e.response.status_code == 409: - pass - else: - raise - - client.select(Path(file.path).name) - client.start() - db.start_job(job.id) - except TimeoutError: - pass - - except Exception: - log.exception("Oop") + except Exception: + log.exception("Oop") -def revoke_jobs(db_factory: Callable[[], Store]) -> None: +def revoke_jobs(app: App, store: Store) -> None: """Ensure that job files are uploaded and started to the assigned printer. Note that this will ALSO cancel jobs out of the print queue. """ - with closing(db_factory()) as db: - for job in db.list_cancelled_jobs(): - if job.printer_id: - printer = db.fetch_printer(job.printer_id) - try: - print(f"Cancelling running job {job.id}") - client = OctoRest(url=printer.url, apikey=printer.api_key) - try: - client.cancel() - except HTTPError as e: - if e.response.status_code == 409: - pass - else: - raise + for job in store.list_cancelled_jobs(): + if job.printer_id: + printer = store.fetch_printer(job.printer_id) - print(f"Job {job.id} -> cancelled") - db.finish_job(job.id, "cancelled") - except TimeoutError: - pass - - except Exception: - log.exception("Oop") - - else: - print(f"Unmapped job {job.id} became cancelled") - db.finish_job(job.id, "cancelled") - - - -def pull_jobs(db_factory: Callable[[], Store]) -> None: - """Poll the state of mapped printers to control jobs.""" - - with closing(db_factory()) as db: - for job in db.list_running_jobs(): - printer = db.fetch_printer(job.printer_id) try: + print(f"Cancelling running job {job.id}") client = OctoRest(url=printer.url, apikey=printer.api_key) - job_state = client.job_info() try: - printer_state = client.printer().get("state").get("flags", {}) - except HTTPError: - printer_state = {"disconnected": True, "error": True} + client.cancel() + except HTTPError as e: + if e.response.status_code == 409: + pass + else: + raise - if printer_state.get("printing"): - pass - - elif job_state.get("progress", {}).get("completion", 0.0) == 100.0: - print(f"Job {job.id} has succeeded") - db.finish_job(job.id, "success") - - elif printer_state.get("error"): - print(f"Job {job.id} has failed") - db.finish_job(job.id, "failed") - - elif printer_state.get("cancelling"): - print(f"Job {job.id} has been acknowledged as cancelled") - db.finish_job(job.id, "cancelled") - - else: - print(f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}") + print(f"Job {job.id} -> cancelled") + store.finish_job(job.id, "cancelled") except TimeoutError: pass @@ -254,16 +210,89 @@ def pull_jobs(db_factory: Callable[[], Store]) -> None: except Exception: log.exception("Oop") + else: + print(f"Unmapped job {job.id} became cancelled") + store.finish_job(job.id, "cancelled") + + +def pull_jobs(app: App, store: Store) -> None: + """Poll the state of mapped printers to control jobs.""" + + for job in store.list_running_jobs(): + printer = store.fetch_printer(job.printer_id) + try: + client = OctoRest(url=printer.url, apikey=printer.api_key) + job_state = client.job_info() + try: + printer_state = client.printer().get("state").get("flags", {}) + except HTTPError: + printer_state = {"disconnected": True, "error": True} + + if printer_state.get("printing"): + pass + + elif job_state.get("progress", {}).get("completion", 0.0) == 100.0: + print(f"Job {job.id} has succeeded") + store.finish_job(job.id, "success") + + elif printer_state.get("error"): + print(f"Job {job.id} has failed") + store.finish_job(job.id, "failed") + + elif printer_state.get("cancelling"): + print(f"Job {job.id} has been acknowledged as cancelled") + store.finish_job(job.id, "cancelled") + + else: + print( + f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}" + ) + + except TimeoutError: + pass + + except Exception: + log.exception("Oop") + + +def send_verifications(app, store: Store): + with closing( + FastMailSMTP( + app.config.get("fastmail", {}).get("username"), + app.config.get("fastmail", {}).get("key"), + ) + ) as fm: + for user in store.list_unverified_users(): + fm.send_message( + from_addr="root@tirefireind.us", + to_addrs=[user.email], + msg=render_template("verification_email.html.j2"), + ) + + +def send_approvals(app, store: Store): + with closing( + FastMailSMTP( + app.config.get("fastmail", {}).get("username"), + app.config.get("fastmail", {}).get("key"), + ) + ): + pass + @corn_job(timedelta(seconds=5)) -def run_worker(db_factory): - poll_printers(db_factory) - assign_jobs(db_factory) - push_jobs(db_factory) - revoke_jobs(db_factory) - pull_jobs(db_factory) +def run_worker(app, db_factory): + with app.app_context(), closing(db_factory(app)) as store: + poll_printers(app, store) + assign_jobs(app, store) + push_jobs(app, store) + revoke_jobs(app, store) + pull_jobs(app, store) + send_verifications(app, store) + send_approvals(app, store) -def create_workers(db_factory: Callable[[], Store]) -> Event: - Thread(target=run_worker, args=[db_factory]).start() + +def create_workers(app, db_factory: Callable[[], Store]) -> Event: + Thread(target=run_worker, args=[app, db_factory]).start() return SHUTDOWN