Make the jobq closeable and document it.

This commit is contained in:
Reid 'arrdem' McKenzie 2021-08-19 23:54:08 -06:00
parent 308e2d0209
commit 8e800a0507
2 changed files with 43 additions and 0 deletions

View file

@ -2,6 +2,37 @@
Abusing sqlite3 as a job queue. Abusing sqlite3 as a job queue.
## API
**Note** - this API currently does not support multiplexing queues within a single state store.
As files are ultimately the performance bottleneck, using multiple files PROBABLY serves you better anyway.
But investigation here is needed.
### jobq.JobQueue(path)
Construct and return a fully migrated and connected job queue.
The queue is closable, and usable as a context manager.
### jobq.JobQueue.create(payload, new_state=None)
Create a job with the given payload and optional state.
### jobq.JobQueue.get(job_id)
Read a job back by ID from the queue.
### jobq.JobQueue.poll(query, new_state)
Poll the queue for a single job matching the given query, atomically advancing it to the new state and returning it as if from `get()`.
Note that poll selects the OLDEST MATCHING JOB FIRST, thus providing a global round-robin scheduler on jobs, optimizing for progress not throughput.
### jobq.JobQueue.cas_state(job_id, old_state, new_state)
Atomically update the state of a single job from an old state to a new state.
Note that this operation NEED NOT SUCCEED, as the job MAY be concurrently modified.
Job queue algorithms should either be lock-free or use state to implement markers/locks with timeout based recovery.
### jobq.JobQueue.append_event(job_id, event)
Append a user-defined event to the given job's event log.
### jobq.JobQueue.delete(job_id)
Purge a given job by ID from the system.
## Benchmarks ## Benchmarks
Benchmarks are extremely steady. Benchmarks are extremely steady.

View file

@ -180,6 +180,18 @@ class JobQueue(object):
self._queries = with_migrations("sqlite3", self._queries, db) self._queries = with_migrations("sqlite3", self._queries, db)
run_migrations(self._queries, db) run_migrations(self._queries, db)
def __enter__(self, *args, **kwargs):
pass
def __exit__(self, *args, **kwargs):
self.close()
def close(self):
if self._db:
self._db.commit()
self._db.close()
self._db = None
def query(self, query, limit=None): def query(self, query, limit=None):
with self._db as db: with self._db as db:
query = compile_query(query) query = compile_query(query)