Tapping towards a real user flow

This commit is contained in:
Reid 'arrdem' McKenzie 2023-05-30 22:21:51 -06:00
parent a20794503f
commit cb67bdb051
6 changed files with 373 additions and 255 deletions

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python3
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
class FastMailSMTP(smtplib.SMTP_SSL):
"""A wrapper for handling SMTP connections to FastMail."""
def __init__(self, username, password):
super().__init__("mail.messagingengine.com", port=465)
self._username = username
self._password = password
self._has_logged_in = False
def login(self):
if not self._has_logged_in:
super().login(self._username, self._password)
self._has_logged_in = True
def send_message(self, *, from_addr, to_addrs, msg, subject, attachments=None):
self.login()
msg_root = MIMEMultipart()
msg_root["Subject"] = subject
msg_root["From"] = from_addr
msg_root["To"] = ", ".join(to_addrs)
msg_alternative = MIMEMultipart("alternative")
msg_root.attach(msg_alternative)
msg_alternative.attach(MIMEText(msg, "html"))
if attachments:
for attachment in attachments:
prt = MIMEBase("application", "octet-stream")
prt.set_payload(open(attachment, "rb").read())
encoders.encode_base64(prt)
prt.add_header(
"Content-Disposition",
'attachment; filename="%s"' % attachment.replace('"', ""),
)
msg_root.attach(prt)
self.sendmail(from_addr, to_addrs, msg_root.as_string())

View file

@ -68,10 +68,10 @@ def user_session():
): ):
ctx.sid = session_id ctx.sid = session_id
ctx.uid = uid ctx.uid = uid
_id, gid, name, _email, _hash, _status, _verification = ctx.db.fetch_user(uid) user = ctx.db.fetch_user(uid)
ctx.gid = gid ctx.gid = user.group_id
ctx.username = name ctx.username = user.name
ctx.is_admin = gid == 0 ctx.is_admin = user.group_id == 0
@cli.command() @cli.command()
@ -88,7 +88,7 @@ def serve(hostname: str, port: int, config: Path):
# Before first request # Before first request
create_j2_request_global(app) create_j2_request_global(app)
shutdown_event = create_workers(lambda: store_factory(app)) shutdown_event = create_workers(app, store_factory)
# Before request # Before request
app.before_request(user_session) app.before_request(user_session)

View file

@ -31,12 +31,17 @@ def root():
) )
@BLUEPRINT.route("/user/login", methods=["GET", "POST"]) @BLUEPRINT.route("/user/login", methods=["GET"])
def login(): def get_login():
if is_logged_in(): if is_logged_in():
return redirect("/") return redirect("/")
elif request.method == "POST": else:
return render_template("login.html.j2")
@BLUEPRINT.route("/user/login", methods=["POST"])
def post_login():
if sid := ctx.db.try_login( if sid := ctx.db.try_login(
username := request.form["username"], username := request.form["username"],
salt(request.form["password"]), salt(request.form["password"]),
@ -51,21 +56,23 @@ def login():
flash("Incorrect username/password", category="error") flash("Incorrect username/password", category="error")
return render_template("login.html.j2") return render_template("login.html.j2")
else:
return render_template("login.html.j2")
@BLUEPRINT.route("/user/register", methods=["GET"])
@BLUEPRINT.route("/user/register", methods=["GET", "POST"]) def get_register():
def register():
if is_logged_in(): if is_logged_in():
return redirect("/") return redirect("/")
elif request.method == "POST": else:
return render_template("register.html.j2")
@BLUEPRINT.route("/user/register", methods=["POST"])
def post_register():
try: try:
username = request.form["username"] username = request.form["username"]
email = request.form["email"] email = request.form["email"]
group_id = None group_id = 1 # Normal users
status_id = None status_id = -2 # Unverified
for user_config in current_app.config.get("users", []): for user_config in current_app.config.get("users", []):
if user_config["email"] == email: if user_config["email"] == email:
@ -94,9 +101,6 @@ def register():
flash("Unable to register that username", category="error") flash("Unable to register that username", category="error")
return render_template("register.html.j2") return render_template("register.html.j2")
else:
return render_template("register.html.j2")
@BLUEPRINT.route("/user/logout") @BLUEPRINT.route("/user/logout")
def logout(): def logout():
@ -107,12 +111,16 @@ def logout():
return resp return resp
@BLUEPRINT.route("/user", methods=["GET", "POST"]) @BLUEPRINT.route("/user", methods=["GET"])
def settings(): def get_settings():
if not is_logged_in(): if not is_logged_in():
return redirect("/") return redirect("/")
elif request.method == "POST": return render_template("user.html.j2")
@BLUEPRINT.route("/user", methods=["POST"])
def post_settings():
if request.form["action"] == "add": if request.form["action"] == "add":
ttl_spec = request.form.get("ttl") ttl_spec = request.form.get("ttl")
if ttl_spec == "forever": if ttl_spec == "forever":

View file

@ -25,7 +25,8 @@ CREATE TABLE IF NOT EXISTS users (
, email TEXT , email TEXT
, hash TEXT , hash TEXT
, status_id INTEGER , status_id INTEGER
, verification_id INTEGER , verified_at TEXT
, enabled_at TEXT
, FOREIGN KEY(group_id) REFERENCES groups(id) , FOREIGN KEY(group_id) REFERENCES groups(id)
, FOREIGN KEY(status_id) REFERENCES user_statuses(id) , FOREIGN KEY(status_id) REFERENCES user_statuses(id)
, UNIQUE(name) , UNIQUE(name)
@ -35,7 +36,7 @@ CREATE TABLE IF NOT EXISTS users (
---------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------
-- Keys represent API keys and auth sessions -- Keys represent API keys and auth sessions
CREATE TABLE IF NOT EXISTS user_keys ( CREATE TABLE IF NOT EXISTS user_keys (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(48)))) id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(32))))
, user_id INTEGER , user_id INTEGER
, name TEXT , name TEXT
, expiration TEXT , expiration TEXT

View file

@ -111,17 +111,27 @@ class Store(object):
def fetch_user(self, uid: int): def fetch_user(self, uid: int):
return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone() return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone()
@fmap(one)
@requires_conn
def fetch_user_priority(self, uid: int):
return self._conn.execute(
"SELECT priority FROM groups g INNER JOIN users u ON g.id = u.group_id WHERE u.id = ?",
[uid],
).fetchone()
@requires_conn @requires_conn
def list_users(self): def list_users(self):
return self._conn.execute("SELECT id, name FROM users").fetchall() return self._conn.execute("SELECT * FROM users").fetchall()
@requires_conn
def list_unverified_users(self):
return self._conn.execute(
"SELECT * FROM users WHERE status_id = -2 AND NOT verified_at"
).fetchall()
@requires_conn
def verify_user(self, uid: int):
self._conn.execute(
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
)
@requires_conn
def set_user_status(self, uid: int, status):
self._conn.execute(
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
)
@fmap(one) @fmap(one)
@requires_conn @requires_conn
@ -184,10 +194,13 @@ class Store(object):
@requires_conn @requires_conn
def list_keys(self, uid: int): def list_keys(self, uid: int):
return [(id, name, datetime.fromisoformat(exp) if exp else None) return [
(id, name, datetime.fromisoformat(exp) if exp else None)
for id, name, exp in self._conn.execute( for id, name, exp in self._conn.execute(
"SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'", [uid] "SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'",
).fetchall()] [uid],
).fetchall()
]
@requires_conn @requires_conn
def fetch_key(self, kid) -> tuple: def fetch_key(self, kid) -> tuple:
@ -279,13 +292,16 @@ class Store(object):
@requires_conn @requires_conn
def update_printer_status(self, printer_id, state: str): def update_printer_status(self, printer_id, state: str):
(status_id,) = self._conn.execute(
"SELECT id FROM printer_statuses WHERE name = ?1", [state]
).fetchone()
self._conn.execute( self._conn.execute(
"UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1", """
[printer_id, status_id], UPDATE printers
SET
status_id = (SELECT id FROM printer_statuses WHERE name = ?2)
, last_poll_date = datetime('now')
WHERE
id = ?1
""",
[printer_id, state],
) )
################################################################################ ################################################################################
@ -312,7 +328,9 @@ class Store(object):
@requires_conn @requires_conn
def fetch_file(self, uid: int, fid: int): def fetch_file(self, uid: int, fid: int):
return self._conn.execute("SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]).fetchone() return self._conn.execute(
"SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]
).fetchone()
################################################################################ ################################################################################
# Job # Job
@ -332,8 +350,22 @@ class Store(object):
""" """
assert 0 <= relative_priority <= 9 assert 0 <= relative_priority <= 9
return self._conn.execute( return self._conn.execute(
"INSERT INTO jobs (user_id, file_id, priority) VALUES (?, ?, ?) RETURNING (id)", """
[uid, fid, self.fetch_user_priority(uid) + relative_priority], INSERT INTO jobs (
user_id
, file_id
, priority
) VALUES (
?1
, ?2
, (
SELECT priority + ?3
FROM users
WHERE uid = ?1
)
) RETURNING (id)
""",
[uid, fid, relative_priority],
).fetchone() ).fetchone()
@requires_conn @requires_conn
@ -469,7 +501,8 @@ class Store(object):
@requires_conn @requires_conn
def cancel_job(self, uid: int, job_id: int): def cancel_job(self, uid: int, job_id: int):
return self._conn.execute( return self._conn.execute(
"UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2", [uid, job_id] "UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2",
[uid, job_id],
) )
@requires_conn @requires_conn

View file

@ -23,7 +23,10 @@ from requests.exceptions import (
HTTPError, HTTPError,
Timeout, Timeout,
) )
from flask import Flask as App, current_app, render_template
from tentacles.store import Store from tentacles.store import Store
from fastmail import FastMailSMTP
class OctoRest(_OR): class OctoRest(_OR):
@ -64,15 +67,19 @@ def corn_job(every: timedelta):
return _decorator return _decorator
def poll_printers(db_factory: Callable[[], Store]) -> None: def poll_printers(app: App, store: Store) -> None:
"""Poll printers for their status.""" """Poll printers for their status."""
with closing(db_factory()) as db: for printer in store.list_printers():
for printer in db.list_printers(): mapped_job = store.fetch_job_by_printer(printer.id)
id, name, url, api_key, last_poll, status = printer
mapped_job = db.fetch_job_by_printer(id) def _set_status(status: str):
if printer.status != status:
print(f"Printer {printer.id} {printer.status} -> {status}")
store.update_printer_status(printer.id, status)
try: try:
client = OctoRest(url=url, apikey=api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
printer_job = client.job_info() printer_job = client.job_info()
try: try:
printer_state = client.printer().get("state").get("flags", {}) printer_state = client.printer().get("state").get("flags", {})
@ -85,70 +92,62 @@ def poll_printers(db_factory: Callable[[], Store]) -> None:
# polling tasks. This violates separation of concerns a bit, # polling tasks. This violates separation of concerns a bit,
# but appears required for correctness. # but appears required for correctness.
if mapped_job: if mapped_job:
db.finish_job(mapped_job.id, "error") store.finish_job(mapped_job.id, "error")
if status != "error": _set_status("error")
print(f"Printer {printer.id} -> error")
db.update_printer_status(id, "error")
elif printer_state.get("disconnected"): elif printer_state.get("disconnected"):
if status != "disconnected": _set_status("disconnected")
print(f"Printer {printer.id} -> disconnected")
db.update_printer_status(id, "disconnected")
elif printer_state.get("printing"): elif printer_state.get("printing"):
if status != "running": _set_status("running")
print(f"Printer {printer.id} -> running")
db.update_printer_status(id, "running")
elif printer_job.get("state").lower() == "connecting": elif printer_job.get("state").lower() == "connecting":
if status != "connecting": _set_status("connecting")
print(f"Printer {printer.id} -> connecting")
db.update_printer_status(id, "connecting")
elif printer_state.get("ready"): elif printer_state.get("ready"):
if status != "idle": _set_status("idle")
print(f"Printer {printer.id} -> idle")
db.update_printer_status(id, "idle")
else: else:
raise Exception(f"Indeterminate state {printer_job!r} {printer_state!r}") raise Exception(
f"Indeterminate state {printer_job!r} {printer_state!r}"
)
except (ConnectionError, Timeout): except (ConnectionError, Timeout):
db.update_printer_status(id, "error") _set_status("error")
except HTTPError as e: except HTTPError as e:
assert isinstance(e.response, Response) assert isinstance(e.response, Response)
if e.response.status_code in [403, 401] or "error" in printer_job: if e.response.status_code in [403, 401] or "error" in printer_job:
db.update_printer_status(id, "error") _set_status("error")
elif e.response.json().get("error") == "Printer is not operational": elif e.response.json().get("error") == "Printer is not operational":
db.update_printer_status(id, "disconnected") _set_status("disconnected")
else: else:
raise Exception( raise Exception(
f"Indeterminate state {e.response.status_code}, {e.response.json()!r}" f"Indeterminate state {e.response.status_code}, {e.response.json()!r}"
) )
def assign_jobs(db_factory: Callable[[], Store]) -> None: def assign_jobs(app: App, store: Store) -> None:
"""Assign jobs to printers. Uploading files and job state management is handled separately.""" """Assign jobs to printers. Uploading files and job state management is handled separately."""
with closing(db_factory()) as db: for printer_id in store.list_idle_printers():
for printer_id in db.list_idle_printers(): if job_id := store.poll_job_queue():
if job_id := db.poll_job_queue(): store.assign_job(job_id, printer_id)
db.assign_job(job_id, printer_id)
print(f"Mapped job {job_id} to printer {printer_id}") print(f"Mapped job {job_id} to printer {printer_id}")
def push_jobs(db_factory: Callable[[], Store]) -> None: def push_jobs(app: App, store: Store) -> None:
"""Ensure that job files are uploaded and started to the assigned printer.""" """Ensure that job files are uploaded and started to the assigned printer."""
with closing(db_factory()) as db: for job in store.list_mapped_jobs():
for job in db.list_mapped_jobs(): printer = store.fetch_printer(job.printer_id)
printer = db.fetch_printer(job.printer_id) file = store.fetch_file(job.user_id, job.file_id)
file = db.fetch_file(job.user_id, job.file_id)
if not file: if not file:
log.error(f"Job {job.id} no longer maps to a file") log.error(f"Job {job.id} no longer maps to a file")
db.delete_job(job.user_id, job.id) store.delete_job(job.user_id, job.id)
try: try:
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
@ -172,7 +171,7 @@ def push_jobs(db_factory: Callable[[], Store]) -> None:
client.select(Path(file.path).name) client.select(Path(file.path).name)
client.start() client.start()
db.start_job(job.id) store.start_job(job.id)
except TimeoutError: except TimeoutError:
pass pass
@ -180,17 +179,17 @@ def push_jobs(db_factory: Callable[[], Store]) -> None:
log.exception("Oop") log.exception("Oop")
def revoke_jobs(db_factory: Callable[[], Store]) -> None: def revoke_jobs(app: App, store: Store) -> None:
"""Ensure that job files are uploaded and started to the assigned printer. """Ensure that job files are uploaded and started to the assigned printer.
Note that this will ALSO cancel jobs out of the print queue. Note that this will ALSO cancel jobs out of the print queue.
""" """
with closing(db_factory()) as db: for job in store.list_cancelled_jobs():
for job in db.list_cancelled_jobs():
if job.printer_id: if job.printer_id:
printer = db.fetch_printer(job.printer_id) printer = store.fetch_printer(job.printer_id)
try: try:
print(f"Cancelling running job {job.id}") print(f"Cancelling running job {job.id}")
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
@ -203,7 +202,8 @@ def revoke_jobs(db_factory: Callable[[], Store]) -> None:
raise raise
print(f"Job {job.id} -> cancelled") print(f"Job {job.id} -> cancelled")
db.finish_job(job.id, "cancelled") store.finish_job(job.id, "cancelled")
except TimeoutError: except TimeoutError:
pass pass
@ -212,16 +212,14 @@ def revoke_jobs(db_factory: Callable[[], Store]) -> None:
else: else:
print(f"Unmapped job {job.id} became cancelled") print(f"Unmapped job {job.id} became cancelled")
db.finish_job(job.id, "cancelled") store.finish_job(job.id, "cancelled")
def pull_jobs(app: App, store: Store) -> None:
def pull_jobs(db_factory: Callable[[], Store]) -> None:
"""Poll the state of mapped printers to control jobs.""" """Poll the state of mapped printers to control jobs."""
with closing(db_factory()) as db: for job in store.list_running_jobs():
for job in db.list_running_jobs(): printer = store.fetch_printer(job.printer_id)
printer = db.fetch_printer(job.printer_id)
try: try:
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
job_state = client.job_info() job_state = client.job_info()
@ -235,18 +233,20 @@ def pull_jobs(db_factory: Callable[[], Store]) -> None:
elif job_state.get("progress", {}).get("completion", 0.0) == 100.0: elif job_state.get("progress", {}).get("completion", 0.0) == 100.0:
print(f"Job {job.id} has succeeded") print(f"Job {job.id} has succeeded")
db.finish_job(job.id, "success") store.finish_job(job.id, "success")
elif printer_state.get("error"): elif printer_state.get("error"):
print(f"Job {job.id} has failed") print(f"Job {job.id} has failed")
db.finish_job(job.id, "failed") store.finish_job(job.id, "failed")
elif printer_state.get("cancelling"): elif printer_state.get("cancelling"):
print(f"Job {job.id} has been acknowledged as cancelled") print(f"Job {job.id} has been acknowledged as cancelled")
db.finish_job(job.id, "cancelled") store.finish_job(job.id, "cancelled")
else: else:
print(f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}") print(
f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}"
)
except TimeoutError: except TimeoutError:
pass pass
@ -255,15 +255,44 @@ def pull_jobs(db_factory: Callable[[], Store]) -> None:
log.exception("Oop") log.exception("Oop")
@corn_job(timedelta(seconds=5)) def send_verifications(app, store: Store):
def run_worker(db_factory): with closing(
poll_printers(db_factory) FastMailSMTP(
assign_jobs(db_factory) app.config.get("fastmail", {}).get("username"),
push_jobs(db_factory) app.config.get("fastmail", {}).get("key"),
revoke_jobs(db_factory) )
pull_jobs(db_factory) ) as fm:
for user in store.list_unverified_users():
fm.send_message(
from_addr="root@tirefireind.us",
to_addrs=[user.email],
msg=render_template("verification_email.html.j2"),
)
def create_workers(db_factory: Callable[[], Store]) -> Event:
Thread(target=run_worker, args=[db_factory]).start() def send_approvals(app, store: Store):
with closing(
FastMailSMTP(
app.config.get("fastmail", {}).get("username"),
app.config.get("fastmail", {}).get("key"),
)
):
pass
@corn_job(timedelta(seconds=5))
def run_worker(app, db_factory):
with app.app_context(), closing(db_factory(app)) as store:
poll_printers(app, store)
assign_jobs(app, store)
push_jobs(app, store)
revoke_jobs(app, store)
pull_jobs(app, store)
send_verifications(app, store)
send_approvals(app, store)
def create_workers(app, db_factory: Callable[[], Store]) -> Event:
Thread(target=run_worker, args=[app, db_factory]).start()
return SHUTDOWN return SHUTDOWN