Fix tests, create gcode analysis machinery
This commit is contained in:
parent
19c941dc95
commit
4fa627919b
18 changed files with 327 additions and 78 deletions
20
WORKSPACE
20
WORKSPACE
|
@ -62,18 +62,18 @@ load("@arrdem_source_pypi//:requirements.bzl", "install_deps")
|
|||
# Call it to define repos for your requirements.
|
||||
install_deps()
|
||||
|
||||
# git_repository(
|
||||
# name = "rules_zapp",
|
||||
# remote = "https://git.arrdem.com/arrdem/rules_zapp.git",
|
||||
# commit = "72f82e0ace184fe862f1b19c4f71c3bc36cf335b",
|
||||
# # tag = "0.1.2",
|
||||
# )
|
||||
|
||||
local_repository(
|
||||
name = "rules_zapp",
|
||||
path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp",
|
||||
git_repository(
|
||||
name = "rules_zapp",
|
||||
remote = "https://git.arrdem.com/arrdem/rules_zapp.git",
|
||||
commit = "961be891e5cff539e14f2050d5cd9e82845ce0f2",
|
||||
# tag = "0.1.2",
|
||||
)
|
||||
|
||||
# local_repository(
|
||||
# name = "rules_zapp",
|
||||
# path = "/home/arrdem/Documents/hobby/programming/lang/python/rules_zapp",
|
||||
# )
|
||||
|
||||
####################################################################################################
|
||||
# Docker support
|
||||
####################################################################################################
|
||||
|
|
|
@ -8,6 +8,7 @@ py_project(
|
|||
zapp_binary(
|
||||
name = "qint",
|
||||
main = "src/python/proquint/__main__.py",
|
||||
shebang = "#!/usr/bin/env python3",
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
|
|
96
projects/tentacles/src/python/gcode.py
Normal file
96
projects/tentacles/src/python/gcode.py
Normal file
|
@ -0,0 +1,96 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from pathlib import Path
|
||||
from attrs import define
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import re
|
||||
|
||||
OPTION_PATTERN = re.compile("; (?P<key>[a-z0-9_]+) = (?P<value>.*?)\n")
|
||||
|
||||
|
||||
def parse_prusa_config_str(content: str) -> dict:
|
||||
kvs = {}
|
||||
iter = re.finditer(OPTION_PATTERN, content)
|
||||
while m := next(iter, None):
|
||||
if m.group("key") == "prusaslicer_config" and m.group("value") == "begin":
|
||||
break
|
||||
while m := next(iter, None):
|
||||
if m.group("key") == "prusaslicer_config" and m.group("value") == "end":
|
||||
break
|
||||
else:
|
||||
kvs[m.group("key")] = m.group("value")
|
||||
|
||||
return kvs
|
||||
|
||||
|
||||
def parse_prusa_config(p: Path):
|
||||
with open(p) as fp:
|
||||
return parse_prusa_config_str(fp.read())
|
||||
|
||||
|
||||
@define
|
||||
class GcodeAnalysis:
|
||||
max_x: int
|
||||
max_y: int
|
||||
max_z: int
|
||||
max_bed: int
|
||||
max_end: int
|
||||
filament: str
|
||||
|
||||
|
||||
def parse_point(point: str) -> Tuple[int, int]:
|
||||
a, b = point.split("x")
|
||||
return int(a), int(b)
|
||||
|
||||
|
||||
def parse_bed_shape(coords: str) -> Tuple[int, int]:
|
||||
# Note, these are clockwise from 0, 0
|
||||
a, b, c, d = coords.split(",")
|
||||
dx, _ = parse_point(a)
|
||||
_, dy = parse_point(b)
|
||||
x, y = parse_point(c)
|
||||
return x + dx, y + dy
|
||||
|
||||
|
||||
def analyze_gcode_str(text: str) -> Optional[GcodeAnalysis]:
|
||||
opts = parse_prusa_config_str(text)
|
||||
|
||||
kwargs = {}
|
||||
if "bed_shape" in opts:
|
||||
max_x, max_y = parse_bed_shape(opts["bed_shape"])
|
||||
kwargs["max_x"] = max_x
|
||||
kwargs["max_y"] = max_y
|
||||
else:
|
||||
return None
|
||||
|
||||
if "max_print_height" in opts:
|
||||
kwargs["max_z"] = int(opts["max_print_height"])
|
||||
else:
|
||||
return None
|
||||
|
||||
if "first_layer_bed_temperature" in opts:
|
||||
kwargs["max_bed"] = int(opts["first_layer_bed_temperature"])
|
||||
elif "bed_temperature" in opts:
|
||||
kwargs["max_bed"] = int(opts["bed_temperature"])
|
||||
else:
|
||||
return None
|
||||
|
||||
if "first_layer_temperature" in opts:
|
||||
kwargs["max_end"] = int(opts["first_layer_temperature"])
|
||||
elif "temperature" in opts:
|
||||
kwargs["max_end"] = int(opts["temperature"])
|
||||
else:
|
||||
return None
|
||||
|
||||
if "filament_type" in opts:
|
||||
kwargs["filament"] = opts["filament_type"]
|
||||
else:
|
||||
return None
|
||||
|
||||
return GcodeAnalysis(**kwargs)
|
||||
|
||||
|
||||
def analyze_gcode_file(p: Path) -> Optional[GcodeAnalysis]:
|
||||
with open(p) as fp:
|
||||
return analyze_gcode_str(fp.read())
|
|
@ -143,12 +143,13 @@ def serve(hostname: str, port: int, config: Path, trace: bool):
|
|||
|
||||
# Spawn the worker thread(s)
|
||||
Worker(cherrypy.engine, app, db_factory, poll_printers, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, analyze_files, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, assign_jobs, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, push_jobs, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, revoke_jobs, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, pull_jobs, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, send_emails, frequency=5).start()
|
||||
Worker(cherrypy.engine, app, db_factory, debug_queue, frequency=5).start()
|
||||
# Worker(cherrypy.engine, app, db_factory, debug_queue, frequency=5).start()
|
||||
|
||||
# Run the server
|
||||
cherrypy.engine.start()
|
||||
|
|
|
@ -182,6 +182,9 @@ class Db(Queries):
|
|||
digest.update(password.encode("utf-8"))
|
||||
res = super().try_login(username=username, hash=digest.hexdigest())
|
||||
if not res:
|
||||
print("WARNING: Failed to log in!")
|
||||
for it in self._cursor.execute("SELECT * FROM users").fetchall():
|
||||
print("DEBUG", it)
|
||||
return None
|
||||
|
||||
return self.create_key(uid=res.id, name="web session", ttl=ttl)
|
||||
|
|
|
@ -8,6 +8,18 @@ CREATE TABLE IF NOT EXISTS files (
|
|||
, FOREIGN KEY(user_id) REFERENCES user(id)
|
||||
);
|
||||
|
||||
-- name: migration-0004-create-file-analysis#
|
||||
CREATE TABLE IF NOT EXISTS file_analysis (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT
|
||||
, max_x INTEGER
|
||||
, max_y INTEGER
|
||||
, max_z INTEGER
|
||||
, max_end INTEGER
|
||||
, max_bed INTEGER
|
||||
, filament_id INTEGER REFERENCES filament(id)
|
||||
, file_id INTEGER REFERENCES file(id)
|
||||
);
|
||||
|
||||
-- name: create-file^
|
||||
INSERT INTO files (
|
||||
user_id
|
||||
|
@ -45,3 +57,37 @@ WHERE
|
|||
user_id = :uid
|
||||
AND id = :fid
|
||||
;
|
||||
|
||||
-- name: create-analysis^
|
||||
INSERT INTO file_analysis (
|
||||
, max_x
|
||||
, max_y
|
||||
, max_z
|
||||
, max_end
|
||||
, max_bed
|
||||
, filament_id
|
||||
, file_id
|
||||
)
|
||||
VALUES (
|
||||
:max_x
|
||||
, :max_y
|
||||
, :max_z
|
||||
, :max_end
|
||||
, :max_bed
|
||||
, :filament_id
|
||||
, :file_id
|
||||
)
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
|
||||
-- name: list-unanalyzed-files
|
||||
SELECT
|
||||
f.id
|
||||
, f.filename
|
||||
FROM files f
|
||||
LEFT JOIN file_analysis fa
|
||||
ON f.id = fa.file_id
|
||||
WHERE
|
||||
fa.file_id IS NULL
|
||||
;
|
||||
|
|
|
@ -69,10 +69,16 @@ WHERE
|
|||
-- name: list-job-queue
|
||||
SELECT
|
||||
*
|
||||
FROM jobs
|
||||
FROM jobs j
|
||||
INNER JOIN files f
|
||||
ON j.file_id = f.id
|
||||
INNER JOIN file_analysis fa
|
||||
ON fa.file_id = f.id
|
||||
WHERE
|
||||
finished_at IS NULL
|
||||
AND (:uid IS NULL OR user_id = :uid)
|
||||
AND (:uid IS NULL OR j.user_id = :uid)
|
||||
AND f.id IS NOT NULL
|
||||
AND fa.id IS NOT NULL
|
||||
;
|
||||
|
||||
-- name: poll-job-queue^
|
||||
|
|
|
@ -41,7 +41,7 @@ CREATE TABLE IF NOT EXISTS printer_chassis (
|
|||
);
|
||||
|
||||
INSERT INTO printer_chassis (name, limit_x, limit_y, limit_z, limit_bed, limit_hotend, limit_tools) VALUES (
|
||||
'Creality CR-10v3', 300, 300, 400, 100, 260, 1
|
||||
'Creality CR-10v3', 310, 310, 400, 100, 260, 1
|
||||
);
|
||||
|
||||
INSERT INTO printer_chassis (name, limit_x, limit_y, limit_z, limit_bed, limit_hotend, limit_tools) VALUES (
|
||||
|
@ -181,3 +181,14 @@ SELECT
|
|||
, name
|
||||
FROM filament
|
||||
;
|
||||
|
||||
-- name: create-filament^
|
||||
INSERT OR IGNORE INTO filament (
|
||||
name
|
||||
)
|
||||
VALUES (
|
||||
:name
|
||||
)
|
||||
RETURNING
|
||||
id
|
||||
;
|
||||
|
|
|
@ -77,7 +77,8 @@ INNER JOIN users u
|
|||
WHERE
|
||||
(expiration IS NULL OR unixepoch(expiration) > unixepoch('now'))
|
||||
AND k.id = :kid
|
||||
AND u.enabled_at IS NOT NULL -- and the user is not disabled!
|
||||
AND (u.enabled_at IS NOT NULL -- and the user is not disabled!
|
||||
OR u.group_id = 0) -- or the user is a root
|
||||
;
|
||||
|
||||
-- name: refresh-key
|
||||
|
|
|
@ -58,6 +58,10 @@ label {
|
|||
flex: 1;
|
||||
}
|
||||
|
||||
.u-flex-wrap {
|
||||
flex-wrap: wrap;
|
||||
}
|
||||
|
||||
@media (max-width: 760px) {
|
||||
.file, .printer, .key, .job {
|
||||
flex-direction: column;
|
||||
|
|
|
@ -4,10 +4,10 @@
|
|||
{% if files %}
|
||||
{% for file in files %}
|
||||
<div class="file row u-flex">
|
||||
<div class="details six columns">
|
||||
<div class="details six columns u-flex u-flex-wrap">
|
||||
<span class="file-name">{{ file.filename }}</span>
|
||||
<span class="file-sucesses">{{ file.print_successes }}</span> successes
|
||||
<span class="file-failures">{{ file.print_failures }}</span> errors
|
||||
<span class="file-sucesses">{{ file.print_successes }} successes</span>
|
||||
<span class="file-failures">{{ file.print_failures }} errors</span>
|
||||
</div>
|
||||
<div class="controls u-flex u-ml-auto">
|
||||
{{ macros.start_job(file.id) }}
|
||||
|
|
|
@ -4,11 +4,10 @@
|
|||
{% if jobs %}
|
||||
{% for job in jobs %}
|
||||
<div class="job row u-flex">
|
||||
<div class="details six columns u-flex">
|
||||
<div class="job-status u-flex">
|
||||
<label for="state">Job</label>
|
||||
<div class="dot {{ macros.job_state(job) }}" style="--dot-size: 1em;"> </div>
|
||||
<span name="state">{{ macros.job_state(job) }}</span>
|
||||
<div class="details six columns u-flex u-flex-wrap">
|
||||
<div class="job-filename u-flex">
|
||||
<label for="filename">File</label>
|
||||
<span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename or "it's a secret"}}</span>
|
||||
</div>
|
||||
{% if job.printer_id %}
|
||||
<div class="job-printer u-flex">
|
||||
|
@ -16,18 +15,20 @@
|
|||
<span name="printer">{{ ctx.db.fetch_printer(job.printer_id).name }}</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
<div class="job-filename u-flex">
|
||||
<label for="filename">File</label>
|
||||
<span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename or "it's a secret"}}</span>
|
||||
</div>
|
||||
{% if job.finished_at and job.started_at %}
|
||||
<div class="job-runtime u-flex">
|
||||
<label for="runtime">Runtime</label>
|
||||
<span name="Runtime">{{ (datetime.fromisoformat(job.finished_at) - datetime.fromisoformat(job.started_at)) }}</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
<div class="two columns">
|
||||
<div class="job-status u-flex">
|
||||
<label for="state">Status</label>
|
||||
<div class="dot {{ macros.job_state(job) }} tooltip bottom" data-text="{{ macros.job_state(job) }}" style="--dot-size: 1em;"> </div>
|
||||
</div>
|
||||
<div class="controls u-flex u-ml-auto">
|
||||
</div>
|
||||
<div class="controls four columns u-flex u-ml-auto">
|
||||
{% if ctx.uid %}
|
||||
{{ macros.duplicate_job(job.id) }}
|
||||
{{ macros.delete_job(job.id) }}
|
||||
|
|
|
@ -5,10 +5,9 @@
|
|||
{% for job in jobs %}
|
||||
<div class="job row u-flex">
|
||||
<div class="details six columns u-flex">
|
||||
<div class="job-status u-flex">
|
||||
<label for="state">Job</label>
|
||||
<div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;"> </div>
|
||||
<span name="state">{{ macros.job_state(job) }}</span>
|
||||
<div class="job-filename u-flex">
|
||||
<label for="filename">File</label>
|
||||
<span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename or "it's a secret"}}</span>
|
||||
</div>
|
||||
{% if job.printer_id %}
|
||||
<div class="job-printer u-flex">
|
||||
|
@ -16,10 +15,6 @@
|
|||
<span name="printer">{{ ctx.db.fetch_printer(job.printer_id).name }}</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
<div class="job-filename u-flex">
|
||||
<label for="filename">File</label>
|
||||
<span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename or "it's a secret"}}</span>
|
||||
</div>
|
||||
{% if job.started_at %}
|
||||
<div class="job-runtime u-flex">
|
||||
<label for="runtime">Runtime</label>
|
||||
|
@ -27,6 +22,12 @@
|
|||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
<div class="two columns">
|
||||
<div class="job-status u-flex">
|
||||
<label for="state">Status</label>
|
||||
<div class="dot {{ macros.job_state(job) }} tooltip bottom" data-text="{{ macros.job_state(job) }}" style="--dot-size: 1em;"> </div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="controls u-flex u-ml-auto">
|
||||
{% if ctx.uid %}
|
||||
{{ macros.duplicate_job(job.id) }}
|
||||
|
|
|
@ -17,6 +17,7 @@ from urllib import parse as urlparse
|
|||
|
||||
from cherrypy.process.plugins import Monitor
|
||||
from fastmail import FastMailSMTP
|
||||
from gcode import analyze_gcode_file
|
||||
from flask import Flask as App
|
||||
from octorest import OctoRest as _OR
|
||||
from requests import Response
|
||||
|
@ -290,14 +291,35 @@ def send_emails(app, db: Db):
|
|||
db.send_email(eid=message.id)
|
||||
|
||||
|
||||
def analyze_files(app: App, db: Db):
|
||||
for unanalyzed in db.list_unanalyzed_files():
|
||||
record = analyze_gcode_file(Path(unanalyzed.filename))
|
||||
db.create_analysis(
|
||||
file_id=unanalyzed.id,
|
||||
max_x=record.max_x,
|
||||
max_y=record.max_y,
|
||||
max_z=record.max_z,
|
||||
max_end=record.max_end,
|
||||
max_bed=record.max_bed,
|
||||
filament_id=db.create_filament(record.filament),
|
||||
)
|
||||
|
||||
|
||||
def debug_queue(app: App, db: Db):
|
||||
output = ["---"]
|
||||
|
||||
for job in db.list_running_jobs():
|
||||
output.append(repr(job))
|
||||
|
||||
for printer in db.list_idle_printers():
|
||||
output.append(repr(printer))
|
||||
|
||||
for unanalyzed in db.list_unanalyzed_files():
|
||||
output.append(repr(unanalyzed))
|
||||
|
||||
print("\n".join(output))
|
||||
|
||||
|
||||
def toil(*fs):
|
||||
def _helper(*args, **kwargs):
|
||||
for f in fs:
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from tentacles.db import Db
|
||||
|
||||
# FIXME: Should this be an autouse fixture? Maybe. Doesn't buy much tho.
|
||||
logging.addLevelName(logging.DEBUG - 5, "TRACE")
|
||||
logging.TRACE = logging.DEBUG - 5
|
||||
|
||||
@pytest.yield_fixture
|
||||
|
||||
@pytest.fixture
|
||||
def db():
|
||||
conn = Db(":memory:")
|
||||
conn.connect()
|
||||
conn.migrate()
|
||||
yield conn
|
||||
conn.close()
|
||||
|
||||
|
@ -25,26 +31,23 @@ def password_testy():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def uid_testy(db: Db, username_testy, password_testy):
|
||||
with db.savepoint():
|
||||
return db.try_create_user(
|
||||
username=username_testy,
|
||||
email=username_testy,
|
||||
password=password_testy,
|
||||
sid=1,
|
||||
).id
|
||||
def uid_testy(db: Db, username_testy, password_testy) -> int:
|
||||
return db.try_create_user(
|
||||
username=username_testy,
|
||||
email=username_testy,
|
||||
password=password_testy,
|
||||
sid=1,
|
||||
gid=0, # Note: to bypass the approve/enable machinery
|
||||
).id
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def login_ttl():
|
||||
def login_ttl() -> timedelta:
|
||||
return timedelta(hours=12)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sid_testy(db: Db, uid_testy, username_testy, password_testy, login_ttl):
|
||||
with db.savepoint():
|
||||
res = db.try_login(
|
||||
username=username_testy, password=password_testy, ttl=login_ttl
|
||||
)
|
||||
assert res.user_id == uid_testy
|
||||
return res.id
|
||||
res = db.try_login(username=username_testy, password=password_testy, ttl=login_ttl)
|
||||
assert res.user_id == uid_testy
|
||||
return res.id
|
||||
|
|
54
projects/tentacles/test/python/test_gcode.py
Normal file
54
projects/tentacles/test/python/test_gcode.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import re
|
||||
|
||||
from gcode import (
|
||||
parse_prusa_config_str,
|
||||
OPTION_PATTERN,
|
||||
analyze_gcode_str,
|
||||
GcodeAnalysis,
|
||||
)
|
||||
|
||||
|
||||
def test_option_pattern():
|
||||
assert re.match(OPTION_PATTERN, "\n") is None
|
||||
assert re.findall(OPTION_PATTERN, "; foo = bar\n")
|
||||
assert re.findall(OPTION_PATTERN, "; foo = bar\n; baz = qux")
|
||||
|
||||
|
||||
def test_parse_config_str():
|
||||
assert parse_prusa_config_str("") == {}
|
||||
assert (
|
||||
parse_prusa_config_str(
|
||||
"""
|
||||
; prusaslicer_config = begin
|
||||
; foo = bar
|
||||
; prusaslicer_config = end
|
||||
"""
|
||||
)
|
||||
== {"foo": "bar"}
|
||||
)
|
||||
|
||||
|
||||
def test_analyze_gcode():
|
||||
assert (
|
||||
analyze_gcode_str(
|
||||
"""
|
||||
|
||||
gcode garbage
|
||||
|
||||
; some comment
|
||||
more garbage
|
||||
|
||||
; prusaslicer_config = begin
|
||||
; bed_shape = 5x5,95x5,95x95,5x95
|
||||
; max_print_height = 100
|
||||
; first_layer_bed_temperature = 100
|
||||
; first_layer_temperature = 195
|
||||
; filament_type = PETG
|
||||
; prusaslicer_config = end
|
||||
|
||||
"""
|
||||
)
|
||||
== GcodeAnalysis(100, 100, 100, 100, 195, "PETG")
|
||||
)
|
|
@ -3,12 +3,12 @@
|
|||
from tentacles.db import Db
|
||||
|
||||
|
||||
def test_db_initializes(store: Db):
|
||||
assert isinstance(store, Db)
|
||||
def test_db_initializes(db: Db):
|
||||
assert isinstance(db, Db)
|
||||
|
||||
|
||||
def test_db_savepoint(store: Db):
|
||||
obj = store.savepoint()
|
||||
def test_db_savepoint(db: Db):
|
||||
obj = db.savepoint()
|
||||
|
||||
assert hasattr(obj, "__enter__")
|
||||
assert hasattr(obj, "__exit__")
|
||||
|
|
|
@ -1,40 +1,39 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from tentacles.store import Store
|
||||
from tentacles.db import Db
|
||||
|
||||
|
||||
def test_mkuser(store: Store, username_testy, password_testy):
|
||||
res = store.try_create_user(
|
||||
def test_mkuser(db: Db, username_testy, password_testy):
|
||||
res = db.try_create_user(
|
||||
username=username_testy, email=username_testy, password=password_testy
|
||||
)
|
||||
assert res
|
||||
assert store.list_users() == [(res.id, username_testy)]
|
||||
assert [(it.id, it.name) for it in db.list_users()] == [(res.id, username_testy)]
|
||||
|
||||
|
||||
def test_mksession(store: Store, uid_testy, username_testy, password_testy, login_ttl):
|
||||
res = store.try_login(
|
||||
username=username_testy, password=password_testy, ttl=login_ttl
|
||||
)
|
||||
def test_mksession(db: Db, uid_testy, username_testy, password_testy, login_ttl):
|
||||
assert uid_testy
|
||||
res = db.try_login(username=username_testy, password=password_testy, ttl=login_ttl)
|
||||
assert res is not None
|
||||
assert [it.id for it in store.list_keys(uid=uid_testy)] == [res.id]
|
||||
assert store.try_key(kid=res.id).user_id == uid_testy
|
||||
assert [it.id for it in db.list_keys(uid=uid_testy)] == [res.id]
|
||||
assert db.try_key(kid=res.id).user_id == uid_testy
|
||||
|
||||
|
||||
def test_refresh_key(store: Store, sid_testy, login_ttl):
|
||||
before = store.fetch_key(kid=sid_testy)
|
||||
store.refresh_key(kid=sid_testy, ttl=login_ttl * 2)
|
||||
after = store.fetch_key(kid=sid_testy)
|
||||
def test_refresh_key(db: Db, sid_testy, login_ttl):
|
||||
before = db.fetch_key(kid=sid_testy)
|
||||
db.refresh_key(kid=sid_testy, ttl=login_ttl * 2)
|
||||
after = db.fetch_key(kid=sid_testy)
|
||||
assert before != after
|
||||
|
||||
|
||||
def tets_mkkey(store: Store, sid_testy, uid_testy):
|
||||
assert store.try_key(kid=sid_testy) == uid_testy
|
||||
new_key = store.create_key(kid=sid_testy, ttl=None)
|
||||
def tets_mkkey(db: Db, sid_testy, uid_testy):
|
||||
assert db.try_key(kid=sid_testy) == uid_testy
|
||||
new_key = db.create_key(kid=sid_testy, ttl=None)
|
||||
assert new_key is not None
|
||||
assert store.try_key(kid=new_key) == uid_testy
|
||||
assert db.try_key(kid=new_key) == uid_testy
|
||||
|
||||
|
||||
def test_logout(store: Store, uid_testy, sid_testy):
|
||||
assert store.try_key(kid=sid_testy)
|
||||
store.delete_key(uid=uid_testy, kid=sid_testy)
|
||||
assert not store.try_key(kid=sid_testy)
|
||||
def test_logout(db: Db, uid_testy, sid_testy):
|
||||
assert db.try_key(kid=sid_testy)
|
||||
db.delete_key(uid=uid_testy, kid=sid_testy)
|
||||
assert not db.try_key(kid=sid_testy)
|
||||
|
|
Loading…
Reference in a new issue