From 8e800a0507143576b6ac264117534d3ed56e72c7 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Thu, 19 Aug 2021 23:54:08 -0600 Subject: [PATCH] Make the jobq closeable and document it. --- projects/jobq/README.md | 31 +++++++++++++++++++++++ projects/jobq/src/python/jobq/__init__.py | 12 +++++++++ 2 files changed, 43 insertions(+) diff --git a/projects/jobq/README.md b/projects/jobq/README.md index 914faf5..f97d230 100644 --- a/projects/jobq/README.md +++ b/projects/jobq/README.md @@ -2,6 +2,37 @@ Abusing sqlite3 as a job queue. +## API + +**Note** - this API currently does not support multiplexing queues within a single state store. +As files are ultimately the performance bottleneck, using multiple files PROBABLY serves you better anyway. +But investigation here is needed. + +### jobq.JobQueue(path) +Construct and return a fully migrated and connected job queue. +The queue is closable, and usable as a context manager. + +### jobq.JobQueue.create(payload, new_state=None) +Create a job with the given payload and optional state. + +### jobq.JobQueue.get(job_id) +Read a job back by ID from the queue. + +### jobq.JobQueue.poll(query, new_state) +Poll the queue for a single job matching the given query, atomically advancing it to the new state and returning it as if from `get()`. +Note that poll selects the OLDEST MATCHING JOB FIRST, thus providing a global round-robin scheduler on jobs, optimizing for progress not throughput. + +### jobq.JobQueue.cas_state(job_id, old_state, new_state) +Atomically update the state of a single job from an old state to a new state. +Note that this operation NEED NOT SUCCEED, as the job MAY be concurrently modified. +Job queue algorithms should either be lock-free or use state to implement markers/locks with timeout based recovery. + +### jobq.JobQueue.append_event(job_id, event) +Append a user-defined event to the given job's event log. + +### jobq.JobQueue.delete(job_id) +Purge a given job by ID from the system. + ## Benchmarks Benchmarks are extremely steady. diff --git a/projects/jobq/src/python/jobq/__init__.py b/projects/jobq/src/python/jobq/__init__.py index bd6a963..dab40be 100644 --- a/projects/jobq/src/python/jobq/__init__.py +++ b/projects/jobq/src/python/jobq/__init__.py @@ -180,6 +180,18 @@ class JobQueue(object): self._queries = with_migrations("sqlite3", self._queries, db) run_migrations(self._queries, db) + def __enter__(self, *args, **kwargs): + pass + + def __exit__(self, *args, **kwargs): + self.close() + + def close(self): + if self._db: + self._db.commit() + self._db.close() + self._db = None + def query(self, query, limit=None): with self._db as db: query = compile_query(query)