Tap out test coverage of the jobq

This commit is contained in:
Reid 'arrdem' McKenzie 2021-08-20 01:12:50 -06:00
parent 8e800a0507
commit a57ebeb524
2 changed files with 196 additions and 60 deletions

View file

@ -7,10 +7,13 @@ import os
import sys import sys
import sqlite3 import sqlite3
import json import json
from typing import NamedTuple, Optional as Maybe
from datetime import datetime
import anosql import anosql
from anosql_migrations import run_migrations, with_migrations from anosql_migrations import run_migrations, with_migrations
_GET_JOB_FIELDS = """\ _GET_JOB_FIELDS = """\
`id` `id`
, `payload` , `payload`
@ -19,6 +22,11 @@ _GET_JOB_FIELDS = """\
, `modified` , `modified`
""" """
_GET_JOB_ORDER = """\
`modified` ASC
, `rowid` ASC
"""
_SQL = f"""\ _SQL = f"""\
-- name: migration-0000-create-jobq -- name: migration-0000-create-jobq
CREATE TABLE `job` ( CREATE TABLE `job` (
@ -26,7 +34,7 @@ CREATE TABLE `job` (
, `payload` TEXT -- JSON payload , `payload` TEXT -- JSON payload
, `events` TEXT DEFAULT '[]' -- append log of JSON events , `events` TEXT DEFAULT '[]' -- append log of JSON events
, `state` TEXT -- JSON state of the job , `state` TEXT -- JSON state of the job
, `modified` INTEGER DEFAULT CURRENT_TIMESTAMP -- last modified , `modified` INTEGER -- last modified
-- note the `rowid` field is defaulted -- note the `rowid` field is defaulted
); );
-- name: migration-0001-index-modified -- name: migration-0001-index-modified
@ -39,16 +47,17 @@ INSERT INTO `job` (
`payload` `payload`
, `state` , `state`
, `events` , `events`
, `modified`
) VALUES ( ) VALUES (
:payload :payload
, :state , json(:state)
, json_array(json_array('job_created', json_object('timestamp', CURRENT_TIMESTAMP))) , json_array(json_array('job_created', json_object('timestamp', strftime('%s', 'now'))))
, strftime('%s','now')
) )
RETURNING RETURNING
`id` {_GET_JOB_FIELDS}
, `state`
; ;
-- name: job-get? -- name: job-get
SELECT SELECT
{_GET_JOB_FIELDS} {_GET_JOB_FIELDS}
FROM `job` FROM `job`
@ -66,21 +75,14 @@ SELECT
, `state` , `state`
FROM `job` FROM `job`
ORDER BY ORDER BY
`id` ASC {_GET_JOB_ORDER}
; ;
-- name: job-filter-state -- name: job-append-event<!
SELECT
`id`
, `state`
FROM `job`
WHERE `state` = :state
;
-- name: job-append-event!
UPDATE UPDATE
`job` `job`
SET SET
`events` = json_insert(events, '$[#]', json_array('user_event', json_object('event', json(:event), 'timestamp', CURRENT_TIMESTAMP))) `events` = json_insert(events, '$[#]', json_array('user_event', json_object('event', json(:event), 'timestamp', strftime('%s', 'now'))))
, `modified` = CURRENT_TIMESTAMP , `modified` = strftime('%s', 'now')
WHERE WHERE
`id` = :id `id` = :id
RETURNING RETURNING
@ -90,9 +92,9 @@ RETURNING
UPDATE UPDATE
`job` `job`
SET SET
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(:old_state), 'new', json(:new_state), 'timestamp', CURRENT_TIMESTAMP))) `events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(:old_state), 'new', json(:new_state), 'timestamp', strftime('%s', 'now'))))
, `state` = json(:new_state) , `state` = json(:new_state)
, `modified` = CURRENT_TIMESTAMP , `modified` = strftime('%s', 'now')
WHERE WHERE
`id` = :id `id` = :id
AND `state` = json(:old_state) AND `state` = json(:old_state)
@ -105,23 +107,24 @@ RETURNING
# It's not generally safe, etc. So we have to do it ourselves :/ # It's not generally safe, etc. So we have to do it ourselves :/
# These two are broken out because they use computed `WHERE` clauses. # These two are broken out because they use computed `WHERE` clauses.
_QUERY_SQL = """\ _QUERY_SQL = f"""\
SELECT SELECT
`id` {_GET_JOB_FIELDS}
, `state`
FROM FROM
`job` AS `j` `job` AS `j`
WHERE WHERE
({}) {{}}
ORDER BY
{_GET_JOB_ORDER}
; ;
""" """
_POLL_SQL = f"""\ _POLL_SQL = f"""\
UPDATE `job` UPDATE `job`
SET SET
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(state), 'new', json(:state), 'timestamp', CURRENT_TIMESTAMP))) `events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(state), 'new', json(:state), 'timestamp', strftime('%s', 'now'))))
, `state` = json(:state) , `state` = json(:state)
, `modified` = CURRENT_TIMESTAMP , `modified` = strftime('%s', 'now')
WHERE WHERE
`id` IN ( `id` IN (
SELECT SELECT
@ -129,9 +132,9 @@ SELECT
FROM FROM
`job` AS `j` `job` AS `j`
WHERE WHERE
({{}}) {{}}
ORDER BY ORDER BY
`modified` ASC {_GET_JOB_ORDER}
LIMIT 1 LIMIT 1
) )
RETURNING RETURNING
@ -154,20 +157,21 @@ def compile_query(query):
Query ops join under `AND` Query ops join under `AND`
""" """
def compile_term(term): if isinstance(query, list):
if term is None: terms = query
return "NULL" elif isinstance(query, str):
else: terms = [query]
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
return term
def compile_op(op): assert not any(keyword in query.lower() for keyword in ["select", "update", "delete", ";"])
op, qexpr, val = op return " AND ".join(terms)
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
ops = [compile_op(op) for op in query]
return " AND ".join(ops) class Job(NamedTuple):
id: int
payload: object
events: object
state: object
modified: datetime
class JobQueue(object): class JobQueue(object):
@ -186,6 +190,25 @@ class JobQueue(object):
def __exit__(self, *args, **kwargs): def __exit__(self, *args, **kwargs):
self.close() self.close()
def _from_tuple(self, result) -> Job:
assert isinstance(result, tuple)
id, payload, events, state, modified = result
return Job(
int(id),
json.loads(payload),
json.loads(events),
json.loads(state),
datetime.fromtimestamp(int(modified))
)
def _from_result(self, result) -> Job:
assert isinstance(result, list)
assert len(result) == 1
return self._from_tuple(result[0])
def _from_results(self, results):
return [self._from_tuple(t) for t in results]
def close(self): def close(self):
if self._db: if self._db:
self._db.commit() self._db.commit()
@ -215,56 +238,64 @@ class JobQueue(object):
break break
jobs = lf(jobs) jobs = lf(jobs)
return list(jobs) return self._from_results(jobs)
def create(self, job, new_state=None): def create(self, job, new_state=None) -> Job:
"""Create a new job on the queue, optionally specifying its state.""" """Create a new job on the queue, optionally specifying its state."""
with self._db as db: with self._db as db:
(id, state), = self._queries.job_create( return self._from_result(
self._queries.job_create(
db, db,
payload=json.dumps(job), payload=json.dumps(job),
state=json.dumps(new_state), state=json.dumps(new_state),
) )
return id )
def poll(self, query, new_state): def poll(self, query, new_state) -> Maybe[Job]:
"""Query for the longest-untouched job matching, advancing it to new_state.""" """Query for the longest-untouched job matching, advancing it to new_state."""
with self._db as db: with self._db as db:
cur = db.cursor() cur = db.cursor()
cur.execute(_POLL_SQL.format(compile_query(query)), statement = _POLL_SQL.format(compile_query(query))
{"state": json.dumps(new_state)}) print(statement)
cur.execute(statement, {"state": json.dumps(new_state)})
results = cur.fetchall() results = cur.fetchall()
if results: if results:
return results return self._from_result(results)
def get(self, job_id): def get(self, job_id):
"""Fetch all available data about a given job by ID.""" """Fetch all available data about a given job by ID."""
with self._db as db: with self._db as db:
return self._queries.job_get(db, id=job_id) return self._from_result(
self._queries.job_get(db, id=job_id)
)
def cas_state(self, job_id, old_state, new_state): def cas_state(self, job_id, old_state, new_state):
"""CAS update a job's state, returning the updated job or indicating a conflict.""" """CAS update a job's state, returning the updated job or indicating a conflict."""
with self._db as db: with self._db as db:
return self._queries.job_cas_state( result = self._queries.job_cas_state(
db, db,
id=job_id, id=job_id,
old_state=json.dumps(old_state), old_state=json.dumps(old_state),
new_state=json.dumps(new_state), new_state=json.dumps(new_state),
) )
if result:
return self._from_result(result)
def append_event(self, job_id, event): def append_event(self, job_id, event):
"""Append a user-defined event to the job's log.""" """Append a user-defined event to the job's log."""
with self._db as db: with self._db as db:
return self._queries.job_append_event( return self._from_result(
self._queries.job_append_event(
db, db,
id=job_id, id=job_id,
event=json.dumps(event) event=json.dumps(event)
) )
)
def delete_job(self, job_id): def delete_job(self, job_id):
"""Delete a job by ID, regardless of state.""" """Delete a job by ID, regardless of state."""

View file

@ -0,0 +1,105 @@
"""
Tests covering the jobq API
"""
import logging
from time import sleep
from jobq import Job, JobQueue
import pytest
logging.getLogger().setLevel(logging.DEBUG)
@pytest.fixture
def db():
return JobQueue(":memory:")
@pytest.fixture
def payload():
return "a basic payload"
def test_create(db, payload):
"""Assert that create does the thing."""
j = db.create(payload)
assert j
assert isinstance(j, Job)
assert j.id == 1
assert j.payload == payload
def test_create_get(db, payload):
"""Assert that get-after-create returns the same value."""
j = db.create(payload)
assert j == db.get(j.id)
def test_poll(db):
"""Test that we can poll a job, and the oldest wins."""
j1 = db.create("payload 1")
j2 = db.create("payload 2")
assert j1.modified == j2.modified, "Two within the second to force the `rowid` ASC"
sleep(1) # And a side-effect for the third one
j3 = db.create("payload 3")
j = db.poll('true', ["assigned"])
assert isinstance(j, Job)
assert j.id == j1.id, "j1 is the oldest in the system and should poll first."
assert j.state == ["assigned"]
def test_poll_not_found(db):
"""Test that poll can return nothing."""
j1 = db.create("payload 1")
j = db.poll('false', ["assigned"])
assert j is None
def test_append(db, payload):
"""Test that appending an event to the log does append and preserves invariants."""
j = db.create(payload)
sleep(1) ## side-effect so that sqlite3 gets a different commit timestamp
j_prime = db.append_event(j.id, "some user-defined event")
assert isinstance(j_prime, Job)
assert j != j_prime
assert j_prime.id == j.id
assert j_prime.state == j.state
assert j_prime.modified > j.modified
assert j_prime.events != j.events
assert j_prime.events[:-1] == j.events
def test_cas_ok(db):
"""Test that we can CAS a job from one state to the 'next'."""
j = db.create("job2", ["state", 2])
sleep(1) # side-effect so that sqlite3 gets a different commit timestamp
j_prime = db.cas_state(j.id, ["state", 2], ["state", 3])
assert isinstance(j_prime, Job), "\n".join(db._db.iterdump())
assert j != j_prime
assert j_prime.id == j.id
assert j_prime.state != j.state
assert j_prime.modified > j.modified
assert j_prime.events != j.events
assert j_prime.events[:-1] == j.events
def test_cas_fail(db):
"""Test that if we have a 'stale' old state CAS fails."""
j = db.create("job2", ["state", 2])
j_prime = db.cas_state(j.id, ["state", 1], ["state", 2])
assert j_prime is None