Tapping towards a real user flow

This commit is contained in:
Reid 'arrdem' McKenzie 2023-05-30 22:21:51 -06:00
parent e44390a37f
commit 9b5ed98b16
6 changed files with 373 additions and 255 deletions

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python3
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
class FastMailSMTP(smtplib.SMTP_SSL):
"""A wrapper for handling SMTP connections to FastMail."""
def __init__(self, username, password):
super().__init__("mail.messagingengine.com", port=465)
self._username = username
self._password = password
self._has_logged_in = False
def login(self):
if not self._has_logged_in:
super().login(self._username, self._password)
self._has_logged_in = True
def send_message(self, *, from_addr, to_addrs, msg, subject, attachments=None):
self.login()
msg_root = MIMEMultipart()
msg_root["Subject"] = subject
msg_root["From"] = from_addr
msg_root["To"] = ", ".join(to_addrs)
msg_alternative = MIMEMultipart("alternative")
msg_root.attach(msg_alternative)
msg_alternative.attach(MIMEText(msg, "html"))
if attachments:
for attachment in attachments:
prt = MIMEBase("application", "octet-stream")
prt.set_payload(open(attachment, "rb").read())
encoders.encode_base64(prt)
prt.add_header(
"Content-Disposition",
'attachment; filename="%s"' % attachment.replace('"', ""),
)
msg_root.attach(prt)
self.sendmail(from_addr, to_addrs, msg_root.as_string())

View file

@ -68,10 +68,10 @@ def user_session():
): ):
ctx.sid = session_id ctx.sid = session_id
ctx.uid = uid ctx.uid = uid
_id, gid, name, _email, _hash, _status, _verification = ctx.db.fetch_user(uid) user = ctx.db.fetch_user(uid)
ctx.gid = gid ctx.gid = user.group_id
ctx.username = name ctx.username = user.name
ctx.is_admin = gid == 0 ctx.is_admin = user.group_id == 0
@cli.command() @cli.command()
@ -88,7 +88,7 @@ def serve(hostname: str, port: int, config: Path):
# Before first request # Before first request
create_j2_request_global(app) create_j2_request_global(app)
shutdown_event = create_workers(lambda: store_factory(app)) shutdown_event = create_workers(app, store_factory)
# Before request # Before request
app.before_request(user_session) app.before_request(user_session)

View file

@ -31,73 +31,77 @@ def root():
) )
@BLUEPRINT.route("/user/login", methods=["GET", "POST"]) @BLUEPRINT.route("/user/login", methods=["GET"])
def login(): def get_login():
if is_logged_in(): if is_logged_in():
return redirect("/") return redirect("/")
elif request.method == "POST":
if sid := ctx.db.try_login(
username := request.form["username"],
salt(request.form["password"]),
timedelta(days=1),
):
resp = redirect("/")
resp.set_cookie("sid", sid)
flash(f"Welcome, {username}", category="success")
return resp
else:
flash("Incorrect username/password", category="error")
return render_template("login.html.j2")
else: else:
return render_template("login.html.j2") return render_template("login.html.j2")
@BLUEPRINT.route("/user/register", methods=["GET", "POST"]) @BLUEPRINT.route("/user/login", methods=["POST"])
def register(): def post_login():
if sid := ctx.db.try_login(
username := request.form["username"],
salt(request.form["password"]),
timedelta(days=1),
):
resp = redirect("/")
resp.set_cookie("sid", sid)
flash(f"Welcome, {username}", category="success")
return resp
else:
flash("Incorrect username/password", category="error")
return render_template("login.html.j2")
@BLUEPRINT.route("/user/register", methods=["GET"])
def get_register():
if is_logged_in(): if is_logged_in():
return redirect("/") return redirect("/")
elif request.method == "POST":
try:
username = request.form["username"]
email = request.form["email"]
group_id = None
status_id = None
for user_config in current_app.config.get("users", []):
if user_config["email"] == email:
if "group_id" in user_config:
group_id = user_config["group_id"]
if "status_id" in user_config:
status_id = user_config["status_id"]
break
if res := ctx.db.try_create_user(
username, email, salt(request.form["password"]), group_id, status_id
):
id, status = res
if status == -1:
flash(
"Please check your email for a verification request",
category="success",
)
return render_template("register.html.j2")
except Exception as e:
log.exception("Error encountered while registering a user...")
flash("Unable to register that username", category="error")
return render_template("register.html.j2")
else: else:
return render_template("register.html.j2") return render_template("register.html.j2")
@BLUEPRINT.route("/user/register", methods=["POST"])
def post_register():
try:
username = request.form["username"]
email = request.form["email"]
group_id = 1 # Normal users
status_id = -2 # Unverified
for user_config in current_app.config.get("users", []):
if user_config["email"] == email:
if "group_id" in user_config:
group_id = user_config["group_id"]
if "status_id" in user_config:
status_id = user_config["status_id"]
break
if res := ctx.db.try_create_user(
username, email, salt(request.form["password"]), group_id, status_id
):
id, status = res
if status == -1:
flash(
"Please check your email for a verification request",
category="success",
)
return render_template("register.html.j2")
except Exception as e:
log.exception("Error encountered while registering a user...")
flash("Unable to register that username", category="error")
return render_template("register.html.j2")
@BLUEPRINT.route("/user/logout") @BLUEPRINT.route("/user/logout")
def logout(): def logout():
# Invalidate the user's authorization # Invalidate the user's authorization
@ -107,31 +111,35 @@ def logout():
return resp return resp
@BLUEPRINT.route("/user", methods=["GET", "POST"]) @BLUEPRINT.route("/user", methods=["GET"])
def settings(): def get_settings():
if not is_logged_in(): if not is_logged_in():
return redirect("/") return redirect("/")
elif request.method == "POST": return render_template("user.html.j2")
if request.form["action"] == "add":
ttl_spec = request.form.get("ttl")
if ttl_spec == "forever":
ttl = None
elif m := re.fullmatch(r"(\d+)d", ttl_spec):
ttl = timedelta(days=int(m.group(1)))
else:
flash("Bad request", category="error")
return render_template("user.html.j2"), 400
ctx.db.create_key(ctx.sid, ttl, request.form.get("name"))
flash("Key created", category="success")
elif request.form["action"] == "revoke":
ctx.db.delete_key(ctx.uid, request.form.get("id"))
flash("Key revoked", category="success")
@BLUEPRINT.route("/user", methods=["POST"])
def post_settings():
if request.form["action"] == "add":
ttl_spec = request.form.get("ttl")
if ttl_spec == "forever":
ttl = None
elif m := re.fullmatch(r"(\d+)d", ttl_spec):
ttl = timedelta(days=int(m.group(1)))
else: else:
flash("Bad request", category="error") flash("Bad request", category="error")
return render_template("user.html.j2"), 400 return render_template("user.html.j2"), 400
ctx.db.create_key(ctx.sid, ttl, request.form.get("name"))
flash("Key created", category="success")
elif request.form["action"] == "revoke":
ctx.db.delete_key(ctx.uid, request.form.get("id"))
flash("Key revoked", category="success")
else:
flash("Bad request", category="error")
return render_template("user.html.j2"), 400
return render_template("user.html.j2") return render_template("user.html.j2")

View file

@ -25,7 +25,8 @@ CREATE TABLE IF NOT EXISTS users (
, email TEXT , email TEXT
, hash TEXT , hash TEXT
, status_id INTEGER , status_id INTEGER
, verification_id INTEGER , verified_at TEXT
, enabled_at TEXT
, FOREIGN KEY(group_id) REFERENCES groups(id) , FOREIGN KEY(group_id) REFERENCES groups(id)
, FOREIGN KEY(status_id) REFERENCES user_statuses(id) , FOREIGN KEY(status_id) REFERENCES user_statuses(id)
, UNIQUE(name) , UNIQUE(name)
@ -35,7 +36,7 @@ CREATE TABLE IF NOT EXISTS users (
---------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------
-- Keys represent API keys and auth sessions -- Keys represent API keys and auth sessions
CREATE TABLE IF NOT EXISTS user_keys ( CREATE TABLE IF NOT EXISTS user_keys (
id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(48)))) id TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(32))))
, user_id INTEGER , user_id INTEGER
, name TEXT , name TEXT
, expiration TEXT , expiration TEXT

View file

@ -111,17 +111,27 @@ class Store(object):
def fetch_user(self, uid: int): def fetch_user(self, uid: int):
return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone() return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone()
@fmap(one)
@requires_conn
def fetch_user_priority(self, uid: int):
return self._conn.execute(
"SELECT priority FROM groups g INNER JOIN users u ON g.id = u.group_id WHERE u.id = ?",
[uid],
).fetchone()
@requires_conn @requires_conn
def list_users(self): def list_users(self):
return self._conn.execute("SELECT id, name FROM users").fetchall() return self._conn.execute("SELECT * FROM users").fetchall()
@requires_conn
def list_unverified_users(self):
return self._conn.execute(
"SELECT * FROM users WHERE status_id = -2 AND NOT verified_at"
).fetchall()
@requires_conn
def verify_user(self, uid: int):
self._conn.execute(
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
)
@requires_conn
def set_user_status(self, uid: int, status):
self._conn.execute(
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
)
@fmap(one) @fmap(one)
@requires_conn @requires_conn
@ -184,10 +194,13 @@ class Store(object):
@requires_conn @requires_conn
def list_keys(self, uid: int): def list_keys(self, uid: int):
return [(id, name, datetime.fromisoformat(exp) if exp else None) return [
for id, name, exp in self._conn.execute( (id, name, datetime.fromisoformat(exp) if exp else None)
"SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'", [uid] for id, name, exp in self._conn.execute(
).fetchall()] "SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'",
[uid],
).fetchall()
]
@requires_conn @requires_conn
def fetch_key(self, kid) -> tuple: def fetch_key(self, kid) -> tuple:
@ -279,13 +292,16 @@ class Store(object):
@requires_conn @requires_conn
def update_printer_status(self, printer_id, state: str): def update_printer_status(self, printer_id, state: str):
(status_id,) = self._conn.execute(
"SELECT id FROM printer_statuses WHERE name = ?1", [state]
).fetchone()
self._conn.execute( self._conn.execute(
"UPDATE printers SET status_id = ?2, last_poll_date = datetime('now') WHERE id = ?1", """
[printer_id, status_id], UPDATE printers
SET
status_id = (SELECT id FROM printer_statuses WHERE name = ?2)
, last_poll_date = datetime('now')
WHERE
id = ?1
""",
[printer_id, state],
) )
################################################################################ ################################################################################
@ -312,7 +328,9 @@ class Store(object):
@requires_conn @requires_conn
def fetch_file(self, uid: int, fid: int): def fetch_file(self, uid: int, fid: int):
return self._conn.execute("SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]).fetchone() return self._conn.execute(
"SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]
).fetchone()
################################################################################ ################################################################################
# Job # Job
@ -332,8 +350,22 @@ class Store(object):
""" """
assert 0 <= relative_priority <= 9 assert 0 <= relative_priority <= 9
return self._conn.execute( return self._conn.execute(
"INSERT INTO jobs (user_id, file_id, priority) VALUES (?, ?, ?) RETURNING (id)", """
[uid, fid, self.fetch_user_priority(uid) + relative_priority], INSERT INTO jobs (
user_id
, file_id
, priority
) VALUES (
?1
, ?2
, (
SELECT priority + ?3
FROM users
WHERE uid = ?1
)
) RETURNING (id)
""",
[uid, fid, relative_priority],
).fetchone() ).fetchone()
@requires_conn @requires_conn
@ -469,7 +501,8 @@ class Store(object):
@requires_conn @requires_conn
def cancel_job(self, uid: int, job_id: int): def cancel_job(self, uid: int, job_id: int):
return self._conn.execute( return self._conn.execute(
"UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2", [uid, job_id] "UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2",
[uid, job_id],
) )
@requires_conn @requires_conn

View file

@ -23,7 +23,10 @@ from requests.exceptions import (
HTTPError, HTTPError,
Timeout, Timeout,
) )
from flask import Flask as App, current_app, render_template
from tentacles.store import Store from tentacles.store import Store
from fastmail import FastMailSMTP
class OctoRest(_OR): class OctoRest(_OR):
@ -64,189 +67,142 @@ def corn_job(every: timedelta):
return _decorator return _decorator
def poll_printers(db_factory: Callable[[], Store]) -> None: def poll_printers(app: App, store: Store) -> None:
"""Poll printers for their status.""" """Poll printers for their status."""
with closing(db_factory()) as db: for printer in store.list_printers():
for printer in db.list_printers(): mapped_job = store.fetch_job_by_printer(printer.id)
id, name, url, api_key, last_poll, status = printer
mapped_job = db.fetch_job_by_printer(id) def _set_status(status: str):
if printer.status != status:
print(f"Printer {printer.id} {printer.status} -> {status}")
store.update_printer_status(printer.id, status)
try:
client = OctoRest(url=printer.url, apikey=printer.api_key)
printer_job = client.job_info()
try: try:
client = OctoRest(url=url, apikey=api_key) printer_state = client.printer().get("state").get("flags", {})
printer_job = client.job_info() except HTTPError:
try: printer_state = {"disconnected": True}
printer_state = client.printer().get("state").get("flags", {})
except HTTPError:
printer_state = {"disconnected": True}
if printer_state.get("error"): if printer_state.get("error"):
# If there's a mapped job, we manually fail it so that # If there's a mapped job, we manually fail it so that
# there's no possibility of a sync problem between the # there's no possibility of a sync problem between the
# polling tasks. This violates separation of concerns a bit, # polling tasks. This violates separation of concerns a bit,
# but appears required for correctness. # but appears required for correctness.
if mapped_job: if mapped_job:
db.finish_job(mapped_job.id, "error") store.finish_job(mapped_job.id, "error")
if status != "error": _set_status("error")
print(f"Printer {printer.id} -> error")
db.update_printer_status(id, "error")
elif printer_state.get("disconnected"): elif printer_state.get("disconnected"):
if status != "disconnected": _set_status("disconnected")
print(f"Printer {printer.id} -> disconnected")
db.update_printer_status(id, "disconnected")
elif printer_state.get("printing"): elif printer_state.get("printing"):
if status != "running": _set_status("running")
print(f"Printer {printer.id} -> running")
db.update_printer_status(id, "running")
elif printer_job.get("state").lower() == "connecting": elif printer_job.get("state").lower() == "connecting":
if status != "connecting": _set_status("connecting")
print(f"Printer {printer.id} -> connecting")
db.update_printer_status(id, "connecting")
elif printer_state.get("ready"): elif printer_state.get("ready"):
if status != "idle": _set_status("idle")
print(f"Printer {printer.id} -> idle")
db.update_printer_status(id, "idle")
else: else:
raise Exception(f"Indeterminate state {printer_job!r} {printer_state!r}") raise Exception(
f"Indeterminate state {printer_job!r} {printer_state!r}"
)
except (ConnectionError, Timeout): except (ConnectionError, Timeout):
db.update_printer_status(id, "error") _set_status("error")
except HTTPError as e: except HTTPError as e:
assert isinstance(e.response, Response) assert isinstance(e.response, Response)
if e.response.status_code in [403, 401] or "error" in printer_job: if e.response.status_code in [403, 401] or "error" in printer_job:
db.update_printer_status(id, "error") _set_status("error")
elif e.response.json().get("error") == "Printer is not operational":
db.update_printer_status(id, "disconnected") elif e.response.json().get("error") == "Printer is not operational":
else: _set_status("disconnected")
raise Exception(
f"Indeterminate state {e.response.status_code}, {e.response.json()!r}" else:
) raise Exception(
f"Indeterminate state {e.response.status_code}, {e.response.json()!r}"
)
def assign_jobs(db_factory: Callable[[], Store]) -> None: def assign_jobs(app: App, store: Store) -> None:
"""Assign jobs to printers. Uploading files and job state management is handled separately.""" """Assign jobs to printers. Uploading files and job state management is handled separately."""
with closing(db_factory()) as db: for printer_id in store.list_idle_printers():
for printer_id in db.list_idle_printers(): if job_id := store.poll_job_queue():
if job_id := db.poll_job_queue(): store.assign_job(job_id, printer_id)
db.assign_job(job_id, printer_id) print(f"Mapped job {job_id} to printer {printer_id}")
print(f"Mapped job {job_id} to printer {printer_id}")
def push_jobs(db_factory: Callable[[], Store]) -> None: def push_jobs(app: App, store: Store) -> None:
"""Ensure that job files are uploaded and started to the assigned printer.""" """Ensure that job files are uploaded and started to the assigned printer."""
with closing(db_factory()) as db: for job in store.list_mapped_jobs():
for job in db.list_mapped_jobs(): printer = store.fetch_printer(job.printer_id)
printer = db.fetch_printer(job.printer_id) file = store.fetch_file(job.user_id, job.file_id)
file = db.fetch_file(job.user_id, job.file_id) if not file:
if not file: log.error(f"Job {job.id} no longer maps to a file")
log.error(f"Job {job.id} no longer maps to a file") store.delete_job(job.user_id, job.id)
db.delete_job(job.user_id, job.id)
try:
client = OctoRest(url=printer.url, apikey=printer.api_key)
printer_job = client.job_info()
try:
printer_state = client.printer().get("state").get("flags", {})
except HTTPError:
printer_state = {"error": printer_job.get("error")}
if printer_state.get("error"):
print(f"Printer {printer.id} is in error, can't push")
continue
try: try:
client = OctoRest(url=printer.url, apikey=printer.api_key) client.upload(file.path)
printer_job = client.job_info() except HTTPError as e:
try: if e.response.status_code == 409:
printer_state = client.printer().get("state").get("flags", {}) pass
except HTTPError: else:
printer_state = {"error": printer_job.get("error")} raise
if printer_state.get("error"): client.select(Path(file.path).name)
print(f"Printer {printer.id} is in error, can't push") client.start()
continue store.start_job(job.id)
except TimeoutError:
pass
try: except Exception:
client.upload(file.path) log.exception("Oop")
except HTTPError as e:
if e.response.status_code == 409:
pass
else:
raise
client.select(Path(file.path).name)
client.start()
db.start_job(job.id)
except TimeoutError:
pass
except Exception:
log.exception("Oop")
def revoke_jobs(db_factory: Callable[[], Store]) -> None: def revoke_jobs(app: App, store: Store) -> None:
"""Ensure that job files are uploaded and started to the assigned printer. """Ensure that job files are uploaded and started to the assigned printer.
Note that this will ALSO cancel jobs out of the print queue. Note that this will ALSO cancel jobs out of the print queue.
""" """
with closing(db_factory()) as db: for job in store.list_cancelled_jobs():
for job in db.list_cancelled_jobs(): if job.printer_id:
if job.printer_id: printer = store.fetch_printer(job.printer_id)
printer = db.fetch_printer(job.printer_id)
try:
print(f"Cancelling running job {job.id}")
client = OctoRest(url=printer.url, apikey=printer.api_key)
try:
client.cancel()
except HTTPError as e:
if e.response.status_code == 409:
pass
else:
raise
print(f"Job {job.id} -> cancelled")
db.finish_job(job.id, "cancelled")
except TimeoutError:
pass
except Exception:
log.exception("Oop")
else:
print(f"Unmapped job {job.id} became cancelled")
db.finish_job(job.id, "cancelled")
def pull_jobs(db_factory: Callable[[], Store]) -> None:
"""Poll the state of mapped printers to control jobs."""
with closing(db_factory()) as db:
for job in db.list_running_jobs():
printer = db.fetch_printer(job.printer_id)
try: try:
print(f"Cancelling running job {job.id}")
client = OctoRest(url=printer.url, apikey=printer.api_key) client = OctoRest(url=printer.url, apikey=printer.api_key)
job_state = client.job_info()
try: try:
printer_state = client.printer().get("state").get("flags", {}) client.cancel()
except HTTPError: except HTTPError as e:
printer_state = {"disconnected": True, "error": True} if e.response.status_code == 409:
pass
else:
raise
if printer_state.get("printing"): print(f"Job {job.id} -> cancelled")
pass store.finish_job(job.id, "cancelled")
elif job_state.get("progress", {}).get("completion", 0.0) == 100.0:
print(f"Job {job.id} has succeeded")
db.finish_job(job.id, "success")
elif printer_state.get("error"):
print(f"Job {job.id} has failed")
db.finish_job(job.id, "failed")
elif printer_state.get("cancelling"):
print(f"Job {job.id} has been acknowledged as cancelled")
db.finish_job(job.id, "cancelled")
else:
print(f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}")
except TimeoutError: except TimeoutError:
pass pass
@ -254,16 +210,89 @@ def pull_jobs(db_factory: Callable[[], Store]) -> None:
except Exception: except Exception:
log.exception("Oop") log.exception("Oop")
else:
print(f"Unmapped job {job.id} became cancelled")
store.finish_job(job.id, "cancelled")
def pull_jobs(app: App, store: Store) -> None:
"""Poll the state of mapped printers to control jobs."""
for job in store.list_running_jobs():
printer = store.fetch_printer(job.printer_id)
try:
client = OctoRest(url=printer.url, apikey=printer.api_key)
job_state = client.job_info()
try:
printer_state = client.printer().get("state").get("flags", {})
except HTTPError:
printer_state = {"disconnected": True, "error": True}
if printer_state.get("printing"):
pass
elif job_state.get("progress", {}).get("completion", 0.0) == 100.0:
print(f"Job {job.id} has succeeded")
store.finish_job(job.id, "success")
elif printer_state.get("error"):
print(f"Job {job.id} has failed")
store.finish_job(job.id, "failed")
elif printer_state.get("cancelling"):
print(f"Job {job.id} has been acknowledged as cancelled")
store.finish_job(job.id, "cancelled")
else:
print(
f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}"
)
except TimeoutError:
pass
except Exception:
log.exception("Oop")
def send_verifications(app, store: Store):
with closing(
FastMailSMTP(
app.config.get("fastmail", {}).get("username"),
app.config.get("fastmail", {}).get("key"),
)
) as fm:
for user in store.list_unverified_users():
fm.send_message(
from_addr="root@tirefireind.us",
to_addrs=[user.email],
msg=render_template("verification_email.html.j2"),
)
def send_approvals(app, store: Store):
with closing(
FastMailSMTP(
app.config.get("fastmail", {}).get("username"),
app.config.get("fastmail", {}).get("key"),
)
):
pass
@corn_job(timedelta(seconds=5)) @corn_job(timedelta(seconds=5))
def run_worker(db_factory): def run_worker(app, db_factory):
poll_printers(db_factory) with app.app_context(), closing(db_factory(app)) as store:
assign_jobs(db_factory) poll_printers(app, store)
push_jobs(db_factory) assign_jobs(app, store)
revoke_jobs(db_factory) push_jobs(app, store)
pull_jobs(db_factory) revoke_jobs(app, store)
pull_jobs(app, store)
send_verifications(app, store)
send_approvals(app, store)
def create_workers(db_factory: Callable[[], Store]) -> Event:
Thread(target=run_worker, args=[db_factory]).start() def create_workers(app, db_factory: Callable[[], Store]) -> Event:
Thread(target=run_worker, args=[app, db_factory]).start()
return SHUTDOWN return SHUTDOWN