Overhauling atop aiosql
This commit is contained in:
parent
53cdd5ca14
commit
413825d8c7
4 changed files with 576 additions and 496 deletions
|
@ -1,3 +1,5 @@
|
|||
-- name: create_tables#
|
||||
-- Initialize the core db tables. Arguably migration 0.
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- User structures
|
||||
CREATE TABLE IF NOT EXISTS groups (
|
||||
|
@ -16,8 +18,8 @@ CREATE TABLE IF NOT EXISTS user_statuses (
|
|||
, UNIQUE(name)
|
||||
);
|
||||
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-3, 'unapproved');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unverified');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-3, 'unverified');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-2, 'unapproved');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (-1, 'disabled');
|
||||
INSERT OR IGNORE INTO user_statuses (id, name) VALUES (1, 'enabled');
|
||||
|
||||
|
@ -113,3 +115,440 @@ CREATE TABLE IF NOT EXISTS email_spool (
|
|||
, sent_at TEXT
|
||||
, FOREIGN KEY(user_id) REFERENCES users(id)
|
||||
);
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- Users
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
-- name: try-create-user^
|
||||
INSERT INTO users (
|
||||
name
|
||||
, email
|
||||
, hash
|
||||
, group_id
|
||||
, status_id
|
||||
)
|
||||
VALUES (:name, :email, :hash, :gid, :sid)
|
||||
RETURNING *
|
||||
;
|
||||
|
||||
-- name: fetch_user^
|
||||
SELECT
|
||||
*
|
||||
FROM users
|
||||
WHERE
|
||||
id = :uid
|
||||
;
|
||||
|
||||
-- name: list-users
|
||||
SELECT
|
||||
id
|
||||
, email
|
||||
FROM users
|
||||
;
|
||||
|
||||
-- name: list-unverified-users
|
||||
SELECT
|
||||
*
|
||||
FROM users
|
||||
WHERE
|
||||
status_id = -2
|
||||
AND verified_at IS NULL
|
||||
;
|
||||
|
||||
-- name: verify-user!
|
||||
UPDATE users
|
||||
SET
|
||||
verified_at = datetime('now')
|
||||
WHERE
|
||||
id = :uid
|
||||
;
|
||||
|
||||
|
||||
-- name: set-user-status!
|
||||
UPDATE users
|
||||
SET
|
||||
status_id = :sid
|
||||
WHERE
|
||||
id = :uid
|
||||
;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- User statuses
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
-- name: fetch-user-status^
|
||||
SELECT
|
||||
id
|
||||
, name
|
||||
FROM user_statuses
|
||||
WHERE
|
||||
id = :uid
|
||||
;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- Sessions / 'keys'
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
-- name: create-key^
|
||||
INSERT INTO user_keys (
|
||||
user_id
|
||||
, name
|
||||
, expiration
|
||||
)
|
||||
VALUES (:uid, :name, :expiration)
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
|
||||
-- name: try-login^
|
||||
SELECT
|
||||
id
|
||||
, status_id
|
||||
FROM users
|
||||
WHERE
|
||||
(name = :username AND hash = :hash)
|
||||
OR (email = :username AND hash = :hash)
|
||||
LIMIT 1
|
||||
;
|
||||
|
||||
-- name: list-keys
|
||||
SELECT
|
||||
id
|
||||
, name
|
||||
, expiration
|
||||
FROM user_keys
|
||||
WHERE
|
||||
user_id = :uid
|
||||
;
|
||||
|
||||
-- name: fetch-key^
|
||||
SELECT
|
||||
*
|
||||
FROM user_keys
|
||||
WHERE
|
||||
id = :kid
|
||||
;
|
||||
|
||||
-- name: try-key^
|
||||
SELECT
|
||||
user_id
|
||||
FROM user_keys
|
||||
WHERE
|
||||
(expiration IS NULL OR unixepoch(expiration) > unixepoch('now'))
|
||||
AND id = :kid
|
||||
;
|
||||
|
||||
-- name: refresh-key
|
||||
UPDATE user_keys
|
||||
SET
|
||||
expiration = :expiration
|
||||
WHERE
|
||||
id = :kid
|
||||
;
|
||||
|
||||
-- name: delete-key
|
||||
DELETE FROM user_keys
|
||||
WHERE
|
||||
user_id = :uid
|
||||
AND id = :kid
|
||||
;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- Printers
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
-- name: try-create-printer^
|
||||
INSERT INTO printers (
|
||||
name
|
||||
, url
|
||||
, api_key
|
||||
, status_id
|
||||
)
|
||||
VALUES (:name, :url, :api_key, :status_id)
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
|
||||
-- name: fetch-printer^
|
||||
SELECT
|
||||
p.id
|
||||
, p.name
|
||||
, p.url
|
||||
, p.api_key
|
||||
, p.last_poll_date
|
||||
, s.name as status
|
||||
FROM printers p
|
||||
INNER JOIN printer_statuses s ON p.status_id = s.id
|
||||
WHERE p.id = :pid
|
||||
;
|
||||
|
||||
-- name: list-printers
|
||||
SELECT
|
||||
p.id
|
||||
, p.name
|
||||
, p.url
|
||||
, p.api_key
|
||||
, p.last_poll_date
|
||||
, s.name as status
|
||||
FROM printers p
|
||||
INNER JOIN printer_statuses s ON p.status_id = s.id
|
||||
;
|
||||
|
||||
-- name: list-idle-printers
|
||||
SELECT p.id
|
||||
FROM printers p
|
||||
LEFT JOIN (SELECT id, printer_id FROM jobs WHERE finished_at IS NULL) j
|
||||
ON p.id = j.printer_id
|
||||
INNER JOIN printer_statuses s
|
||||
ON p.status_id = s.id
|
||||
WHERE
|
||||
j.id IS NULL
|
||||
AND s.name = 'idle'
|
||||
;
|
||||
|
||||
-- name: update-printer-status!
|
||||
UPDATE printers
|
||||
SET
|
||||
status_id = (SELECT id FROM printer_statuses WHERE name = :status)
|
||||
, last_poll_date = datetime('now')
|
||||
WHERE
|
||||
id = :uid
|
||||
;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- Files
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
-- name: create-file^
|
||||
INSERT INTO files (
|
||||
user_id
|
||||
, filename
|
||||
, path
|
||||
, upload_date
|
||||
)
|
||||
VALUES (:uid, :filename, :path , datetime('now'))
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
|
||||
-- name: list-files
|
||||
SELECT
|
||||
*
|
||||
FROM files
|
||||
WHERE
|
||||
user_id = :uid
|
||||
;
|
||||
|
||||
-- name: delete-file!
|
||||
DELETE FROM files
|
||||
WHERE
|
||||
user_id = :uid
|
||||
AND id = :fid
|
||||
;
|
||||
|
||||
-- name: fetch-file^
|
||||
SELECT
|
||||
*
|
||||
FROM files
|
||||
WHERE
|
||||
user_id = :uid
|
||||
AND id = :fid
|
||||
;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- Jobs
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
-- name: create-job^
|
||||
INSERT INTO jobs (
|
||||
user_id
|
||||
, file_id
|
||||
, priority
|
||||
)
|
||||
VALUES (
|
||||
:uid
|
||||
, :fid,
|
||||
, (
|
||||
SELECT priority + :priority
|
||||
FROM users
|
||||
WHERE uid = :uid
|
||||
)
|
||||
)
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
-- name: fetch-job^
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
user_id = :uid
|
||||
AND id = :fid
|
||||
;
|
||||
|
||||
-- name: list-jobs
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
(:uid IS NULL OR user_id = :uid)
|
||||
;
|
||||
|
||||
-- name: list-jobs-by-file
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
file_id = :fid
|
||||
;
|
||||
|
||||
-- name: list-job-queue
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
finished_at IS NULL
|
||||
AND (:uid IS NULL OR user_id = :uid)
|
||||
ORDER BY
|
||||
priority DESC
|
||||
;
|
||||
|
||||
-- name: poll-job-queue
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
started_at IS NULL
|
||||
AND finished_at IS NULL
|
||||
AND printer_id IS NULL
|
||||
ORDER BY
|
||||
priority DESC
|
||||
LIMIT 1
|
||||
;
|
||||
|
||||
-- name: list-job-history
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
finished_at IS NOT NULL
|
||||
AND (:uid IS NULL OR user_id = :uid)
|
||||
ORDER BY
|
||||
datetime(finished_at) DESC
|
||||
;
|
||||
|
||||
-- name: list-mapped-jobs
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
started_at IS NULL
|
||||
AND printer_id IS NOT NULL
|
||||
;
|
||||
|
||||
-- name: list-running-jobs
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
started_at IS NOT NULL
|
||||
AND printer_id IS NOT NULL
|
||||
AND finished_at IS NULL
|
||||
;
|
||||
|
||||
-- name: list-canceling-jobs
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
finished_at IS NULL
|
||||
AND cancelled_at IS NOT NULL
|
||||
;
|
||||
|
||||
-- name: fetch-job-by-printer^
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
WHERE
|
||||
printer_id = :pid
|
||||
AND finished_at IS NULL
|
||||
;
|
||||
|
||||
-- name: assign-job!
|
||||
UPDATE jobs
|
||||
SET
|
||||
printer_id = :pid
|
||||
WHERE
|
||||
id = :jid
|
||||
;
|
||||
|
||||
-- name: start-job!
|
||||
UPDATE jobs
|
||||
SET
|
||||
started_at = datetime('now')
|
||||
WHERE
|
||||
id = :jid
|
||||
;
|
||||
|
||||
-- name: cancel-job!
|
||||
UPDATE jobs
|
||||
SET
|
||||
cancelled_at = datetime('now')
|
||||
WHERE
|
||||
user_id = :uid
|
||||
AND id = :jid
|
||||
;
|
||||
|
||||
-- name: finish-job!
|
||||
UPDATE jobs
|
||||
SET
|
||||
finished_at = datetime('now')
|
||||
, state = :state
|
||||
, message = :message
|
||||
WHERE
|
||||
id = :jid
|
||||
;
|
||||
|
||||
-- name: delete-job!
|
||||
DELETE FROM jobs
|
||||
WHERE
|
||||
user_id = :uid
|
||||
AND id = :jid
|
||||
;
|
||||
|
||||
-- name: create-email!
|
||||
INSERT INTO email_spool (
|
||||
user_id
|
||||
, subject
|
||||
, body
|
||||
) VALUES (
|
||||
:uid
|
||||
, :subject
|
||||
, :body
|
||||
)
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
|
||||
-- name: send-email!
|
||||
UPDATE email_spool
|
||||
SET
|
||||
sent_at = datetime('now')
|
||||
WHERE
|
||||
id = :eid
|
||||
;
|
||||
|
||||
-- name: poll-email-queue
|
||||
SELECT
|
||||
s.id as `id`
|
||||
, u.email as `to`
|
||||
, subject
|
||||
, body
|
||||
FROM email_spool s
|
||||
INNER JOIN users u
|
||||
ON s.user_id = u.id
|
||||
WHERE
|
||||
s.sent_at IS NULL
|
||||
LIMIT 1
|
||||
;
|
||||
|
|
|
@ -1,41 +1,53 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from collections import namedtuple
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timedelta
|
||||
from hashlib import sha3_256
|
||||
from importlib.resources import files
|
||||
from inspect import signature
|
||||
import logging
|
||||
import sqlite3
|
||||
from textwrap import indent
|
||||
from types import GeneratorType, new_class
|
||||
from typing import Optional
|
||||
|
||||
import aiosql
|
||||
|
||||
|
||||
with files(__package__).joinpath("schema.sql").open("r") as fp:
|
||||
PRELUDE = fp.read()
|
||||
_queries = aiosql.from_str(fp.read(), "sqlite3")
|
||||
|
||||
|
||||
def requires_conn(f):
|
||||
def _helper(self, *args, **kwargs):
|
||||
if self._conn is None:
|
||||
raise ConnectionError(f"A connection is required for {f.__name__}")
|
||||
return f(self, *args, **kwargs)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def qfn(name, f):
|
||||
def _helper(_self, *args, **kwargs):
|
||||
res = f(_self._conn, *args, **kwargs)
|
||||
# Force lazy values for convenience
|
||||
if isinstance(res, GeneratorType):
|
||||
res = list(res)
|
||||
print("%s -> %r" % (name, res))
|
||||
return res
|
||||
|
||||
_helper.__name__ = f.__name__
|
||||
_helper.__doc__ = f.__doc__
|
||||
_helper.__signature__ = signature(f)
|
||||
return _helper
|
||||
|
||||
|
||||
def fmap(ctor):
|
||||
"""Fmap a constructor over the records returned from a query function."""
|
||||
def make_query_class(cls):
|
||||
for k in _queries.available_queries:
|
||||
cls[k] = qfn(k, getattr(_queries, k))
|
||||
|
||||
def _h1(f):
|
||||
def _h2(*args, **kwargs):
|
||||
res = f(*args, **kwargs)
|
||||
if isinstance(res, list):
|
||||
return [ctor(*it) for it in res]
|
||||
elif isinstance(res, tuple):
|
||||
return ctor(*res)
|
||||
cls["__all__"] = _queries.available_queries
|
||||
|
||||
return _h2
|
||||
|
||||
return _h1
|
||||
Queries = new_class(
|
||||
"Queries",
|
||||
(),
|
||||
exec_body=make_query_class,
|
||||
)
|
||||
|
||||
|
||||
def one(it, *args, **kwargs):
|
||||
|
@ -52,10 +64,11 @@ class LoginError(StoreError):
|
|||
pass
|
||||
|
||||
|
||||
class Store(object):
|
||||
class Store(Queries):
|
||||
def __init__(self, path):
|
||||
self._path = path
|
||||
self._conn: sqlite3.Connection = None
|
||||
self._cursor: sqlite3.Cursor = None
|
||||
|
||||
def _factory(self, cursor, row):
|
||||
fields = [column[0] for column in cursor.description]
|
||||
|
@ -64,99 +77,59 @@ class Store(object):
|
|||
|
||||
def connect(self):
|
||||
if not self._conn:
|
||||
self._conn = sqlite3.connect(self._path, isolation_level="IMMEDIATE")
|
||||
self._conn = sqlite3.connect(self._path)
|
||||
self._conn.row_factory = self._factory
|
||||
for hunk in PRELUDE.split("\n\n"):
|
||||
try:
|
||||
self._conn.executescript(hunk).fetchall()
|
||||
except sqlite3.OperationalError as e:
|
||||
raise RuntimeError(
|
||||
f"Unable to execute startup script:\n{indent(hunk, ' > ')}"
|
||||
) from e
|
||||
self._conn.isolation_level = None # Disable automagical transactions
|
||||
self._cursor = self._conn.cursor()
|
||||
self.create_tables()
|
||||
|
||||
def begin(self):
|
||||
self._conn.execute("BEGIN")
|
||||
|
||||
@requires_conn
|
||||
def commit(self):
|
||||
self._conn.commit()
|
||||
if self._conn.in_transaction:
|
||||
self._conn.execute("COMMIT")
|
||||
|
||||
def rollback(self):
|
||||
self._conn.execute("ROLLBACK")
|
||||
|
||||
@property
|
||||
def cursor(self):
|
||||
return self._cursor
|
||||
|
||||
def savepoint(self):
|
||||
"""Create an explicit transaction context manager."""
|
||||
# Inspired by https://discuss.python.org/t/deprecate-the-sqlite3-context-manager-and-or-add-a-savepoint-based-context-manager/16664/9
|
||||
|
||||
@contextmanager
|
||||
def _helper():
|
||||
try:
|
||||
self.begin()
|
||||
yield self
|
||||
self.commit()
|
||||
except sqlite3.Error:
|
||||
self.rollback()
|
||||
|
||||
return _helper()
|
||||
|
||||
def close(self):
|
||||
if self._conn:
|
||||
if self._cursor:
|
||||
self.commit()
|
||||
self._cursor.close()
|
||||
self._conn.close()
|
||||
self._conn = None
|
||||
self._conn = self._cursor = None
|
||||
|
||||
################################################################################
|
||||
# Users
|
||||
# Wrappers for doing Python type mapping
|
||||
|
||||
@requires_conn
|
||||
def try_create_user(self, username, email, password, group_id=10, status_id=-2):
|
||||
"""Attempt to create a new user.
|
||||
|
||||
:param username: The name of the user to be created.
|
||||
:param email: The email of the user to be created.
|
||||
:param password: The (hopefully salted!) plain text password for the user. Will be hashed before storage.
|
||||
:param group_id: The numeric ID of a group to assign the user to. Default 10 AKA normal user.
|
||||
:param status_id: The numeric ID of the status to assign the user to. Default -2 AKA email verification required.
|
||||
|
||||
"""
|
||||
|
||||
digest = sha3_256()
|
||||
digest.update(password.encode("utf-8"))
|
||||
digest = digest.hexdigest()
|
||||
return self._conn.execute(
|
||||
"INSERT INTO users (name, email, hash, group_id, status_id) VALUES (?, ?, ?, ?, ?) RETURNING *",
|
||||
[username, email, digest, group_id, status_id],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def fetch_user(self, uid: int):
|
||||
return self._conn.execute("SELECT * FROM users WHERE id = ?", [uid]).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def list_users(self):
|
||||
return self._conn.execute("SELECT * FROM users").fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_unverified_users(self):
|
||||
return self._conn.execute(
|
||||
"SELECT * FROM users WHERE status_id = -2 AND NOT verified_at"
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def verify_user(self, uid: int):
|
||||
self._conn.execute(
|
||||
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
|
||||
def create_key(self, *, uid: int, name: str, ttl: timedelta):
|
||||
return super().create_key(
|
||||
uid=uid, name=name, expiration=(datetime.now() + ttl).isoformat()
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def set_user_status(self, uid: int, status):
|
||||
self._conn.execute(
|
||||
"UPDATE users SET verified_at = datetime('now') WHERE id = ?1", [uid]
|
||||
)
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def fetch_user_status(self, user_status_id: int):
|
||||
"""Fetch a user status by ID"""
|
||||
|
||||
return self._conn.execute(
|
||||
"SELECT id, name FROM user_statuses WHERE id = ?", [user_status_id]
|
||||
).fetchone()
|
||||
|
||||
################################################################################
|
||||
# Sessions / 'keys'
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def _create_session(
|
||||
self, uid: int, ttl: Optional[timedelta] = None, name: Optional[str] = None
|
||||
):
|
||||
return self._conn.execute(
|
||||
"INSERT INTO user_keys (user_id, name, expiration) VALUES (?1, ?2, ?3) RETURNING (id)",
|
||||
[uid, name, (datetime.now() + ttl).isoformat() if ttl else None],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def try_login(self, username: str, password: str, ttl: timedelta) -> Optional[str]:
|
||||
def try_login(
|
||||
self, *, username: str, password: str, ttl: timedelta
|
||||
) -> Optional[str]:
|
||||
"""Given a username and an (unsecured) password, attempt to authenticate the named user.
|
||||
|
||||
If successful, return the ID of a new session/key for that user.
|
||||
|
@ -165,389 +138,36 @@ class Store(object):
|
|||
|
||||
digest = sha3_256()
|
||||
digest.update(password.encode("utf-8"))
|
||||
digest = digest.hexdigest()
|
||||
res = self._conn.execute(
|
||||
"SELECT id, status_id FROM users WHERE (name=?1 AND hash=?2) OR (email=?1 AND hash=?2) LIMIT 1",
|
||||
[username, digest],
|
||||
).fetchone()
|
||||
res = super().try_login(username=username, hash=digest.hexdigest())
|
||||
if not res:
|
||||
return
|
||||
|
||||
uid, status = res
|
||||
if status > 0:
|
||||
return self._create_session(uid, ttl, "web session")
|
||||
return self.create_key(uid=uid, name="web session", ttl=ttl)
|
||||
|
||||
else:
|
||||
_, status = self.fetch_user_status(status)
|
||||
raise LoginError(status)
|
||||
|
||||
@requires_conn
|
||||
def create_key(self, kid: str, ttl, name: Optional[str] = None) -> Optional[str]:
|
||||
"""Given an _existing_ login session, create a new key.
|
||||
def try_create_user(
|
||||
self, *, username: str, email: str, password: str, gid: int = 1, sid: int = -3
|
||||
):
|
||||
digest = sha3_256()
|
||||
digest.update(password.encode("utf-8"))
|
||||
return super().try_create_user(
|
||||
name=username,
|
||||
email=email,
|
||||
hash=digest.hexdigest(),
|
||||
gid=gid,
|
||||
sid=sid,
|
||||
)
|
||||
|
||||
This allows the user to create more or less permanent API keys associated with their identity.
|
||||
|
||||
"""
|
||||
|
||||
if uid := self.try_key(kid):
|
||||
return self._create_session(uid, ttl, name)
|
||||
|
||||
@requires_conn
|
||||
def list_keys(self, uid: int):
|
||||
return [
|
||||
(id, name, datetime.fromisoformat(exp) if exp else None)
|
||||
for id, name, exp in self._conn.execute(
|
||||
"SELECT id, name, expiration FROM user_keys WHERE user_id = ?1 AND name != 'web session'",
|
||||
[uid],
|
||||
).fetchall()
|
||||
]
|
||||
|
||||
@requires_conn
|
||||
def fetch_key(self, kid) -> tuple:
|
||||
return self._conn.execute(
|
||||
"SELECT * FROM user_keys WHERE id = ?", [kid]
|
||||
).fetchone()
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def try_key(self, kid: str):
|
||||
"""Try to find the mapped user for a session."""
|
||||
|
||||
return self._conn.execute(
|
||||
"SELECT user_id FROM user_keys WHERE expiration IS NULL OR unixepoch(expiration) > unixepoch('now') and id = ?",
|
||||
[kid],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def refresh_key(self, kid: str, ttl: timedelta):
|
||||
def refresh_key(self, *, kid: str, ttl: timedelta):
|
||||
"""Automagically renew an API key which is still in use.
|
||||
|
||||
Mostly intended for dealing with web sessions which should implicitly extend, but which use the same mechanism as API keys.
|
||||
|
||||
"""
|
||||
|
||||
self._conn.execute(
|
||||
"UPDATE user_keys SET expiration = ? WHERE id = ?",
|
||||
[(datetime.now() + ttl).isoformat(), kid],
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def delete_key(self, uid: int, kid: str):
|
||||
"""Remove a session/key; equivalent to logout."""
|
||||
|
||||
self._conn.execute(
|
||||
"DELETE FROM user_keys WHERE user_id = ?1 and id = ?2", [uid, kid]
|
||||
)
|
||||
|
||||
################################################################################
|
||||
# Printers
|
||||
#
|
||||
# Printers represent connections to OctoPrint instances controlling physical machines.
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def try_create_printer(self, name: str, url: str, api_key: str):
|
||||
self._conn.execute(
|
||||
"INSERT INTO printers (name, url, api_key, status_id) VALUES (?1, ?2, ?3, 0) RETURNING id",
|
||||
[name, url, api_key],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def fetch_printer(self, printer_id: int):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
p.id
|
||||
, p.name
|
||||
, p.url
|
||||
, p.api_key
|
||||
, p.last_poll_date
|
||||
, s.name as status
|
||||
FROM printers p
|
||||
INNER JOIN printer_statuses s
|
||||
ON p.status_id = s.id
|
||||
WHERE p.id = ?1
|
||||
""",
|
||||
[printer_id],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def list_printers(self):
|
||||
return self._conn.execute(
|
||||
"SELECT p.id, p.name, p.url, p.api_key, p.last_poll_date, s.name as status FROM printers p INNER JOIN printer_statuses s ON p.status_id = s.id"
|
||||
).fetchall()
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def list_idle_printers(self):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT p.id
|
||||
FROM printers p
|
||||
LEFT JOIN (SELECT id, printer_id FROM jobs WHERE finished_at IS NULL) j ON p.id = j.printer_id
|
||||
INNER JOIN printer_statuses s ON p.status_id = s.id
|
||||
WHERE j.id IS NULL AND s.name = 'idle'
|
||||
"""
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def update_printer_status(self, printer_id, state: str):
|
||||
self._conn.execute(
|
||||
"""
|
||||
UPDATE printers
|
||||
SET
|
||||
status_id = (SELECT id FROM printer_statuses WHERE name = ?2)
|
||||
, last_poll_date = datetime('now')
|
||||
WHERE
|
||||
id = ?1
|
||||
""",
|
||||
[printer_id, state],
|
||||
)
|
||||
|
||||
################################################################################
|
||||
# Files
|
||||
#
|
||||
# A record of local files on disk, and the users who own then.
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def create_file(self, uid: int, name: str, path: str) -> int:
|
||||
return self._conn.execute(
|
||||
"INSERT INTO files (user_id, filename, path, upload_date) VALUES (?, ?, ?, datetime('now')) RETURNING (id)",
|
||||
[uid, name, path],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def list_files(self, uid: int):
|
||||
return self._conn.execute(
|
||||
"SELECT * FROM files WHERE user_id = ?", [uid]
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def delete_file(self, uid: int, fid: int):
|
||||
self._conn.execute("DELETE FROM files WHERE user_id = ? AND id = ?", [uid, fid])
|
||||
|
||||
@requires_conn
|
||||
def fetch_file(self, uid: int, fid: int):
|
||||
return self._conn.execute(
|
||||
"SELECT * FROM files WHERE user_id = ?1 AND id = ?2", [uid, fid]
|
||||
).fetchone()
|
||||
|
||||
################################################################################
|
||||
# Job
|
||||
#
|
||||
# A request by a user for a given file to be printed.
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def create_job(self, uid: int, fid: int, relative_priority: int = 0):
|
||||
"""Create a job mapped to a user with a file to print and a priority.
|
||||
|
||||
Note that the user may provide a sub-priority within their group queue. This allows users to create jobs with
|
||||
higher priority than existing jobs as a means of controlling the queue order.
|
||||
|
||||
May want a different (eg. more explicit) queue ordering mechanism here. Emulating the Netflix queue behavior of
|
||||
being able to drag some jobs ahead of others? How to model that?
|
||||
|
||||
"""
|
||||
assert 0 <= relative_priority <= 9
|
||||
return self._conn.execute(
|
||||
"""
|
||||
INSERT INTO jobs (
|
||||
user_id
|
||||
, file_id
|
||||
, priority
|
||||
) VALUES (
|
||||
?1
|
||||
, ?2
|
||||
, (
|
||||
SELECT priority + ?3
|
||||
FROM users
|
||||
WHERE uid = ?1
|
||||
)
|
||||
) RETURNING (id)
|
||||
""",
|
||||
[uid, fid, relative_priority],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def list_jobs(self, uid: Optional[int] = None):
|
||||
"""Enumerate jobs in priority order."""
|
||||
cond = f"user_id = {uid}" if uid else "TRUE"
|
||||
return self._conn.execute(
|
||||
f"""
|
||||
SELECT * FROM jobs
|
||||
WHERE {cond}
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_jobs_by_file(self, fid: int):
|
||||
return self._conn.execute(
|
||||
f"""
|
||||
SELECT * FROM jobs
|
||||
WHERE file_id = ?1
|
||||
""",
|
||||
[fid],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_job_queue(self, uid: Optional[int] = None):
|
||||
"""Enumerate jobs in priority order. Note: ignores completed jobs."""
|
||||
cond = f"user_id = {uid}" if uid else "TRUE"
|
||||
return self._conn.execute(
|
||||
f"""
|
||||
SELECT * FROM jobs
|
||||
WHERE finished_at IS NULL AND {cond}
|
||||
ORDER BY priority DESC
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_job_history(self, uid: Optional[int] = None):
|
||||
"""Enumerate jobs in priority order. Note: ignores completed jobs."""
|
||||
cond = f"user_id = {uid}" if uid else "TRUE"
|
||||
return self._conn.execute(
|
||||
f"""
|
||||
SELECT * FROM jobs
|
||||
WHERE finished_at IS NOT NULL AND {cond}
|
||||
ORDER BY datetime(finished_at) DESC
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_mapped_jobs(self):
|
||||
"""Scheduler detail. List mapped but not started jobs."""
|
||||
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT * FROM jobs
|
||||
WHERE started_at IS NULL AND printer_id IS NOT NULL
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_running_jobs(self):
|
||||
"""Scheduler detail. List running jobs.
|
||||
|
||||
Note that jobs for which cancellation has been requested but which HAVE
|
||||
NOT YET BEEN ACKNOWLEDGED AS CANCELLED BY OctoPrint must still be
|
||||
"running". This prevents the cancelling printer from being rescheduled
|
||||
prematurely and from allows the job to be moved into the cancelled state
|
||||
by normal printer status inspection.
|
||||
|
||||
"""
|
||||
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT * FROM jobs
|
||||
WHERE started_at IS NOT NULL
|
||||
AND printer_id IS NOT NULL
|
||||
AND finished_at IS NULL
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_cancelled_jobs(self):
|
||||
"""Scheduler detail. List jobs which have been cancelled but are still 'live' (not finished)."""
|
||||
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT * FROM jobs
|
||||
WHERE finished_at IS NULL
|
||||
AND cancelled_at IS NOT NULL
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@fmap(one)
|
||||
@requires_conn
|
||||
def poll_job_queue(self):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT id
|
||||
FROM jobs
|
||||
WHERE started_at IS NULL
|
||||
AND finished_at IS NULL
|
||||
AND printer_id IS NULL
|
||||
ORDER BY priority DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def fetch_job(self, uid: int, jid: int) -> Optional[tuple]:
|
||||
return self._conn.execute(
|
||||
"SELECT * FROM jobs WHERE user_id = ? AND id = ?", [uid, jid]
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def fetch_job_by_printer(self, pid: int) -> Optional[tuple]:
|
||||
"""Find 'the' mapped incomplete job for a given printer."""
|
||||
|
||||
return self._conn.execute(
|
||||
"SELECT * FROM jobs WHERE printer_id = ? AND finished_at IS NULL", [pid]
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def assign_job(self, job_id: int, printer_id: int):
|
||||
return self._conn.execute(
|
||||
"UPDATE jobs SET printer_id = ?2 WHERE id = ?1", [job_id, printer_id]
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def cancel_job(self, uid: int, job_id: int):
|
||||
return self._conn.execute(
|
||||
"UPDATE jobs SET cancelled_at = datetime('now') WHERE user_id = ?1 AND id = ?2",
|
||||
[uid, job_id],
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def start_job(self, job_id: int):
|
||||
return self._conn.execute(
|
||||
"UPDATE jobs SET started_at = datetime('now') WHERE id = ?1", [job_id]
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def finish_job(self, job_id: int, state: str, message: str = None):
|
||||
return self._conn.execute(
|
||||
"UPDATE jobs SET finished_at = datetime('now'), state = ?2, message = ?3 WHERE id = ?1",
|
||||
[job_id, state, message],
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def delete_job(self, uid: int, jid: int):
|
||||
self._conn.execute("DELETE FROM jobs WHERE user_id = ? and id = ?", [uid, jid])
|
||||
|
||||
################################################################################
|
||||
# Emails (notifications)
|
||||
|
||||
@requires_conn
|
||||
def create_email(self, uid: int, subject: str, body: str):
|
||||
return self._conn.execute(
|
||||
"INSERT INTO email_spool (user_id, subject, body) VALUES (?1, ?2, ?3) RETURNING id",
|
||||
[uid, subject, body],
|
||||
).fetchone()
|
||||
|
||||
@requires_conn
|
||||
def send_email(self, eid: int):
|
||||
return self._conn.execute(
|
||||
"UPDATE email_spool SET sent_at = datetime('now') WHERE id = ?1", [eid]
|
||||
)
|
||||
|
||||
@requires_conn
|
||||
def poll_spool(self, limit: int = 16):
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT s.id as `id`, u.email as `to`, subject, body
|
||||
FROM email_spool s
|
||||
INNER JOIN users u
|
||||
ON s.user_id = u.id
|
||||
WHERE s.sent_at IS NULL
|
||||
LIMIT ?1
|
||||
""",
|
||||
[limit],
|
||||
).fetchall()
|
||||
super().refresh_key(kid=kid, expiration=(datetime.now() + ttl).isoformat())
|
||||
|
|
|
@ -25,14 +25,14 @@ def password_testy():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def uid_testy(store, username_testy, password_testy):
|
||||
uid, status = store.try_create_user(
|
||||
username_testy,
|
||||
username_testy,
|
||||
password_testy,
|
||||
status_id=1,
|
||||
)
|
||||
return uid
|
||||
def uid_testy(store: s.Store, username_testy, password_testy):
|
||||
with store.savepoint():
|
||||
return store.try_create_user(
|
||||
username=username_testy,
|
||||
email=username_testy,
|
||||
password=password_testy,
|
||||
sid=1,
|
||||
).id
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -42,4 +42,7 @@ def login_ttl():
|
|||
|
||||
@pytest.fixture
|
||||
def sid_testy(store, uid_testy, username_testy, password_testy, login_ttl):
|
||||
return store.try_login(username_testy, password_testy, login_ttl)
|
||||
with store.savepoint():
|
||||
return store.try_login(
|
||||
username=username_testy, password=password_testy, ttl=login_ttl
|
||||
).id
|
||||
|
|
|
@ -7,32 +7,50 @@ def test_store_initializes(store: Store):
|
|||
assert isinstance(store, Store)
|
||||
|
||||
|
||||
def test_mkuser(store: Store, uid_testy, username_testy):
|
||||
assert store.list_users() == [(uid_testy, username_testy)]
|
||||
def test_store_savepoint(store: Store):
|
||||
obj = store.savepoint()
|
||||
|
||||
assert hasattr(obj, "__enter__")
|
||||
assert hasattr(obj, "__exit__")
|
||||
|
||||
flag = False
|
||||
with obj:
|
||||
flag = True
|
||||
assert flag
|
||||
|
||||
|
||||
def test_mkuser(store: Store, username_testy, password_testy):
|
||||
res = store.try_create_user(
|
||||
username=username_testy, email=username_testy, password=password_testy
|
||||
)
|
||||
assert res
|
||||
assert store.list_users() == [(res.id, username_testy)]
|
||||
|
||||
|
||||
def test_mksession(store: Store, uid_testy, username_testy, password_testy, login_ttl):
|
||||
sid = store.try_login(username_testy, password_testy, login_ttl)
|
||||
assert sid is not None
|
||||
assert store.list_keys() == [(sid, uid_testy)]
|
||||
assert store.try_key(sid) == uid_testy
|
||||
res = store.try_login(
|
||||
username=username_testy, password=password_testy, ttl=login_ttl
|
||||
)
|
||||
assert res is not None
|
||||
assert [it.id for it in store.list_keys(uid=uid_testy)] == [res.id]
|
||||
assert store.try_key(kid=res.id).user_id == uid_testy
|
||||
|
||||
|
||||
def test_refresh_key(store: Store, sid_testy, login_ttl):
|
||||
before = store.fetch_key(sid_testy)
|
||||
store.refresh_key(sid_testy, login_ttl * 2)
|
||||
after = store.fetch_key(sid_testy)
|
||||
before = store.fetch_key(kid=sid_testy)
|
||||
store.refresh_key(kid=sid_testy, ttl=login_ttl * 2)
|
||||
after = store.fetch_key(kid=sid_testy)
|
||||
assert before != after
|
||||
|
||||
|
||||
def tets_mkkey(store: Store, sid_testy, uid_testy):
|
||||
assert store.try_key(sid_testy) == uid_testy
|
||||
new_key = store.create_key(sid_testy, None)
|
||||
assert store.try_key(kid=sid_testy) == uid_testy
|
||||
new_key = store.create_key(kid=sid_testy, ttl=None)
|
||||
assert new_key is not None
|
||||
assert store.try_key(new_key) == uid_testy
|
||||
assert store.try_key(kid=new_key) == uid_testy
|
||||
|
||||
|
||||
def test_logout(store: Store, sid_testy):
|
||||
assert store.try_key(sid_testy)
|
||||
store.delete_key(sid_testy)
|
||||
assert not store.try_key(sid_testy)
|
||||
def test_logout(store: Store, uid_testy, sid_testy):
|
||||
assert store.try_key(kid=sid_testy)
|
||||
store.delete_key(uid=uid_testy, kid=sid_testy)
|
||||
assert not store.try_key(kid=sid_testy)
|
||||
|
|
Loading…
Reference in a new issue