Lotta state sync improvements
This commit is contained in:
parent
e082a4aa11
commit
2ccdd48d63
9 changed files with 198 additions and 68 deletions
|
@ -41,7 +41,7 @@ def manipulate_jobs():
|
|||
|
||||
case "cancel":
|
||||
ctx.db.cancel_job(ctx.uid, int(request.form.get("job_id")))
|
||||
flash("Cancellation reqiested", category="info")
|
||||
flash("Cancellation reqested", category="info")
|
||||
|
||||
case "delete":
|
||||
ctx.db.delete_job(ctx.uid, int(request.form.get("job_id")))
|
||||
|
|
|
@ -49,11 +49,12 @@ CREATE TABLE IF NOT EXISTS printer_statuses (
|
|||
, UNIQUE(name)
|
||||
);
|
||||
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) values (-1, 'error');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) values (0, 'disconnected');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) values (1, 'connected');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) values (2, 'idle');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) values (3, 'running');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (-1, 'error');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (0, 'disconnected');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (2, 'connecting');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (2, 'connected');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (3, 'idle');
|
||||
INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (4, 'running');
|
||||
|
||||
CREATE TABLE IF NOT EXISTS printers (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
@import "tirefire/timers";
|
||||
@import "tirefire/nav";
|
||||
@import "tirefire/dots";
|
||||
@import "tirefire/tooltips";
|
||||
|
||||
.controls a,
|
||||
.controls form {
|
||||
|
@ -21,8 +22,13 @@
|
|||
margin-top: 4px;
|
||||
}
|
||||
|
||||
.details {
|
||||
overflow: hidden;
|
||||
.file .details,
|
||||
.printer .details,
|
||||
.key .details,
|
||||
.job .details {
|
||||
div {
|
||||
margin-right: 10px;
|
||||
}
|
||||
}
|
||||
|
||||
// Hide the header name if we're on a mobile device
|
||||
|
@ -31,3 +37,7 @@
|
|||
display: none;
|
||||
}
|
||||
}
|
||||
|
||||
label {
|
||||
margin-right: 10px;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,11 @@
|
|||
border-radius: 50%;
|
||||
}
|
||||
|
||||
.dot::before,
|
||||
.dot::after {
|
||||
animation: disabled;
|
||||
}
|
||||
|
||||
.dot.success {
|
||||
background-color: $secondary_green;
|
||||
}
|
||||
|
@ -25,8 +30,16 @@
|
|||
background-color: $secondary_green;
|
||||
}
|
||||
|
||||
.dot.error,
|
||||
.dot.cancelling {
|
||||
background-color: $yellow;
|
||||
}
|
||||
|
||||
|
||||
.dot.cancelled {
|
||||
background-color: $yellow;
|
||||
}
|
||||
|
||||
.dot.failed {
|
||||
background-color: $red;
|
||||
}
|
||||
|
||||
|
|
|
@ -272,7 +272,8 @@ class Store(object):
|
|||
SELECT p.id
|
||||
FROM printers p
|
||||
LEFT JOIN (SELECT id, printer_id FROM jobs WHERE finished_at IS NULL) j ON p.id = j.printer_id
|
||||
WHERE j.id IS NULL
|
||||
INNER JOIN printer_statuses s ON p.status_id = s.id
|
||||
WHERE j.id IS NULL AND s.name = 'idle'
|
||||
"""
|
||||
).fetchall()
|
||||
|
||||
|
@ -370,6 +371,19 @@ class Store(object):
|
|||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_job_history(self, uid: Optional[int] = None):
|
||||
"""Enumerate jobs in priority order. Note: ignores completed jobs."""
|
||||
cond = f"user_id = {uid}" if uid else "TRUE"
|
||||
return self._conn.execute(
|
||||
f"""
|
||||
SELECT * FROM jobs
|
||||
WHERE finished_at IS NOT NULL AND {cond}
|
||||
ORDER BY datetime(finished_at) DESC
|
||||
""",
|
||||
[],
|
||||
).fetchall()
|
||||
|
||||
@requires_conn
|
||||
def list_mapped_jobs(self):
|
||||
"""Scheduler detail. List mapped but not started jobs."""
|
||||
|
@ -406,14 +420,12 @@ class Store(object):
|
|||
|
||||
@requires_conn
|
||||
def list_cancelled_jobs(self):
|
||||
"""Scheduler detail. List jobs which have been cancelled but are still 'live'."""
|
||||
"""Scheduler detail. List jobs which have been cancelled but are still 'live' (not finished)."""
|
||||
|
||||
return self._conn.execute(
|
||||
"""
|
||||
SELECT * FROM jobs
|
||||
WHERE started_at IS NOT NULL
|
||||
AND printer_id IS NOT NULL
|
||||
AND finished_at IS NULL
|
||||
WHERE finished_at IS NULL
|
||||
AND cancelled_at IS NOT NULL
|
||||
""",
|
||||
[],
|
||||
|
|
|
@ -1,18 +1,29 @@
|
|||
{% import "macros.html.j2" as macros %}
|
||||
<div class="history">
|
||||
<h2>Job history</h2>
|
||||
{% with jobs = ctx.db.list_jobs(uid=ctx.uid) %}
|
||||
{% with jobs = ctx.db.list_job_history(uid=ctx.uid) %}
|
||||
{% if jobs %}
|
||||
{% for job in jobs if job.finished_at %}
|
||||
{% for job in jobs %}
|
||||
<div class="job row u-flex">
|
||||
<div class="details">
|
||||
<span class="job-status one column">
|
||||
<div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;">
|
||||
<div class="details six columns u-flex">
|
||||
<div class="job-status u-flex">
|
||||
<label for="state">Job</label>
|
||||
<div class="dot {{ macros.job_state(job) }}" style="--dot-size: 1em;"> </div>
|
||||
<span name="state">{{ macros.job_state(job) }}</span>
|
||||
</div>
|
||||
{% if job.printer_id %}
|
||||
<div class="job-printer u-flex">
|
||||
<label for="printer">Printer</label>
|
||||
<span name="printer">{{ ctx.db.fetch_printer(job.printer_id).name }}</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
<div class="job-filename u-flex">
|
||||
<label for="filename">File</label>
|
||||
<span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}</span>
|
||||
</div>
|
||||
</span>
|
||||
<span class="job-filename eleven columns">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}</span>
|
||||
</div>
|
||||
<div class="controls u-flex u-ml-auto">
|
||||
{{ macros.duplicate_job(job.id) }}
|
||||
{{ macros.delete_job(job.id) }}
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
@ -5,13 +5,21 @@
|
|||
{% if jobs %}
|
||||
{% for job in jobs %}
|
||||
<div class="job row u-flex">
|
||||
<div class="details seven colums">
|
||||
<div class="job-status one column">
|
||||
<div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;">
|
||||
<div class="details six columns u-flex">
|
||||
<div class="job-status u-flex">
|
||||
<label for="state">Job</label>
|
||||
<div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;"> </div>
|
||||
<span name="state">{{ macros.job_state(job) }}</span>
|
||||
</div>
|
||||
{% if job.printer_id %}
|
||||
<div class="job-printer u-flex">
|
||||
<label for="printer">Printer</label>
|
||||
<span name="printer">{{ ctx.db.fetch_printer(job.printer_id).name }}</span>
|
||||
</div>
|
||||
<div class="job-filename six columns">
|
||||
{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}
|
||||
{% endif %}
|
||||
<div class="job-filename u-flex">
|
||||
<label for="filename">File</label>
|
||||
<span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}</span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="controls u-flex u-ml-auto">
|
||||
|
@ -23,5 +31,5 @@
|
|||
{% else %}
|
||||
No pending tasks. {% if ctx.uid %}Start something!{% endif %}
|
||||
{% endif %}
|
||||
{% endwith %}
|
||||
{% endwith %}
|
||||
</div>
|
||||
|
|
|
@ -33,7 +33,10 @@
|
|||
{% endmacro %}
|
||||
|
||||
{% macro job_state(job) %}
|
||||
{{ 'queued' if (not job.finished_at and not job.printer_id) else 'running' if not job.finished_at else job.state }}
|
||||
{{ 'queued' if (not job.finished_at and not job.printer_id and not job.cancelled_at) else
|
||||
'running' if (not job.finished_at and job.printer_id and not job.cancelled_at) else
|
||||
'cancelling' if (not job.finished_at and job.printer_id and job.cancelled_at) else
|
||||
job.state }}
|
||||
{% endmacro %}
|
||||
|
||||
{# #################################################################################################### #}
|
||||
|
|
|
@ -64,7 +64,6 @@ def corn_job(every: timedelta):
|
|||
return _decorator
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def poll_printers(db_factory: Callable[[], Store]) -> None:
|
||||
"""Poll printers for their status."""
|
||||
|
||||
|
@ -75,11 +74,10 @@ def poll_printers(db_factory: Callable[[], Store]) -> None:
|
|||
try:
|
||||
client = OctoRest(url=url, apikey=api_key)
|
||||
printer_job = client.job_info()
|
||||
print(printer_job)
|
||||
try:
|
||||
printer_state = client.printer().get("state").get("flags", {})
|
||||
except:
|
||||
printer_state = {"error": printer_job.get("error")}
|
||||
except HTTPError:
|
||||
printer_state = {"disconnected": True}
|
||||
|
||||
if printer_state.get("error"):
|
||||
# If there's a mapped job, we manually fail it so that
|
||||
|
@ -89,16 +87,32 @@ def poll_printers(db_factory: Callable[[], Store]) -> None:
|
|||
if mapped_job:
|
||||
db.finish_job(mapped_job.id, "error")
|
||||
|
||||
if status != "error":
|
||||
print(f"Printer {printer.id} -> error")
|
||||
db.update_printer_status(id, "error")
|
||||
|
||||
elif printer_state.get("ready"):
|
||||
db.update_printer_status(id, "idle")
|
||||
elif printer_state.get("disconnected"):
|
||||
if status != "disconnected":
|
||||
print(f"Printer {printer.id} -> disconnected")
|
||||
db.update_printer_status(id, "disconnected")
|
||||
|
||||
elif printer_state.get("printing"):
|
||||
if status != "running":
|
||||
print(f"Printer {printer.id} -> running")
|
||||
db.update_printer_status(id, "running")
|
||||
|
||||
elif printer_job.get("state").lower() == "connecting":
|
||||
if status != "connecting":
|
||||
print(f"Printer {printer.id} -> connecting")
|
||||
db.update_printer_status(id, "connecting")
|
||||
|
||||
elif printer_state.get("ready"):
|
||||
if status != "idle":
|
||||
print(f"Printer {printer.id} -> idle")
|
||||
db.update_printer_status(id, "idle")
|
||||
|
||||
else:
|
||||
raise Exception(f"Indeterminate state {printer_state!r}")
|
||||
raise Exception(f"Indeterminate state {printer_job!r} {printer_state!r}")
|
||||
|
||||
except (ConnectionError, Timeout):
|
||||
db.update_printer_status(id, "error")
|
||||
|
@ -115,7 +129,6 @@ def poll_printers(db_factory: Callable[[], Store]) -> None:
|
|||
)
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def assign_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Assign jobs to printers. Uploading files and job state management is handled separately."""
|
||||
|
||||
|
@ -126,7 +139,6 @@ def assign_jobs(db_factory: Callable[[], Store]) -> None:
|
|||
print(f"Mapped job {job_id} to printer {printer_id}")
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def push_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Ensure that job files are uploaded and started to the assigned printer."""
|
||||
|
||||
|
@ -140,6 +152,16 @@ def push_jobs(db_factory: Callable[[], Store]) -> None:
|
|||
|
||||
try:
|
||||
client = OctoRest(url=printer.url, apikey=printer.api_key)
|
||||
printer_job = client.job_info()
|
||||
try:
|
||||
printer_state = client.printer().get("state").get("flags", {})
|
||||
except HTTPError:
|
||||
printer_state = {"error": printer_job.get("error")}
|
||||
|
||||
if printer_state.get("error"):
|
||||
print(f"Printer {printer.id} is in error, can't push")
|
||||
continue
|
||||
|
||||
try:
|
||||
client.upload(file.path)
|
||||
except HTTPError as e:
|
||||
|
@ -158,16 +180,74 @@ def push_jobs(db_factory: Callable[[], Store]) -> None:
|
|||
log.exception("Oop")
|
||||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def revoke_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Ensure that job files are uploaded and started to the assigned printer."""
|
||||
"""Ensure that job files are uploaded and started to the assigned printer.
|
||||
|
||||
Note that this will ALSO cancel jobs out of the print queue.
|
||||
|
||||
"""
|
||||
|
||||
with closing(db_factory()) as db:
|
||||
for job in db.list_cancelled_jobs():
|
||||
if job.printer_id:
|
||||
printer = db.fetch_printer(job.printer_id)
|
||||
try:
|
||||
print(f"Cancelling running job {job.id}")
|
||||
client = OctoRest(url=printer.url, apikey=printer.api_key)
|
||||
try:
|
||||
client.cancel()
|
||||
except HTTPError as e:
|
||||
if e.response.status_code == 409:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
print(f"Job {job.id} -> cancelled")
|
||||
db.finish_job(job.id, "cancelled")
|
||||
except TimeoutError:
|
||||
pass
|
||||
|
||||
except Exception:
|
||||
log.exception("Oop")
|
||||
|
||||
else:
|
||||
print(f"Unmapped job {job.id} became cancelled")
|
||||
db.finish_job(job.id, "cancelled")
|
||||
|
||||
|
||||
|
||||
def pull_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Poll the state of mapped printers to control jobs."""
|
||||
|
||||
with closing(db_factory()) as db:
|
||||
for job in db.list_running_jobs():
|
||||
printer = db.fetch_printer(job.printer_id)
|
||||
try:
|
||||
client = OctoRest(url=printer.url, apikey=printer.api_key)
|
||||
client.cancel()
|
||||
job_state = client.job_info()
|
||||
try:
|
||||
printer_state = client.printer().get("state").get("flags", {})
|
||||
except HTTPError:
|
||||
printer_state = {"disconnected": True, "error": True}
|
||||
|
||||
if printer_state.get("printing"):
|
||||
pass
|
||||
|
||||
elif job_state.get("progress", {}).get("completion", 0.0) == 100.0:
|
||||
print(f"Job {job.id} has succeeded")
|
||||
db.finish_job(job.id, "success")
|
||||
|
||||
elif printer_state.get("error"):
|
||||
print(f"Job {job.id} has failed")
|
||||
db.finish_job(job.id, "failed")
|
||||
|
||||
elif printer_state.get("cancelling"):
|
||||
print(f"Job {job.id} has been acknowledged as cancelled")
|
||||
db.finish_job(job.id, "cancelled")
|
||||
|
||||
else:
|
||||
print(f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}")
|
||||
|
||||
except TimeoutError:
|
||||
pass
|
||||
|
||||
|
@ -176,22 +256,14 @@ def revoke_jobs(db_factory: Callable[[], Store]) -> None:
|
|||
|
||||
|
||||
@corn_job(timedelta(seconds=5))
|
||||
def pull_jobs(db_factory: Callable[[], Store]) -> None:
|
||||
"""Poll the state of mapped printers to control jobs."""
|
||||
|
||||
with closing(db_factory()) as db:
|
||||
for job in db.list_running_jobs():
|
||||
printer = db.fetch_printer(job.printer_id)
|
||||
if printer.status != "running":
|
||||
print(f"Job {job.id} finished {printer.status}")
|
||||
db.finish_job(job.id, printer.status)
|
||||
|
||||
def run_worker(db_factory):
|
||||
poll_printers(db_factory)
|
||||
assign_jobs(db_factory)
|
||||
push_jobs(db_factory)
|
||||
revoke_jobs(db_factory)
|
||||
pull_jobs(db_factory)
|
||||
|
||||
def create_workers(db_factory: Callable[[], Store]) -> Event:
|
||||
Thread(target=poll_printers, args=[db_factory]).start()
|
||||
Thread(target=assign_jobs, args=[db_factory]).start()
|
||||
Thread(target=push_jobs, args=[db_factory]).start()
|
||||
Thread(target=revoke_jobs, args=[db_factory]).start()
|
||||
Thread(target=pull_jobs, args=[db_factory]).start()
|
||||
Thread(target=run_worker, args=[db_factory]).start()
|
||||
|
||||
return SHUTDOWN
|
||||
|
|
Loading…
Reference in a new issue