A whoooooole lot of refactoring

This commit is contained in:
Reid 'arrdem' McKenzie 2023-06-03 15:09:50 -06:00
parent bda009482b
commit 48c30fdcf8
14 changed files with 215 additions and 167 deletions

View file

@ -62,18 +62,18 @@ load("@arrdem_source_pypi//:requirements.bzl", "install_deps")
# Call it to define repos for your requirements.
install_deps()
git_repository(
name = "rules_zapp",
remote = "https://git.arrdem.com/arrdem/rules_zapp.git",
commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b",
# tag = "0.1.2",
)
# local_repository(
# git_repository(
# name = "rules_zapp",
# path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp",
# remote = "https://git.arrdem.com/arrdem/rules_zapp.git",
# commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b",
# # tag = "0.1.2",
# )
local_repository(
name = "rules_zapp",
path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp",
)
####################################################################################################
# Docker support
####################################################################################################

View file

@ -17,9 +17,10 @@ from tentacles.blueprints import (
printer_ui,
user_ui,
)
from tentacles.db import Db
from tentacles.globals import _ctx, Ctx, ctx
from tentacles.store import Store
from tentacles.workers import Worker
from tentacles.workers import *
from tentacles.workers import assign_jobs, Worker
@click.group()
@ -28,7 +29,7 @@ def cli():
def db_factory(app):
store = Store(app.config.get("db", {}).get("uri"))
store = Db(app.config.get("db", {}).get("uri"))
store.connect()
return store
@ -38,10 +39,11 @@ def custom_ctx(app, wsgi_app):
store = db_factory(app)
token = _ctx.set(Ctx(store))
try:
with store.savepoint():
return wsgi_app(environ, start_response)
finally:
_ctx.reset(token)
store.close()
_ctx.reset(token)
return helper
@ -56,21 +58,21 @@ def user_session():
if (
(
(session_id := request.cookies.get("sid", ""))
and (uid := ctx.db.try_key(session_id))
and (row := ctx.db.try_key(kid=session_id))
)
or (
request.authorization
and request.authorization.token
and (uid := ctx.db.try_key(request.authorization.token))
and (row := ctx.db.try_key(kid=request.authorization.token))
)
or (
(api_key := request.headers.get("x-api-key"))
and (uid := ctx.db.try_key(api_key))
and (row := ctx.db.try_key(kid=api_key))
)
):
ctx.sid = session_id
ctx.uid = uid
user = ctx.db.fetch_user(uid)
ctx.sid = row.id
ctx.uid = row.user_id
user = ctx.db.fetch_user(row.user_id)
ctx.gid = user.group_id
ctx.username = user.name
ctx.is_admin = user.group_id == 0
@ -104,11 +106,12 @@ def make_app():
@click.option("--config", type=Path)
def serve(hostname: str, port: int, config: Path):
logging.basicConfig(
format="%(asctime)s %(relativeCreated)6d %(threadName)s - %(name)s - %(levelname)s - %(message)s",
format="%(asctime)s %(threadName)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
logging.getLogger("tentacles").setLevel(logging.DEBUG)
logging.getLogger("tentacles.db").setLevel(logging.DEBUG - 1)
app = make_app()
@ -133,8 +136,14 @@ def serve(hostname: str, port: int, config: Path):
server.shutdown_timeout = 1
server.subscribe()
# Spawn the worker thread
Worker(cherrypy.engine, app, db_factory, frequency=5).start()
# Spawn the worker thread(s)
Worker(cherrypy.engine, app, db_factory, poll_printers, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, assign_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, push_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, revoke_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, pull_jobs, frequency=5).start()
Worker(cherrypy.engine, app, db_factory, send_emails, frequency=5).start()
# Run the server
cherrypy.engine.start()

View file

@ -98,10 +98,12 @@ def create_file(location: Optional[str] = None):
return {"error": "file exists already"}, 409
file.save(sanitized_path)
fid = ctx.db.create_file(ctx.uid, file.filename, sanitized_path)
fid = ctx.db.create_file(
uid=ctx.uid, filename=file.filename, path=sanitized_path
)
if request.form.get("print", "").lower() == "true":
ctx.db.create_job(ctx.uid, fid)
ctx.db.create_job(uid=ctx.uid, fid=fid)
return {"status": "ok"}, 202
@ -119,7 +121,7 @@ def get_files():
"owner": ctx.uid,
"upload_date": f.upload_date,
}
for f in ctx.db.list_files(ctx.uid)
for f in ctx.db.list_files(uid=ctx.uid)
]
}, 200
@ -150,7 +152,7 @@ def get_jobs():
"finished_at": j.finished_at,
"printer_id": j.printer_id,
}
for j in ctx.db.list_jobs()
for j in ctx.db.list_jobs(uid=ctx.uid)
]
}, 200

View file

@ -42,9 +42,10 @@ def manipulate_files():
return render_template("files.html.j2"), code
case "delete":
file = ctx.db.fetch_file(ctx.uid, int(request.form.get("file_id")))
file = ctx.db.fetch_file(uid=ctx.uid, fid=int(request.form.get("file_id")))
if any(
job.finished_at is None for job in ctx.db.list_jobs_by_file(file.id)
job.finished_at is None
for job in ctx.db.list_jobs_by_file(uid=ctx.uid, fid=file.id)
):
flash("File is in use", category="error")
return render_template("files.html.j2"), 400
@ -52,7 +53,7 @@ def manipulate_files():
if os.path.exists(file.path):
os.unlink(file.path)
ctx.db.delete_file(ctx.uid, file.id)
ctx.db.delete_file(uid=ctx.uid, fid=file.id)
flash("File deleted", category="info")
case _:

View file

@ -29,22 +29,24 @@ def list_jobs():
def manipulate_jobs():
match request.form.get("action"):
case "enqueue":
ctx.db.create_job(ctx.uid, int(request.form.get("file_id")))
ctx.db.create_job(uid=ctx.uid, fid=int(request.form.get("file_id")))
flash("Job created!", category="info")
case "duplicate":
if job := ctx.db.fetch_job(ctx.uid, int(request.form.get("job_id"))):
ctx.db.create_job(ctx.uid, job.file_id)
if job := ctx.db.fetch_job(
uid=ctx.uid, jid=int(request.form.get("job_id"))
):
ctx.db.create_job(uid=ctx.uid, fid=job.file_id)
flash("Job created!", category="info")
else:
flash("Could not duplicate", category="error")
case "cancel":
ctx.db.cancel_job(ctx.uid, int(request.form.get("job_id")))
ctx.db.cancel_job(uid=ctx.uid, jid=int(request.form.get("job_id")))
flash("Cancellation reqested", category="info")
case "delete":
ctx.db.delete_job(ctx.uid, int(request.form.get("job_id")))
ctx.db.delete_job(uid=ctx.uid, jid=int(request.form.get("job_id")))
flash("Job deleted", category="info")
case _:

View file

@ -36,9 +36,10 @@ def add_printer():
assert request.form["url"]
assert request.form["api_key"]
ctx.db.try_create_printer(
request.form["name"],
request.form["url"],
request.form["api_key"],
name=request.form["name"],
url=request.form["url"],
api_key=request.form["api_key"],
sid=0, # Disconnected
)
flash("Printer created")
return redirect("/printers")

View file

@ -42,13 +42,13 @@ def get_login():
@BLUEPRINT.route("/user/login", methods=["POST"])
def post_login():
if sid := ctx.db.try_login(
username := request.form["username"],
salt(request.form["password"]),
timedelta(days=1),
if row := ctx.db.try_login(
username=(username := request.form["username"]),
password=salt(request.form["password"]),
ttl=timedelta(days=1),
):
resp = redirect("/")
resp.set_cookie("sid", sid)
resp.set_cookie("sid", row.id)
flash(f"Welcome, {username}", category="success")
return resp
@ -72,7 +72,7 @@ def post_register():
username = request.form["username"]
email = request.form["email"]
group_id = 1 # Normal users
status_id = -2 # Unverified
status_id = -3 # Unverified
for user_config in current_app.config.get("users", []):
if user_config["email"] == email:
@ -85,13 +85,17 @@ def post_register():
break
if user := ctx.db.try_create_user(
username, email, salt(request.form["password"]), group_id, status_id
username=username,
email=email,
password=salt(request.form["password"]),
gid=group_id,
sid=status_id,
):
if user.status_id == -2:
if user.status_id == -3:
ctx.db.create_email(
user.id,
"Tentacles email confirmation",
render_template(
uid=user.id,
subject="Tentacles email confirmation",
body=render_template(
"verification_email.html.j2",
username=user.name,
token_id=user.verification_token,
@ -103,6 +107,10 @@ def post_register():
"Please check your email for a verification request",
category="success",
)
elif user.status_id == 1:
flash("Welcome, please log in", category="success")
return render_template("register.html.j2")
except Exception as e:
@ -115,7 +123,7 @@ def post_register():
@BLUEPRINT.route("/user/logout")
def logout():
# Invalidate the user's authorization
ctx.db.delete_key(ctx.uid, ctx.sid)
ctx.db.delete_key(uid=ctx.uid, kid=ctx.sid)
resp = redirect("/")
resp.set_cookie("sid", "")
return resp
@ -132,7 +140,7 @@ def get_settings():
@BLUEPRINT.route("/user", methods=["POST"])
def post_settings():
if request.form["action"] == "add":
ttl_spec = request.form.get("ttl")
ttl_spec = request.form.get("ttl", "")
if ttl_spec == "forever":
ttl = None
elif m := re.fullmatch(r"(\d+)d", ttl_spec):
@ -141,7 +149,9 @@ def post_settings():
flash("Bad request", category="error")
return render_template("user.html.j2"), 400
ctx.db.create_key(ctx.sid, ttl, request.form.get("name"))
ctx.db.create_key(
uid=ctx.uid, ttl=ttl, name=request.form.get("name", "anonymous")
)
flash("Key created", category="success")
elif request.form["action"] == "revoke":

View file

@ -8,6 +8,7 @@ from importlib.resources import files
from inspect import signature
import logging
import sqlite3
from time import sleep
from types import GeneratorType, new_class
from typing import Optional
@ -27,7 +28,7 @@ def qfn(name, f):
# Force lazy values for convenience
if isinstance(res, GeneratorType):
res = list(res)
print("%s -> %r" % (name, res))
log.log(logging.DEBUG - 1, "%s (%r) -> %r", name, kwargs, res)
return res
_helper.__name__ = f.__name__
@ -64,7 +65,7 @@ class LoginError(StoreError):
pass
class Store(Queries):
class Db(Queries):
def __init__(self, path):
self._path = path
self._conn: sqlite3.Connection = None
@ -106,9 +107,24 @@ class Store(Queries):
try:
self.begin()
yield self
exc = None
for attempt in range(5):
try:
self.commit()
break
except sqlite3.OperationalError as e:
exc = e
if e.sqlite_errorcode == 6:
sleep(0.1 * attempt)
continue
else:
raise e
else:
raise exc
except sqlite3.Error:
self.rollback()
log.exception("Forced to roll back!")
return _helper()
@ -122,9 +138,11 @@ class Store(Queries):
################################################################################
# Wrappers for doing Python type mapping
def create_key(self, *, uid: int, name: str, ttl: timedelta):
def create_key(self, *, uid: int, name: str, ttl: Optional[timedelta]):
return super().create_key(
uid=uid, name=name, expiration=(datetime.now() + ttl).isoformat()
uid=uid,
name=name,
expiration=((datetime.now() + ttl).isoformat() if ttl else None),
)
def try_login(
@ -171,3 +189,6 @@ class Store(Queries):
"""
super().refresh_key(kid=kid, expiration=(datetime.now() + ttl).isoformat())
def finish_job(self, *, jid: int, state: str, message: Optional[str] = None):
super().finish_job(jid=jid, state=state, message=message)

View file

@ -3,13 +3,13 @@
from contextvars import ContextVar
from attrs import define
from tentacles.store import Store
from tentacles.db import Db
from werkzeug.local import LocalProxy
@define
class Ctx:
db: Store
db: Db
uid: int = None
gid: int = None
sid: str = None

View file

@ -93,7 +93,6 @@ CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT
, user_id INTEGER NOT NULL
, file_id INTEGER NOT NULL
, priority INTEGER CHECK(priority IS NOT NULL AND 0 <= priority)
, started_at TEXT
, cancelled_at TEXT
, finished_at TEXT
@ -199,6 +198,7 @@ INSERT INTO user_keys (
VALUES (:uid, :name, :expiration)
RETURNING
id
, user_id
;
-- name: try-login^
@ -232,7 +232,8 @@ WHERE
-- name: try-key^
SELECT
user_id
id
, user_id
FROM user_keys
WHERE
(expiration IS NULL OR unixepoch(expiration) > unixepoch('now'))
@ -265,7 +266,7 @@ INSERT INTO printers (
, api_key
, status_id
)
VALUES (:name, :url, :api_key, :status_id)
VALUES (:name, :url, :api_key, :sid)
RETURNING
id
;
@ -310,10 +311,10 @@ WHERE
-- name: update-printer-status!
UPDATE printers
SET
status_id = (SELECT id FROM printer_statuses WHERE name = :status)
status_id = (SELECT id FROM printer_statuses WHERE name = :status or id = :status)
, last_poll_date = datetime('now')
WHERE
id = :uid
id = :pid
;
----------------------------------------------------------------------------------------------------
@ -364,16 +365,10 @@ WHERE
INSERT INTO jobs (
user_id
, file_id
, priority
)
VALUES (
:uid
, :fid,
, (
SELECT priority + :priority
FROM users
WHERE uid = :uid
)
, :fid
)
RETURNING
id
@ -384,7 +379,7 @@ SELECT
FROM jobs
WHERE
user_id = :uid
AND id = :fid
AND id = :jid
;
-- name: list-jobs
@ -401,6 +396,7 @@ SELECT
FROM jobs
WHERE
file_id = :fid
, uid = :uid
;
-- name: list-job-queue
@ -410,11 +406,9 @@ FROM jobs
WHERE
finished_at IS NULL
AND (:uid IS NULL OR user_id = :uid)
ORDER BY
priority DESC
;
-- name: poll-job-queue
-- name: poll-job-queue^
SELECT
*
FROM jobs
@ -422,8 +416,6 @@ WHERE
started_at IS NULL
AND finished_at IS NULL
AND printer_id IS NULL
ORDER BY
priority DESC
LIMIT 1
;

View file

@ -35,7 +35,7 @@
{% macro job_state(job) %}
{{ 'queued' if (not job.finished_at and not job.printer_id and not job.cancelled_at) else
'running' if (not job.finished_at and job.printer_id and not job.cancelled_at) else
'cancelling' if (not job.finished_at and job.printer_id and job.cancelled_at) else
'cancelling' if (not job.finished_at and job.cancelled_at) else
job.state }}
{% endmacro %}

View file

@ -8,11 +8,10 @@ Mostly related to monitoring and managing Printer state.
"""
from contextlib import closing
from datetime import datetime, timedelta
import logging
from pathlib import Path
from threading import Event
from time import sleep
from typing import Callable
from urllib import parse as urlparse
from cherrypy.process.plugins import Monitor
@ -25,7 +24,7 @@ from requests.exceptions import (
HTTPError,
Timeout,
)
from tentacles.store import Store
from tentacles.db import Db
class OctoRest(_OR):
@ -46,36 +45,16 @@ log = logging.getLogger(__name__)
SHUTDOWN = Event()
def corn_job(every: timedelta):
def _decorator(f):
def _helper(*args, **kwargs):
last = None
while not SHUTDOWN.is_set():
if not last or (datetime.now() - last) > every:
log.debug(f"Ticking job {f.__name__}")
try:
last = datetime.now()
f(*args, **kwargs)
except Exception:
log.exception(f"Error while procesing task {f.__name__}")
else:
sleep(1)
return _helper
return _decorator
def poll_printers(app: App, store: Store) -> None:
def poll_printers(app: App, db: Db) -> None:
"""Poll printers for their status."""
for printer in store.list_printers():
mapped_job = store.fetch_job_by_printer(printer.id)
for printer in db.list_printers():
mapped_job = db.fetch_job_by_printer(pid=printer.id)
def _set_status(status: str):
if printer.status != status:
print(f"Printer {printer.id} {printer.status} -> {status}")
store.update_printer_status(printer.id, status)
log.info(f"Printer {printer.id} {printer.status} -> {status}")
db.update_printer_status(pid=printer.id, status=status)
try:
client = OctoRest(url=printer.url, apikey=printer.api_key)
@ -91,7 +70,7 @@ def poll_printers(app: App, store: Store) -> None:
# polling tasks. This violates separation of concerns a bit,
# but appears required for correctness.
if mapped_job:
store.finish_job(mapped_job.id, "error")
db.finish_job(jid=mapped_job.id, state="error")
_set_status("error")
@ -129,24 +108,24 @@ def poll_printers(app: App, store: Store) -> None:
)
def assign_jobs(app: App, store: Store) -> None:
def assign_jobs(app: App, db: Db) -> None:
"""Assign jobs to printers. Uploading files and job state management is handled separately."""
for printer_id in store.list_idle_printers():
if job_id := store.poll_job_queue():
store.assign_job(job_id, printer_id)
print(f"Mapped job {job_id} to printer {printer_id}")
for printer in db.list_idle_printers():
if job := db.poll_job_queue():
db.assign_job(jid=job.id, pid=printer.id)
log.info(f"Mapped job {job.id} to printer {printer.id}")
def push_jobs(app: App, store: Store) -> None:
def push_jobs(app: App, db: Db) -> None:
"""Ensure that job files are uploaded and started to the assigned printer."""
for job in store.list_mapped_jobs():
printer = store.fetch_printer(job.printer_id)
file = store.fetch_file(job.user_id, job.file_id)
for job in db.list_mapped_jobs():
printer = db.fetch_printer(pid=job.printer_id)
file = db.fetch_file(uid=job.user_id, fid=job.file_id)
if not file:
log.error(f"Job {job.id} no longer maps to a file")
store.delete_job(job.user_id, job.id)
db.delete_job(job.user_id, job.id)
try:
client = OctoRest(url=printer.url, apikey=printer.api_key)
@ -157,11 +136,15 @@ def push_jobs(app: App, store: Store) -> None:
printer_state = {"error": printer_job.get("error")}
if printer_state.get("error"):
print(f"Printer {printer.id} is in error, can't push")
log.warn(f"Printer {printer.id} is in error, can't push")
continue
try:
if not client.files_info("local", Path(file.path).name):
client.upload(file.path)
else:
log.info("Don't need to upload the job!")
except HTTPError as e:
if e.response.status_code == 409:
pass
@ -170,7 +153,7 @@ def push_jobs(app: App, store: Store) -> None:
client.select(Path(file.path).name)
client.start()
store.start_job(job.id)
db.start_job(job.id)
except TimeoutError:
pass
@ -178,19 +161,19 @@ def push_jobs(app: App, store: Store) -> None:
log.exception("Oop")
def revoke_jobs(app: App, store: Store) -> None:
def revoke_jobs(app: App, db: Db) -> None:
"""Ensure that job files are uploaded and started to the assigned printer.
Note that this will ALSO cancel jobs out of the print queue.
"""
for job in store.list_cancelled_jobs():
for job in db.list_canceling_jobs():
if job.printer_id:
printer = store.fetch_printer(job.printer_id)
printer = db.fetch_printer(pid=job.printer_id)
try:
print(f"Cancelling running job {job.id}")
log.info(f"Cancelling running job {job.id}")
client = OctoRest(url=printer.url, apikey=printer.api_key)
try:
client.cancel()
@ -200,8 +183,8 @@ def revoke_jobs(app: App, store: Store) -> None:
else:
raise
print(f"Job {job.id} -> cancelled")
store.finish_job(job.id, "cancelled")
log.info(f"Job {job.id} -> cancelled")
db.finish_job(jid=job.id, state="cancelled")
except TimeoutError:
pass
@ -210,15 +193,15 @@ def revoke_jobs(app: App, store: Store) -> None:
log.exception("Oop")
else:
print(f"Unmapped job {job.id} became cancelled")
store.finish_job(job.id, "cancelled")
log.info(f"Unmapped job {job.id} became cancelled")
db.finish_job(jid=job.id, state="cancelled")
def pull_jobs(app: App, store: Store) -> None:
def pull_jobs(app: App, db: Db) -> None:
"""Poll the state of mapped printers to control jobs."""
for job in store.list_running_jobs():
printer = store.fetch_printer(job.printer_id)
for job in db.list_running_jobs():
printer = db.fetch_printer(pid=job.printer_id)
try:
client = OctoRest(url=printer.url, apikey=printer.api_key)
job_state = client.job_info()
@ -231,19 +214,19 @@ def pull_jobs(app: App, store: Store) -> None:
pass
elif job_state.get("progress", {}).get("completion", 0.0) == 100.0:
print(f"Job {job.id} has succeeded")
store.finish_job(job.id, "success")
log.info(f"Job {job.id} has succeeded")
db.finish_job(jid=job.id, state="success")
elif printer_state.get("error"):
print(f"Job {job.id} has failed")
store.finish_job(job.id, "failed")
log.warn(f"Job {job.id} has failed")
db.finish_job(jid=job.id, state="failed")
elif printer_state.get("cancelling"):
print(f"Job {job.id} has been acknowledged as cancelled")
store.finish_job(job.id, "cancelled")
log.info(f"Job {job.id} has been acknowledged as cancelled")
db.finish_job(jid=job.id, state="cancelled")
else:
print(
log.warn(
f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}"
)
@ -254,35 +237,60 @@ def pull_jobs(app: App, store: Store) -> None:
log.exception("Oop")
def send_emails(app, store: Store):
def send_emails(app, db: Db):
with closing(
FastMailSMTP(
app.config.get("fastmail", {}).get("username"),
app.config.get("fastmail", {}).get("key"),
)
) as fm:
for message in store.poll_spool():
for message in db.poll_email_queue():
fm.send_message(
from_addr="root@tirefireind.us",
to_addrs=[message.to],
subject=message.subject,
msg=message.body,
)
store.send_email(message.id)
db.send_email(message.id)
def once(f):
val = uninitialized = object()
def _helper(*args, **kwargs):
nonlocal val
if val is uninitialized:
val = f(*args, **kwargs)
return val
return _helper
def toil(*fs):
def _helper(*args, **kwargs):
for f in fs:
f(*args, **kwargs)
_helper.__name__ = "toil"
return _helper
class Worker(Monitor):
def __init__(self, bus, app, db_factory, **kwargs):
def __init__(
self,
bus,
app: App,
db_factory: Callable[[App], Db],
callback: Callable[[App, Db], None],
**kwargs,
):
self._app = app
self._db_factory = db_factory
super().__init__(bus, self.callback, **kwargs)
self._callback = callback
super().__init__(
bus, self.callback, **kwargs, name=f"Async {callback.__name__}"
)
def callback(self):
log.debug("Tick")
with self._app.app_context(), closing(self._db_factory(self._app)) as store:
poll_printers(self._app, store)
assign_jobs(self._app, store)
push_jobs(self._app, store)
revoke_jobs(self._app, store)
pull_jobs(self._app, store)
send_emails(self._app, store)
with closing(self._db_factory(self._app)) as db:
self._callback(self._app, db)

View file

@ -3,12 +3,12 @@
from datetime import timedelta
import pytest
import tentacles.store as s
from tentacles.db import Db
@pytest.yield_fixture
def store():
conn = s.Store(":memory:")
def db():
conn = Db(":memory:")
conn.connect()
yield conn
conn.close()
@ -25,9 +25,9 @@ def password_testy():
@pytest.fixture
def uid_testy(store: s.Store, username_testy, password_testy):
with store.savepoint():
return store.try_create_user(
def uid_testy(db: Db, username_testy, password_testy):
with db.savepoint():
return db.try_create_user(
username=username_testy,
email=username_testy,
password=password_testy,
@ -41,8 +41,10 @@ def login_ttl():
@pytest.fixture
def sid_testy(store, uid_testy, username_testy, password_testy, login_ttl):
with store.savepoint():
return store.try_login(
def sid_testy(db: Db, uid_testy, username_testy, password_testy, login_ttl):
with db.savepoint():
res = db.try_login(
username=username_testy, password=password_testy, ttl=login_ttl
).id
)
assert res.user_id == uid_testy
return res.id

View file

@ -1,13 +1,13 @@
#!/usr/bin/env python3
from tentacles.store import Store
from tentacles.db import Db
def test_store_initializes(store: Store):
assert isinstance(store, Store)
def test_db_initializes(store: Db):
assert isinstance(store, Db)
def test_store_savepoint(store: Store):
def test_db_savepoint(store: Db):
obj = store.savepoint()
assert hasattr(obj, "__enter__")