Overhauling atop aiosql

Reid 'arrdem' McKenzie 2023-06-03 13:20:05 -06:00
parent e0b97f5f7a
commit 53c70fd642
4 changed files with 576 additions and 496 deletions

View file

@@ -1,3 +1,5 @@
-- name: create_tables#
-- Initialize the core db tables. Arguably migration 0.
----------------------------------------------------------------------------------------------------
-- User structures
CREATE TABLE IF NOT EXISTS groups (
@@ -16,8 +18,8 @@ CREATE TABLE IF NOT EXISTS user_statuses (
, UNIQUE(name)
);
-INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-3, 'unapproved');
-INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unverified');
+INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-3, 'unverified');
+INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unapproved');
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-1, 'disabled');
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (1, 'enabled');
@@ -113,3 +115,440 @@ CREATE TABLE IF NOT EXISTS email_spool (
, sent_at TEXT
, FOREIGN KEY(user_id) REFERENCES users(id)
);
----------------------------------------------------------------------------------------------------
-- Users
----------------------------------------------------------------------------------------------------
-- name: try-create-user^
INSERT INTO users (
name
, email
, hash
, group_id
, status_id
)
VALUES (:name, :email, :hash, :gid, :sid)
RETURNING *
;
-- name: fetch_user^
SELECT
*
FROM users
WHERE
id = :uid
;
-- name: list-users
SELECT
id
, email
FROM users
;
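For orientation, a minimal sketch (not from this diff) of how aiosql turns the definitions above into Python callables: the suffix on each `-- name:` line selects the execution style (`#` runs a script, `^` returns the first row, `!` executes a statement, no suffix returns all rows), and dashes in names become underscores. The file path and sample values below are illustrative; the commit itself loads the same text with aiosql.from_str.

import sqlite3
import aiosql

queries = aiosql.from_path("schema.sql", "sqlite3")
conn = sqlite3.connect(":memory:")
queries.create_tables(conn)        # create_tables# -> executed as a script
queries.try_create_user(           # try-create-user^ -> first row of the RETURNING clause
    conn, name="demo", email="demo@example.com",
    hash="<sha3-256 hexdigest>", gid=10, sid=-2,
)
print(queries.list_users(conn))    # list-users -> every (id, email) row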
-- name: list-unverified-users
SELECT
*
FROM users
WHERE
status_id = -2
AND verified_at IS NULL
;
-- name: verify-user!
UPDATE users
SET
verified_at = datetime('now')
WHERE
id = :uid
;
-- name: set-user-status!
UPDATE users
SET
status_id = :sid
WHERE
id = :uid
;
----------------------------------------------------------------------------------------------------
-- User statuses
----------------------------------------------------------------------------------------------------
-- name: fetch-user-status^
SELECT
id
, name
FROM user_statuses
WHERE
id = :uid
;
----------------------------------------------------------------------------------------------------
-- Sessions / 'keys'
----------------------------------------------------------------------------------------------------
-- name: create-key^
INSERT INTO user_keys (
user_id
, name
, expiration
)
VALUES (:uid, :name, :expiration)
RETURNING
id
;
-- name: try-login^
SELECT
id
, status_id
FROM users
WHERE
(name = :username AND hash = :hash)
OR (email = :username AND hash = :hash)
LIMIT 1
;
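The :hash bound here is never a plaintext password; the Python layer in this commit stores and compares a SHA3-256 hex digest, roughly:

from hashlib import sha3_256

def password_hash(password: str) -> str:
    # Mirrors the digest computed in store.py before :hash is bound.
    digest = sha3_256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()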
-- name: list-keys
SELECT
id
, name
, expiration
FROM user_keys
WHERE
user_id = :uid
;
-- name: fetch-key^
SELECT
*
FROM user_keys
WHERE
id = :kid
;
-- name: try-key^
SELECT
user_id
FROM user_keys
WHERE
(expiration IS NULL OR unixepoch(expiration) > unixepoch('now'))
AND id = :kid
;
-- name: refresh-key
UPDATE user_keys
SET
expiration = :expiration
WHERE
id = :kid
;
-- name: delete-key
DELETE FROM user_keys
WHERE
user_id = :uid
AND id = :kid
;
----------------------------------------------------------------------------------------------------
-- Printers
----------------------------------------------------------------------------------------------------
-- name: try-create-printer^
INSERT INTO printers (
name
, url
, api_key
, status_id
)
VALUES (:name, :url, :api_key, :status_id)
RETURNING
id
;
-- name: fetch-printer^
SELECT
p.id
, p.name
, p.url
, p.api_key
, p.last_poll_date
, s.name as status
FROM printers p
INNER JOIN printer_statuses s ON p.status_id = s.id
WHERE p.id = :pid
;
-- name: list-printers
SELECT
p.id
, p.name
, p.url
, p.api_key
, p.last_poll_date
, s.name as status
FROM printers p
INNER JOIN printer_statuses s ON p.status_id = s.id
;
-- name: list-idle-printers
SELECT p.id
FROM printers p
LEFT JOIN (SELECT id, printer_id FROM jobs WHERE finished_at IS NULL) j
ON p.id = j.printer_id
INNER JOIN printer_statuses s
ON p.status_id = s.id
WHERE
j.id IS NULL
AND s.name = 'idle'
;
-- name: update-printer-status!
UPDATE printers
SET
status_id = (SELECT id FROM printer_statuses WHERE name = :status)
, last_poll_date = datetime('now')
WHERE
id = :uid
;
----------------------------------------------------------------------------------------------------
-- Files
----------------------------------------------------------------------------------------------------
-- name: create-file^
INSERT INTO files (
user_id
, filename
, path
, upload_date
)
VALUES (:uid, :filename, :path, datetime('now'))
RETURNING
id
;
-- name: list-files
SELECT
*
FROM files
WHERE
user_id = :uid
;
-- name: delete-file!
DELETE FROM files
WHERE
user_id = :uid
AND id = :fid
;
-- name: fetch-file^
SELECT
*
FROM files
WHERE
user_id = :uid
AND id = :fid
;
----------------------------------------------------------------------------------------------------
-- Jobs
----------------------------------------------------------------------------------------------------
-- name: create-job^
INSERT INTO jobs (
user_id
, file_id
, priority
)
VALUES (
:uid
, :fid
, (
SELECT priority + :priority
FROM users
WHERE id = :uid
)
)
RETURNING
id
;
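The nested SELECT gives the new job an effective priority of the submitting user's base priority plus a caller-supplied bump; the Python implementation this replaces clamped that bump to 0-9, roughly as in this sketch (queue_job is illustrative, not part of the commit):

def queue_job(store, uid: int, fid: int, bump: int = 0):
    # Keep the per-user bump inside one decade, as the old Store.create_job asserted.
    assert 0 <= bump <= 9
    return store.create_job(uid=uid, fid=fid, priority=bump)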
-- name: fetch-job^
SELECT
*
FROM jobs
WHERE
user_id = :uid
AND id = :fid
;
-- name: list-jobs
SELECT
*
FROM jobs
WHERE
(:uid IS NULL OR user_id = :uid)
;
-- name: list-jobs-by-file
SELECT
*
FROM jobs
WHERE
file_id = :fid
;
-- name: list-job-queue
SELECT
*
FROM jobs
WHERE
finished_at IS NULL
AND (:uid IS NULL OR user_id = :uid)
ORDER BY
priority DESC
;
-- name: poll-job-queue
SELECT
*
FROM jobs
WHERE
started_at IS NULL
AND finished_at IS NULL
AND printer_id IS NULL
ORDER BY
priority DESC
LIMIT 1
;
-- name: list-job-history
SELECT
*
FROM jobs
WHERE
finished_at IS NOT NULL
AND (:uid IS NULL OR user_id = :uid)
ORDER BY
datetime(finished_at) DESC
;
-- name: list-mapped-jobs
SELECT
*
FROM jobs
WHERE
started_at IS NULL
AND printer_id IS NOT NULL
;
-- name: list-running-jobs
SELECT
*
FROM jobs
WHERE
started_at IS NOT NULL
AND printer_id IS NOT NULL
AND finished_at IS NULL
;
-- name: list-canceling-jobs
SELECT
*
FROM jobs
WHERE
finished_at IS NULL
AND cancelled_at IS NOT NULL
;
-- name: fetch-job-by-printer^
SELECT
*
FROM jobs
WHERE
printer_id = :pid
AND finished_at IS NULL
;
-- name: assign-job!
UPDATE jobs
SET
printer_id = :pid
WHERE
id = :jid
;
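Taken together, list-idle-printers, poll-job-queue and assign-job! support a simple scheduling pass. A hedged sketch of how a caller might combine them through the Store wrapper defined later in this commit (attribute access assumes its namedtuple-style row factory):

def schedule_once(store) -> None:
    # Pair each idle printer with the highest-priority unassigned job.
    for printer in store.list_idle_printers():
        pending = store.poll_job_queue()      # LIMIT 1 -> a list with at most one row
        if not pending:
            break
        store.assign_job(pid=printer.id, jid=pending[0].id)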
-- name: start-job!
UPDATE jobs
SET
started_at = datetime('now')
WHERE
id = :jid
;
-- name: cancel-job!
UPDATE jobs
SET
cancelled_at = datetime('now')
WHERE
user_id = :uid
AND id = :jid
;
-- name: finish-job!
UPDATE jobs
SET
finished_at = datetime('now')
, state = :state
, message = :message
WHERE
id = :jid
;
-- name: delete-job!
DELETE FROM jobs
WHERE
user_id = :uid
AND id = :jid
;
-- name: create-email!
INSERT INTO email_spool (
user_id
, subject
, body
) VALUES (
:uid
, :subject
, :body
)
RETURNING
id
;
-- name: send-email!
UPDATE email_spool
SET
sent_at = datetime('now')
WHERE
id = :eid
;
-- name: poll-email-queue
SELECT
s.id as `id`
, u.email as `to`
, subject
, body
FROM email_spool s
INNER JOIN users u
ON s.user_id = u.id
WHERE
s.sent_at IS NULL
LIMIT 1
;
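A sketch of a spool worker built on the last three queries (deliver() is a hypothetical SMTP helper, not part of this commit); each pass sends at most one message and stamps sent_at so it drops out of the queue:

def pump_mail(store, deliver) -> None:
    # Drain the spool one message at a time.
    while batch := store.poll_email_queue():   # LIMIT 1 -> at most one unsent row
        msg = batch[0]
        deliver(to=msg.to, subject=msg.subject, body=msg.body)
        store.send_email(eid=msg.id)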

View file

@@ -1,41 +1,53 @@
#!/usr/bin/env python3
from collections import namedtuple
+from contextlib import contextmanager
from datetime import datetime, timedelta
from hashlib import sha3_256
from importlib.resources import files
+from inspect import signature
+import logging
import sqlite3
-from textwrap import indent
+from types import GeneratorType, new_class
from typing import Optional
+import aiosql
with files(__package__).joinpath("schema.sql").open("r") as fp:
-    PRELUDE = fp.read()
+    _queries = aiosql.from_str(fp.read(), "sqlite3")
-def requires_conn(f):
-    def _helper(self, *args, **kwargs):
-        if self._conn is None:
-            raise ConnectionError(f"A connection is required for {f.__name__}")
-        return f(self, *args, **kwargs)
+log = logging.getLogger(__name__)
+def qfn(name, f):
+    def _helper(_self, *args, **kwargs):
+        res = f(_self._conn, *args, **kwargs)
+        # Force lazy values for convenience
+        if isinstance(res, GeneratorType):
+            res = list(res)
+        print("%s -> %r" % (name, res))
+        return res
+    _helper.__name__ = f.__name__
+    _helper.__doc__ = f.__doc__
+    _helper.__signature__ = signature(f)
    return _helper
-def fmap(ctor):
-    """Fmap a constructor over the records returned from a query function."""
-    def _h1(f):
-        def _h2(*args, **kwargs):
-            res = f(*args, **kwargs)
-            if isinstance(res, list):
-                return [ctor(*it) for it in res]
-            elif isinstance(res, tuple):
-                return ctor(*res)
-        return _h2
-    return _h1
+def make_query_class(cls):
+    for k in _queries.available_queries:
+        cls[k] = qfn(k, getattr(_queries, k))
+    cls["__all__"] = _queries.available_queries
+Queries = new_class(
+    "Queries",
+    (),
+    exec_body=make_query_class,
+)
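The net effect, as a sketch (values illustrative, not part of the diff): every `-- name:` entry in schema.sql becomes a method on the Store subclass defined below, bound to the store's own connection, so call sites pass only the named SQL parameters.

store = Store(":memory:")
store.connect()                                   # also runs the create_tables# script
user = store.try_create_user(
    username="demo", email="demo@example.com", password="hunter2"
)
print(store.fetch_user(uid=user.id))
store.close()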
def one(it, *args, **kwargs):
@@ -52,10 +64,11 @@ class LoginError(StoreError):
    pass
-class Store(object):
+class Store(Queries):
    def __init__(self, path):
        self._path = path
        self._conn: sqlite3.Connection = None
+        self._cursor: sqlite3.Cursor = None
    def _factory(self, cursor, row):
        fields = [column[0] for column in cursor.description]
@@ -64,99 +77,59 @@ class Store(object):
    def connect(self):
        if not self._conn:
-            self._conn = sqlite3.connect(self._path, isolation_level="IMMEDIATE")
+            self._conn = sqlite3.connect(self._path)
            self._conn.row_factory = self._factory
-            for hunk in PRELUDE.split("\n\n"):
-                try:
-                    self._conn.executescript(hunk).fetchall()
-                except sqlite3.OperationalError as e:
-                    raise RuntimeError(
-                        f"Unable to execute startup script:\n{indent(hunk, ' > ')}"
-                    ) from e
+            self._conn.isolation_level = None  # Disable automagical transactions
+            self._cursor = self._conn.cursor()
+            self.create_tables()
+    def begin(self):
+        self._conn.execute("BEGIN")
-    @requires_conn
    def commit(self):
-        self._conn.commit()
+        if self._conn.in_transaction:
+            self._conn.execute("COMMIT")
+    def rollback(self):
+        self._conn.execute("ROLLBACK")
+    @property
+    def cursor(self):
+        return self._cursor
+    def savepoint(self):
+        """Create an explicit transaction context manager."""
+        # Inspired by https://discuss.python.org/t/deprecate-the-sqlite3-context-manager-and-or-add-a-savepoint-based-context-manager/16664/9
+        @contextmanager
+        def _helper():
+            try:
+                self.begin()
+                yield self
+                self.commit()
+            except sqlite3.Error:
+                self.rollback()
+        return _helper()
    def close(self):
-        if self._conn:
+        if self._cursor:
            self.commit()
+            self._cursor.close()
            self._conn.close()
-            self._conn = None
+            self._conn = self._cursor = None
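Usage mirrors the test fixtures later in this commit: savepoint() wraps a unit of work in an explicit BEGIN/COMMIT and rolls back on sqlite3.Error. A sketch with illustrative values:

with store.savepoint():
    uid = store.try_create_user(
        username="demo", email="demo@example.com", password="hunter2", sid=1
    ).id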
    ################################################################################
-    # Users
-    @requires_conn
-    def try_create_user(self, username, email, password, group_id=10, status_id=-2):
-        """Attempt to create a new user.
+    # Wrappers for doing Python type mapping
+    def create_key(self, *, uid: int, name: str, ttl: timedelta):
+        return super().create_key(
+            uid=uid, name=name, expiration=(datetime.now() + ttl).isoformat()
+        )
:param username: The name of the user to be created.
:param email: The email of the user to be created.
:param password: The (hopefully salted!) plain text password for the user. Will be hashed before storage.
:param group_id: The numeric ID of a group to assign the user to. Default 10 AKA normal user.
:param status_id: The numeric ID of the status to assign the user to. Default -2 AKA email verification required.
"""
digest = sha3_256()
digest.update(password.encode("utf-8"))
digest = digest.hexdigest()
return self._conn.execute(
"INSERT INTO users (name, email, hash, group_id, status_id) VALUES (?, ?, ?, ?, ?) RETURNING *",
[username, email, digest, group_id, status_id],
).fetchone()
@requires_conn
def fetch_user(self, uid: int):
return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone()
@requires_conn
def list_users(self):
return self._conn.execute("SELECT * FROM users").fetchall()
@requires_conn
def list_unverified_users(self):
return self._conn.execute(
"SELECT * FROM users WHERE status_id = -2 AND NOT verified_at"
).fetchall()
@requires_conn
def verify_user(self, uid: int):
self._conn.execute(
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
        )
-    @requires_conn
-    def set_user_status(self, uid: int, status):
-        self._conn.execute(
-            "UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
-        )
+    def try_login(
+        self, *, username: str, password: str, ttl: timedelta
+    ) -> Optional[str]:
@fmap(one)
@requires_conn
def fetch_user_status(self, user_status_id: int):
"""Fetch a user status by ID"""
return self._conn.execute(
"SELECT id, name FROM user_statuses WHERE id = ?", [user_status_id]
).fetchone()
################################################################################
# Sessions / 'keys'
@fmap(one)
@requires_conn
def _create_session(
self, uid: int, ttl: Optional[timedelta] = None, name: Optional[str] = None
):
return self._conn.execute(
"INSERT INTO user_keys (user_id, name, expiration) VALUES (?1, ?2, ?3) RETURNING (id)",
[uid, name, (datetime.now() + ttl).isoformat() if ttl else None],
).fetchone()
@requires_conn
def try_login(self, username: str, password: str, ttl: timedelta) -> Optional[str]:
"""Given a username and an (unsecured) password, attempt to authenticate the named user. """Given a username and an (unsecured) password, attempt to authenticate the named user.
If successful, return the ID of a new session/key for that user. If successful, return the ID of a new session/key for that user.
@ -165,389 +138,36 @@ class Store(object):
digest = sha3_256() digest = sha3_256()
digest.update(password.encode("utf-8")) digest.update(password.encode("utf-8"))
digest = digest.hexdigest() res = super().try_login(username=username, hash=digest.hexdigest())
res = self._conn.execute(
"SELECT id, status_id FROM users WHERE (name=?1 AND hash=?2) OR (email=?1 AND hash=?2) LIMIT 1",
[username, digest],
).fetchone()
if not res: if not res:
return return
uid, status = res uid, status = res
if status > 0: if status > 0:
return self._create_session(uid, ttl, "web session") return self.create_key(uid=uid, name="web session", ttl=ttl)
else: else:
_, status = self.fetch_user_status(status) _, status = self.fetch_user_status(status)
raise LoginError(status) raise LoginError(status)
@requires_conn def try_create_user(
def create_key(self, kid: str, ttl, name: Optional[str] = None) -> Optional[str]: self, *, username: str, email: str, password: str, gid: int = 1, sid: int = -3
"""Given an _existing_ login session, create a new key. ):
digest = sha3_256()
digest.update(password.encode("utf-8"))
return super().try_create_user(
name=username,
email=email,
hash=digest.hexdigest(),
gid=gid,
sid=sid,
)
-        This allows the user to create more or less permanent API keys associated with their identity.
-        """
-        if uid := self.try_key(kid):
-            return self._create_session(uid, ttl, name)
+    def refresh_key(self, *, kid: str, ttl: timedelta):
@requires_conn
def list_keys(self, uid: int):
return [
(id, name, datetime.fromisoformat(exp) if exp else None)
for id, name, exp in self._conn.execute(
"SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'",
[uid],
).fetchall()
]
@requires_conn
def fetch_key(self, kid) -> tuple:
return self._conn.execute(
"SELECT * FROM user_keys WHERE id = ?", [kid]
).fetchone()
@fmap(one)
@requires_conn
def try_key(self, kid: str):
"""Try to find the mapped user for a session."""
return self._conn.execute(
"SELECT user_id FROM user_keys WHERE expiration IS NULL OR unixepoch(expiration) > unixepoch('now') and id = ?",
[kid],
).fetchone()
@requires_conn
def refresh_key(self, kid: str, ttl: timedelta):
"""Automagically renew an API key which is still in use. """Automagically renew an API key which is still in use.
Mostly intended for dealing with web sessions which should implicitly extend, but which use the same mechanism as API keys. Mostly intended for dealing with web sessions which should implicitly extend, but which use the same mechanism as API keys.
""" """
self._conn.execute( super().refresh_key(kid=kid, expiration=(datetime.now() + ttl).isoformat())
"UPDATE user_keys SET expiration = ? WHERE id = ?",
[(datetime.now() + ttl).isoformat(), kid],
)
@requires_conn
def delete_key(self, uid: int, kid: str):
"""Remove a session/key; equivalent to logout."""
self._conn.execute(
"DELETE FROM user_keys WHERE user_id = ?1 and id = ?2", [uid, kid]
)
################################################################################
# Printers
#
# Printers represent connections to OctoPrint instances controlling physical machines.
@fmap(one)
@requires_conn
def try_create_printer(self, name: str, url: str, api_key: str):
self._conn.execute(
"INSERT INTO printers (name, url, api_key, status_id) VALUES (?1, ?2, ?3, 0) RETURNING id",
[name, url, api_key],
).fetchone()
@requires_conn
def fetch_printer(self, printer_id: int):
return self._conn.execute(
"""
SELECT
p.id
, p.name
, p.url
, p.api_key
, p.last_poll_date
, s.name as status
FROM printers p
INNER JOIN printer_statuses s
ON p.status_id = s.id
WHERE p.id = ?1
""",
[printer_id],
).fetchone()
@requires_conn
def list_printers(self):
return self._conn.execute(
"SELECT p.id, p.name, p.url, p.api_key, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
).fetchall()
@fmap(one)
@requires_conn
def list_idle_printers(self):
return self._conn.execute(
"""
SELECT p.id
FROM printers p
LEFT JOIN (SELECT id, printer_id FROM jobs WHERE finished_at IS NULL) j ON p.id = j.printer_id
INNER JOIN printer_statuses s ON p.status_id = s.id
WHERE j.id IS NULL AND s.name = 'idle'
"""
).fetchall()
@requires_conn
def update_printer_status(self, printer_id, state: str):
self._conn.execute(
"""
UPDATE printers
SET
status_id = (SELECT id FROM printer_statuses WHERE name = ?2)
, last_poll_date = datetime('now')
WHERE
id = ?1
""",
[printer_id, state],
)
################################################################################
# Files
#
# A record of local files on disk, and the users who own then.
@fmap(one)
@requires_conn
def create_file(self, uid: int, name: str, path: str) -> int:
return self._conn.execute(
"INSERT INTO files (user_id, filename, path, upload_date) VALUES (?, ?, ?, datetime('now')) RETURNING (id)",
[uid, name, path],
).fetchone()
@requires_conn
def list_files(self, uid: int):
return self._conn.execute(
"SELECT * FROM files WHERE user_id = ?", [uid]
).fetchall()
@requires_conn
def delete_file(self, uid: int, fid: int):
self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid])
@requires_conn
def fetch_file(self, uid: int, fid: int):
return self._conn.execute(
"SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]
).fetchone()
################################################################################
# Job
#
# A request by a user for a given file to be printed.
@fmap(one)
@requires_conn
def create_job(self, uid: int, fid: int, relative_priority: int = 0):
"""Create a job mapped to a user with a file to print and a priority.
Note that the user may provide a sub-priority within their group queue. This allows users to create jobs with
higher priority than existing jobs as a means of controlling the queue order.
May want a different (eg. more explicit) queue ordering mechanism here. Emulating the Netflix queue behavior of
being able to drag some jobs ahead of others? How to model that?
"""
assert 0 <= relative_priority <= 9
return self._conn.execute(
"""
INSERT INTO jobs (
user_id
, file_id
, priority
) VALUES (
?1
, ?2
, (
SELECT priority + ?3
FROM users
WHERE uid = ?1
)
) RETURNING (id)
""",
[uid, fid, relative_priority],
).fetchone()
@requires_conn
def list_jobs(self, uid: Optional[int] = None):
"""Enumerate jobs in priority order."""
cond = f"user_id = {uid}" if uid else "TRUE"
return self._conn.execute(
f"""
SELECT * FROM jobs
WHERE {cond}
""",
[],
).fetchall()
@requires_conn
def list_jobs_by_file(self, fid: int):
return self._conn.execute(
f"""
SELECT * FROM jobs
WHERE file_id = ?1
""",
[fid],
).fetchall()
@requires_conn
def list_job_queue(self, uid: Optional[int] = None):
"""Enumerate jobs in priority order. Note: ignores completed jobs."""
cond = f"user_id = {uid}" if uid else "TRUE"
return self._conn.execute(
f"""
SELECT * FROM jobs
WHERE finished_at IS NULL AND {cond}
ORDER BY priority DESC
""",
[],
).fetchall()
@requires_conn
def list_job_history(self, uid: Optional[int] = None):
"""Enumerate jobs in priority order. Note: ignores completed jobs."""
cond = f"user_id = {uid}" if uid else "TRUE"
return self._conn.execute(
f"""
SELECT * FROM jobs
WHERE finished_at IS NOT NULL AND {cond}
ORDER BY datetime(finished_at) DESC
""",
[],
).fetchall()
@requires_conn
def list_mapped_jobs(self):
"""Scheduler detail. List mapped but not started jobs."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NULL AND printer_id IS NOT NULL
""",
[],
).fetchall()
@requires_conn
def list_running_jobs(self):
"""Scheduler detail. List running jobs.
Note that jobs for which cancellation has been requested but which HAVE
NOT YET BEEN ACKNOWLEDGED AS CANCELLED BY OctoPrint must still be
"running". This prevents the cancelling printer from being rescheduled
prematurely and from allows the job to be moved into the cancelled state
by normal printer status inspection.
"""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE started_at IS NOT NULL
AND printer_id IS NOT NULL
AND finished_at IS NULL
""",
[],
).fetchall()
@requires_conn
def list_cancelled_jobs(self):
"""Scheduler detail. List jobs which have been cancelled but are still 'live' (not finished)."""
return self._conn.execute(
"""
SELECT * FROM jobs
WHERE finished_at IS NULL
AND cancelled_at IS NOT NULL
""",
[],
).fetchall()
@fmap(one)
@requires_conn
def poll_job_queue(self):
return self._conn.execute(
"""
SELECT id
FROM jobs
WHERE started_at IS NULL
AND finished_at IS NULL
AND printer_id IS NULL
ORDER BY priority DESC
LIMIT 1
"""
).fetchone()
@requires_conn
def fetch_job(self, uid: int, jid: int) -> Optional[tuple]:
return self._conn.execute(
"SELECT * FROM jobs WHERE user_id = ? AND id = ?", [uid, jid]
).fetchone()
@requires_conn
def fetch_job_by_printer(self, pid: int) -> Optional[tuple]:
"""Find 'the' mapped incomplete job for a given printer."""
return self._conn.execute(
"SELECT * FROM jobs WHERE printer_id = ? AND finished_at IS NULL", [pid]
).fetchone()
@requires_conn
def assign_job(self, job_id: int, printer_id: int):
return self._conn.execute(
"UPDATE jobs SET printer_id = ?2 WHERE id = ?1", [job_id, printer_id]
)
@requires_conn
def cancel_job(self, uid: int, job_id: int):
return self._conn.execute(
"UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2",
[uid, job_id],
)
@requires_conn
def start_job(self, job_id: int):
return self._conn.execute(
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
)
@requires_conn
def finish_job(self, job_id: int, state: str, message: str = None):
return self._conn.execute(
"UPDATE jobs SET finished_at = datetime('now'), state = ?2, message = ?3 WHERE id = ?1",
[job_id, state, message],
)
@requires_conn
def delete_job(self, uid: int, jid: int):
self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid])
################################################################################
# Emails (notifications)
@requires_conn
def create_email(self, uid: int, subject: str, body: str):
return self._conn.execute(
"INSERT INTO email_spool (user_id, subject, body) VALUES (?1, ?2, ?3) RETURNING id",
[uid, subject, body],
).fetchone()
@requires_conn
def send_email(self, eid: int):
return self._conn.execute(
"UPDATE email_spool SET sent_at = datetime('now') WHERE id = ?1", [eid]
)
@requires_conn
def poll_spool(self, limit: int = 16):
return self._conn.execute(
"""
SELECT s.id as `id`, u.email as `to`, subject, body
FROM email_spool s
INNER JOIN users u
ON s.user_id = u.id
WHERE s.sent_at IS NULL
LIMIT ?1
""",
[limit],
).fetchall()

View file

@@ -25,14 +25,14 @@ def password_testy():
@pytest.fixture
-def uid_testy(store, username_testy, password_testy):
-    uid, status = store.try_create_user(
-        username_testy,
-        username_testy,
-        password_testy,
-        status_id=1,
-    )
-    return uid
+def uid_testy(store: s.Store, username_testy, password_testy):
+    with store.savepoint():
+        return store.try_create_user(
+            username=username_testy,
+            email=username_testy,
+            password=password_testy,
+            sid=1,
+        ).id
@pytest.fixture
@@ -42,4 +42,7 @@ def login_ttl():
@pytest.fixture
def sid_testy(store, uid_testy, username_testy, password_testy, login_ttl):
-    return store.try_login(username_testy, password_testy, login_ttl)
+    with store.savepoint():
+        return store.try_login(
+            username=username_testy, password=password_testy, ttl=login_ttl
+        ).id

View file

@@ -7,32 +7,50 @@ def test_store_initializes(store: Store):
    assert isinstance(store, Store)
-def test_mkuser(store: Store, uid_testy, username_testy):
-    assert store.list_users() == [(uid_testy, username_testy)]
+def test_store_savepoint(store: Store):
+    obj = store.savepoint()
+    assert hasattr(obj, "__enter__")
+    assert hasattr(obj, "__exit__")
+    flag = False
+    with obj:
+        flag = True
+    assert flag
+def test_mkuser(store: Store, username_testy, password_testy):
+    res = store.try_create_user(
+        username=username_testy, email=username_testy, password=password_testy
+    )
+    assert res
+    assert store.list_users() == [(res.id, username_testy)]
def test_mksession(store: Store, uid_testy, username_testy, password_testy, login_ttl):
-    sid = store.try_login(username_testy, password_testy, login_ttl)
-    assert sid is not None
-    assert store.list_keys() == [(sid, uid_testy)]
-    assert store.try_key(sid) == uid_testy
+    res = store.try_login(
+        username=username_testy, password=password_testy, ttl=login_ttl
+    )
+    assert res is not None
+    assert [it.id for it in store.list_keys(uid=uid_testy)] == [res.id]
+    assert store.try_key(kid=res.id).user_id == uid_testy
def test_refresh_key(store: Store, sid_testy, login_ttl):
-    before = store.fetch_key(sid_testy)
-    store.refresh_key(sid_testy, login_ttl * 2)
-    after = store.fetch_key(sid_testy)
+    before = store.fetch_key(kid=sid_testy)
+    store.refresh_key(kid=sid_testy, ttl=login_ttl * 2)
+    after = store.fetch_key(kid=sid_testy)
    assert before != after
def tets_mkkey(store: Store, sid_testy, uid_testy):
-    assert store.try_key(sid_testy) == uid_testy
-    new_key = store.create_key(sid_testy, None)
+    assert store.try_key(kid=sid_testy) == uid_testy
+    new_key = store.create_key(kid=sid_testy, ttl=None)
    assert new_key is not None
-    assert store.try_key(new_key) == uid_testy
+    assert store.try_key(kid=new_key) == uid_testy
-def test_logout(store: Store, sid_testy):
-    assert store.try_key(sid_testy)
-    store.delete_key(sid_testy)
+def test_logout(store: Store, uid_testy, sid_testy):
+    assert store.try_key(kid=sid_testy)
+    store.delete_key(uid=uid_testy, kid=sid_testy)
-    assert not store.try_key(sid_testy)
+    assert not store.try_key(kid=sid_testy)