Lotta state sync improvements
This commit is contained in:
		
					parent
					
						
							
								3fb9b6a040
							
						
					
				
			
			
				commit
				
					
						dcce448bc9
					
				
			
		
					 9 changed files with 198 additions and 68 deletions
				
			
		projects/tentacles/src/python/tentacles
|  | @ -41,7 +41,7 @@ def manipulate_jobs(): | |||
| 
 | ||||
|         case "cancel": | ||||
|             ctx.db.cancel_job(ctx.uid, int(request.form.get("job_id"))) | ||||
|             flash("Cancellation reqiested", category="info") | ||||
|             flash("Cancellation reqested", category="info") | ||||
| 
 | ||||
|         case "delete": | ||||
|             ctx.db.delete_job(ctx.uid, int(request.form.get("job_id"))) | ||||
|  |  | |||
|  | @ -49,11 +49,12 @@ CREATE TABLE IF NOT EXISTS printer_statuses ( | |||
|  , UNIQUE(name) | ||||
| ); | ||||
| 
 | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) values (-1, 'error'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) values (0, 'disconnected'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) values (1, 'connected'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) values (2, 'idle'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) values (3, 'running'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (-1, 'error'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (0, 'disconnected'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (2, 'connecting'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (2, 'connected'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (3, 'idle'); | ||||
| INSERT OR IGNORE INTO printer_statuses (id, name) VALUES (4, 'running'); | ||||
| 
 | ||||
| CREATE TABLE IF NOT EXISTS printers ( | ||||
|    id INTEGER PRIMARY KEY AUTOINCREMENT | ||||
|  |  | |||
|  | @ -8,6 +8,7 @@ | |||
| @import "tirefire/timers"; | ||||
| @import "tirefire/nav"; | ||||
| @import "tirefire/dots"; | ||||
| @import "tirefire/tooltips"; | ||||
| 
 | ||||
| .controls a, | ||||
| .controls form { | ||||
|  | @ -18,11 +19,16 @@ | |||
| .printer, | ||||
| .key, | ||||
| .job { | ||||
|     margin-top: 4px; | ||||
|   margin-top: 4px; | ||||
| } | ||||
| 
 | ||||
| .details { | ||||
|     overflow: hidden; | ||||
| .file .details, | ||||
| .printer .details, | ||||
| .key .details, | ||||
| .job .details { | ||||
|   div { | ||||
|     margin-right: 10px; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| // Hide the header name if we're on a mobile device | ||||
|  | @ -31,3 +37,7 @@ | |||
|     display: none; | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| label { | ||||
|   margin-right: 10px; | ||||
| } | ||||
|  |  | |||
|  | @ -13,6 +13,11 @@ | |||
|   border-radius: 50%; | ||||
| } | ||||
| 
 | ||||
| .dot::before, | ||||
| .dot::after { | ||||
|   animation: disabled; | ||||
| } | ||||
| 
 | ||||
| .dot.success { | ||||
|   background-color: $secondary_green; | ||||
| } | ||||
|  | @ -25,8 +30,16 @@ | |||
|   background-color: $secondary_green; | ||||
| } | ||||
| 
 | ||||
| .dot.error, | ||||
| .dot.cancelling { | ||||
|   background-color: $yellow; | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| .dot.cancelled { | ||||
|   background-color: $yellow; | ||||
| } | ||||
| 
 | ||||
| .dot.failed { | ||||
|   background-color: $red; | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -272,7 +272,8 @@ class Store(object): | |||
|             SELECT p.id | ||||
|             FROM printers p | ||||
|             LEFT JOIN (SELECT id, printer_id FROM jobs WHERE finished_at IS NULL) j ON p.id = j.printer_id | ||||
|             WHERE j.id IS NULL | ||||
|             INNER JOIN printer_statuses s ON p.status_id = s.id | ||||
|             WHERE j.id IS NULL AND s.name = 'idle' | ||||
|             """ | ||||
|         ).fetchall() | ||||
| 
 | ||||
|  | @ -370,6 +371,19 @@ class Store(object): | |||
|             [], | ||||
|         ).fetchall() | ||||
| 
 | ||||
|     @requires_conn | ||||
|     def list_job_history(self, uid: Optional[int] = None): | ||||
|         """Enumerate jobs in priority order. Note: ignores completed jobs.""" | ||||
|         cond = f"user_id = {uid}" if uid else "TRUE" | ||||
|         return self._conn.execute( | ||||
|             f""" | ||||
|             SELECT * FROM jobs | ||||
|             WHERE finished_at IS NOT NULL AND {cond} | ||||
|             ORDER BY datetime(finished_at) DESC | ||||
|             """, | ||||
|             [], | ||||
|         ).fetchall() | ||||
| 
 | ||||
|     @requires_conn | ||||
|     def list_mapped_jobs(self): | ||||
|         """Scheduler detail. List mapped but not started jobs.""" | ||||
|  | @ -406,14 +420,12 @@ class Store(object): | |||
| 
 | ||||
|     @requires_conn | ||||
|     def list_cancelled_jobs(self): | ||||
|         """Scheduler detail. List jobs which have been cancelled but are still 'live'.""" | ||||
|         """Scheduler detail. List jobs which have been cancelled but are still 'live' (not finished).""" | ||||
| 
 | ||||
|         return self._conn.execute( | ||||
|             """ | ||||
|             SELECT * FROM jobs | ||||
|             WHERE started_at IS NOT NULL | ||||
|                   AND printer_id IS NOT NULL | ||||
|                   AND finished_at IS NULL | ||||
|             WHERE finished_at IS NULL | ||||
|                   AND cancelled_at IS NOT NULL | ||||
|             """, | ||||
|             [], | ||||
|  |  | |||
|  | @ -1,18 +1,29 @@ | |||
| {% import "macros.html.j2" as macros %} | ||||
| <div class="history"> | ||||
|   <h2>Job history</h2> | ||||
|   {% with jobs = ctx.db.list_jobs(uid=ctx.uid) %} | ||||
|   {% with jobs = ctx.db.list_job_history(uid=ctx.uid) %} | ||||
|   {% if jobs %} | ||||
|   {% for job in jobs if job.finished_at %} | ||||
|   {% for job in jobs %} | ||||
|     <div class="job row u-flex"> | ||||
|       <div class="details"> | ||||
|         <span class="job-status one column"> | ||||
|           <div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;"> | ||||
|           </div> | ||||
|         </span> | ||||
|         <span class="job-filename eleven columns">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}</span> | ||||
|       <div class="details six columns u-flex"> | ||||
|         <div class="job-status u-flex"> | ||||
|           <label for="state">Job</label> | ||||
|           <div class="dot {{ macros.job_state(job) }}" style="--dot-size: 1em;"> </div> | ||||
|           <span name="state">{{ macros.job_state(job) }}</span> | ||||
|         </div> | ||||
|         {% if job.printer_id %} | ||||
|         <div class="job-printer u-flex"> | ||||
|           <label for="printer">Printer</label> | ||||
|           <span name="printer">{{ ctx.db.fetch_printer(job.printer_id).name }}</span> | ||||
|         </div> | ||||
|         {% endif %} | ||||
|         <div class="job-filename u-flex"> | ||||
|           <label for="filename">File</label> | ||||
|           <span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}</span> | ||||
|         </div> | ||||
|       </div> | ||||
|       <div class="controls u-flex u-ml-auto"> | ||||
|         {{ macros.duplicate_job(job.id) }} | ||||
|         {{ macros.delete_job(job.id) }} | ||||
|       </div> | ||||
|     </div> | ||||
|  |  | |||
|  | @ -3,25 +3,33 @@ | |||
|   <h2>Job queue</h2> | ||||
|   {% with jobs = ctx.db.list_job_queue(uid=ctx.uid) %} | ||||
|   {% if jobs %} | ||||
|     {% for job in jobs %} | ||||
|       <div class="job row u-flex"> | ||||
|         <div class="details seven colums"> | ||||
|           <div class="job-status one column"> | ||||
|             <div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;"> | ||||
|             </div> | ||||
|           </div> | ||||
|           <div class="job-filename six columns"> | ||||
|             {{ctx.db.fetch_file(ctx.uid, job.file_id).filename}} | ||||
|           </div> | ||||
|   {% for job in jobs %} | ||||
|     <div class="job row u-flex"> | ||||
|       <div class="details six columns u-flex"> | ||||
|         <div class="job-status u-flex"> | ||||
|           <label for="state">Job</label> | ||||
|           <div class="dot {{ macros.job_state(job) }} {{ 'dot--basic' if not job.state else '' }}" style="--dot-size: 1em;"> </div> | ||||
|           <span name="state">{{ macros.job_state(job) }}</span> | ||||
|         </div> | ||||
|         <div class="controls u-flex u-ml-auto"> | ||||
|           {{ macros.duplicate_job(job.id) }} | ||||
|           {{ macros.cancel_job(job.id) }} | ||||
|         {% if job.printer_id %} | ||||
|           <div class="job-printer u-flex"> | ||||
|             <label for="printer">Printer</label> | ||||
|             <span name="printer">{{ ctx.db.fetch_printer(job.printer_id).name }}</span> | ||||
|           </div> | ||||
|         {% endif %} | ||||
|         <div class="job-filename u-flex"> | ||||
|           <label for="filename">File</label> | ||||
|           <span name="filename">{{ctx.db.fetch_file(ctx.uid, job.file_id).filename}}</span> | ||||
|         </div> | ||||
|       </div> | ||||
|     {% endfor %} | ||||
|       <div class="controls u-flex u-ml-auto"> | ||||
|         {{ macros.duplicate_job(job.id) }} | ||||
|         {{ macros.cancel_job(job.id) }} | ||||
|       </div> | ||||
|     </div> | ||||
|   {% endfor %} | ||||
|   {% else %} | ||||
|     No pending tasks. {% if ctx.uid %}Start something!{% endif %} | ||||
|   {% endif %} | ||||
| {% endwith %} | ||||
|   {% endwith %} | ||||
| </div> | ||||
|  |  | |||
|  | @ -33,7 +33,10 @@ | |||
| {% endmacro %} | ||||
| 
 | ||||
| {% macro job_state(job) %} | ||||
| {{ 'queued' if (not job.finished_at and not job.printer_id) else 'running' if not job.finished_at else job.state }} | ||||
| {{ 'queued' if (not job.finished_at and not job.printer_id and not job.cancelled_at) else | ||||
|    'running' if (not job.finished_at and job.printer_id and not job.cancelled_at) else | ||||
|    'cancelling' if (not job.finished_at and job.printer_id and job.cancelled_at) else | ||||
|   job.state }} | ||||
| {% endmacro %} | ||||
| 
 | ||||
| {# #################################################################################################### #} | ||||
|  |  | |||
|  | @ -64,7 +64,6 @@ def corn_job(every: timedelta): | |||
|     return _decorator | ||||
| 
 | ||||
| 
 | ||||
| @corn_job(timedelta(seconds=5)) | ||||
| def poll_printers(db_factory: Callable[[], Store]) -> None: | ||||
|     """Poll printers for their status.""" | ||||
| 
 | ||||
|  | @ -75,11 +74,10 @@ def poll_printers(db_factory: Callable[[], Store]) -> None: | |||
|             try: | ||||
|                 client = OctoRest(url=url, apikey=api_key) | ||||
|                 printer_job = client.job_info() | ||||
|                 print(printer_job) | ||||
|                 try: | ||||
|                     printer_state = client.printer().get("state").get("flags", {}) | ||||
|                 except: | ||||
|                     printer_state = {"error": printer_job.get("error")} | ||||
|                 except HTTPError: | ||||
|                     printer_state = {"disconnected": True} | ||||
| 
 | ||||
|                 if printer_state.get("error"): | ||||
|                     # If there's a mapped job, we manually fail it so that | ||||
|  | @ -89,16 +87,32 @@ def poll_printers(db_factory: Callable[[], Store]) -> None: | |||
|                     if mapped_job: | ||||
|                         db.finish_job(mapped_job.id, "error") | ||||
| 
 | ||||
|                     db.update_printer_status(id, "error") | ||||
|                     if status != "error": | ||||
|                         print(f"Printer {printer.id} -> error") | ||||
|                         db.update_printer_status(id, "error") | ||||
| 
 | ||||
|                 elif printer_state.get("ready"): | ||||
|                     db.update_printer_status(id, "idle") | ||||
|                 elif printer_state.get("disconnected"): | ||||
|                     if status != "disconnected": | ||||
|                         print(f"Printer {printer.id} -> disconnected") | ||||
|                         db.update_printer_status(id, "disconnected") | ||||
| 
 | ||||
|                 elif printer_state.get("printing"): | ||||
|                     db.update_printer_status(id, "running") | ||||
|                     if status != "running": | ||||
|                         print(f"Printer {printer.id} -> running") | ||||
|                         db.update_printer_status(id, "running") | ||||
| 
 | ||||
|                 elif printer_job.get("state").lower() == "connecting": | ||||
|                     if status != "connecting": | ||||
|                         print(f"Printer {printer.id} -> connecting") | ||||
|                         db.update_printer_status(id, "connecting") | ||||
| 
 | ||||
|                 elif printer_state.get("ready"): | ||||
|                     if status != "idle": | ||||
|                         print(f"Printer {printer.id} -> idle") | ||||
|                         db.update_printer_status(id, "idle") | ||||
| 
 | ||||
|                 else: | ||||
|                     raise Exception(f"Indeterminate state {printer_state!r}") | ||||
|                     raise Exception(f"Indeterminate state {printer_job!r} {printer_state!r}") | ||||
| 
 | ||||
|             except (ConnectionError, Timeout): | ||||
|                 db.update_printer_status(id, "error") | ||||
|  | @ -115,7 +129,6 @@ def poll_printers(db_factory: Callable[[], Store]) -> None: | |||
|                     ) | ||||
| 
 | ||||
| 
 | ||||
| @corn_job(timedelta(seconds=5)) | ||||
| def assign_jobs(db_factory: Callable[[], Store]) -> None: | ||||
|     """Assign jobs to printers. Uploading files and job state management is handled separately.""" | ||||
| 
 | ||||
|  | @ -126,7 +139,6 @@ def assign_jobs(db_factory: Callable[[], Store]) -> None: | |||
|                 print(f"Mapped job {job_id} to printer {printer_id}") | ||||
| 
 | ||||
| 
 | ||||
| @corn_job(timedelta(seconds=5)) | ||||
| def push_jobs(db_factory: Callable[[], Store]) -> None: | ||||
|     """Ensure that job files are uploaded and started to the assigned printer.""" | ||||
| 
 | ||||
|  | @ -140,6 +152,16 @@ def push_jobs(db_factory: Callable[[], Store]) -> None: | |||
| 
 | ||||
|             try: | ||||
|                 client = OctoRest(url=printer.url, apikey=printer.api_key) | ||||
|                 printer_job = client.job_info() | ||||
|                 try: | ||||
|                     printer_state = client.printer().get("state").get("flags", {}) | ||||
|                 except HTTPError: | ||||
|                     printer_state = {"error": printer_job.get("error")} | ||||
| 
 | ||||
|                 if printer_state.get("error"): | ||||
|                     print(f"Printer {printer.id} is in error, can't push") | ||||
|                     continue | ||||
| 
 | ||||
|                 try: | ||||
|                     client.upload(file.path) | ||||
|                 except HTTPError as e: | ||||
|  | @ -158,16 +180,74 @@ def push_jobs(db_factory: Callable[[], Store]) -> None: | |||
|                 log.exception("Oop") | ||||
| 
 | ||||
| 
 | ||||
| @corn_job(timedelta(seconds=5)) | ||||
| def revoke_jobs(db_factory: Callable[[], Store]) -> None: | ||||
|     """Ensure that job files are uploaded and started to the assigned printer.""" | ||||
|     """Ensure that job files are uploaded and started to the assigned printer. | ||||
| 
 | ||||
|     Note that this will ALSO cancel jobs out of the print queue. | ||||
| 
 | ||||
|     """ | ||||
| 
 | ||||
|     with closing(db_factory()) as db: | ||||
|         for job in db.list_cancelled_jobs(): | ||||
|             if job.printer_id: | ||||
|                 printer = db.fetch_printer(job.printer_id) | ||||
|                 try: | ||||
|                     print(f"Cancelling running job {job.id}") | ||||
|                     client = OctoRest(url=printer.url, apikey=printer.api_key) | ||||
|                     try: | ||||
|                         client.cancel() | ||||
|                     except HTTPError as e: | ||||
|                         if e.response.status_code == 409: | ||||
|                             pass | ||||
|                         else: | ||||
|                             raise | ||||
| 
 | ||||
|                     print(f"Job {job.id} -> cancelled") | ||||
|                     db.finish_job(job.id, "cancelled") | ||||
|                 except TimeoutError: | ||||
|                     pass | ||||
| 
 | ||||
|                 except Exception: | ||||
|                     log.exception("Oop") | ||||
| 
 | ||||
|             else: | ||||
|                 print(f"Unmapped job {job.id} became cancelled") | ||||
|                 db.finish_job(job.id, "cancelled") | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| def pull_jobs(db_factory: Callable[[], Store]) -> None: | ||||
|     """Poll the state of mapped printers to control jobs.""" | ||||
| 
 | ||||
|     with closing(db_factory()) as db: | ||||
|         for job in db.list_running_jobs(): | ||||
|             printer = db.fetch_printer(job.printer_id) | ||||
|             try: | ||||
|                 client = OctoRest(url=printer.url, apikey=printer.api_key) | ||||
|                 client.cancel() | ||||
|                 job_state = client.job_info() | ||||
|                 try: | ||||
|                     printer_state = client.printer().get("state").get("flags", {}) | ||||
|                 except HTTPError: | ||||
|                     printer_state = {"disconnected": True, "error": True} | ||||
| 
 | ||||
|                 if printer_state.get("printing"): | ||||
|                     pass | ||||
| 
 | ||||
|                 elif job_state.get("progress", {}).get("completion", 0.0) == 100.0: | ||||
|                     print(f"Job {job.id} has succeeded") | ||||
|                     db.finish_job(job.id, "success") | ||||
| 
 | ||||
|                 elif printer_state.get("error"): | ||||
|                     print(f"Job {job.id} has failed") | ||||
|                     db.finish_job(job.id, "failed") | ||||
| 
 | ||||
|                 elif printer_state.get("cancelling"): | ||||
|                     print(f"Job {job.id} has been acknowledged as cancelled") | ||||
|                     db.finish_job(job.id, "cancelled") | ||||
| 
 | ||||
|                 else: | ||||
|                     print(f"Job {job.id} is in a weird state {job_state.get('progress')!r} {printer_state!r}") | ||||
| 
 | ||||
|             except TimeoutError: | ||||
|                 pass | ||||
| 
 | ||||
|  | @ -176,22 +256,14 @@ def revoke_jobs(db_factory: Callable[[], Store]) -> None: | |||
| 
 | ||||
| 
 | ||||
| @corn_job(timedelta(seconds=5)) | ||||
| def pull_jobs(db_factory: Callable[[], Store]) -> None: | ||||
|     """Poll the state of mapped printers to control jobs.""" | ||||
| 
 | ||||
|     with closing(db_factory()) as db: | ||||
|         for job in db.list_running_jobs(): | ||||
|             printer = db.fetch_printer(job.printer_id) | ||||
|             if printer.status != "running": | ||||
|                 print(f"Job {job.id} finished {printer.status}") | ||||
|                 db.finish_job(job.id, printer.status) | ||||
| 
 | ||||
| def run_worker(db_factory): | ||||
|     poll_printers(db_factory) | ||||
|     assign_jobs(db_factory) | ||||
|     push_jobs(db_factory) | ||||
|     revoke_jobs(db_factory) | ||||
|     pull_jobs(db_factory) | ||||
| 
 | ||||
| def create_workers(db_factory: Callable[[], Store]) -> Event: | ||||
|     Thread(target=poll_printers, args=[db_factory]).start() | ||||
|     Thread(target=assign_jobs, args=[db_factory]).start() | ||||
|     Thread(target=push_jobs, args=[db_factory]).start() | ||||
|     Thread(target=revoke_jobs, args=[db_factory]).start() | ||||
|     Thread(target=pull_jobs, args=[db_factory]).start() | ||||
|     Thread(target=run_worker, args=[db_factory]).start() | ||||
| 
 | ||||
|     return SHUTDOWN | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue