Tap out test coverage of the jobq
This commit is contained in:
parent
a3a800ab07
commit
99590ae534
2 changed files with 196 additions and 60 deletions
|
@ -7,10 +7,13 @@ import os
|
|||
import sys
|
||||
import sqlite3
|
||||
import json
|
||||
from typing import NamedTuple, Optional as Maybe
|
||||
from datetime import datetime
|
||||
|
||||
import anosql
|
||||
from anosql_migrations import run_migrations, with_migrations
|
||||
|
||||
|
||||
_GET_JOB_FIELDS = """\
|
||||
`id`
|
||||
, `payload`
|
||||
|
@ -19,14 +22,19 @@ _GET_JOB_FIELDS = """\
|
|||
, `modified`
|
||||
"""
|
||||
|
||||
_GET_JOB_ORDER = """\
|
||||
`modified` ASC
|
||||
, `rowid` ASC
|
||||
"""
|
||||
|
||||
_SQL = f"""\
|
||||
-- name: migration-0000-create-jobq
|
||||
CREATE TABLE `job` (
|
||||
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
|
||||
, `payload` TEXT -- JSON payload
|
||||
, `events` TEXT DEFAULT '[]' -- append log of JSON events
|
||||
, `state` TEXT -- JSON state of the job
|
||||
, `modified` INTEGER DEFAULT CURRENT_TIMESTAMP -- last modified
|
||||
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
|
||||
, `payload` TEXT -- JSON payload
|
||||
, `events` TEXT DEFAULT '[]' -- append log of JSON events
|
||||
, `state` TEXT -- JSON state of the job
|
||||
, `modified` INTEGER -- last modified
|
||||
-- note the `rowid` field is defaulted
|
||||
);
|
||||
-- name: migration-0001-index-modified
|
||||
|
@ -39,16 +47,17 @@ INSERT INTO `job` (
|
|||
`payload`
|
||||
, `state`
|
||||
, `events`
|
||||
, `modified`
|
||||
) VALUES (
|
||||
:payload
|
||||
, :state
|
||||
, json_array(json_array('job_created', json_object('timestamp', CURRENT_TIMESTAMP)))
|
||||
, json(:state)
|
||||
, json_array(json_array('job_created', json_object('timestamp', strftime('%s', 'now'))))
|
||||
, strftime('%s','now')
|
||||
)
|
||||
RETURNING
|
||||
`id`
|
||||
, `state`
|
||||
{_GET_JOB_FIELDS}
|
||||
;
|
||||
-- name: job-get?
|
||||
-- name: job-get
|
||||
SELECT
|
||||
{_GET_JOB_FIELDS}
|
||||
FROM `job`
|
||||
|
@ -66,21 +75,14 @@ SELECT
|
|||
, `state`
|
||||
FROM `job`
|
||||
ORDER BY
|
||||
`id` ASC
|
||||
{_GET_JOB_ORDER}
|
||||
;
|
||||
-- name: job-filter-state
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
WHERE `state` = :state
|
||||
;
|
||||
-- name: job-append-event!
|
||||
-- name: job-append-event<!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]', json_array('user_event', json_object('event', json(:event), 'timestamp', CURRENT_TIMESTAMP)))
|
||||
, `modified` = CURRENT_TIMESTAMP
|
||||
`events` = json_insert(events, '$[#]', json_array('user_event', json_object('event', json(:event), 'timestamp', strftime('%s', 'now'))))
|
||||
, `modified` = strftime('%s', 'now')
|
||||
WHERE
|
||||
`id` = :id
|
||||
RETURNING
|
||||
|
@ -90,9 +92,9 @@ RETURNING
|
|||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(:old_state), 'new', json(:new_state), 'timestamp', CURRENT_TIMESTAMP)))
|
||||
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(:old_state), 'new', json(:new_state), 'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:new_state)
|
||||
, `modified` = CURRENT_TIMESTAMP
|
||||
, `modified` = strftime('%s', 'now')
|
||||
WHERE
|
||||
`id` = :id
|
||||
AND `state` = json(:old_state)
|
||||
|
@ -105,23 +107,24 @@ RETURNING
|
|||
# It's not generally safe, etc. So we have to do it ourselves :/
|
||||
# These two are broken out because they use computed `WHERE` clauses.
|
||||
|
||||
_QUERY_SQL = """\
|
||||
_QUERY_SQL = f"""\
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
{_GET_JOB_FIELDS}
|
||||
FROM
|
||||
`job` AS `j`
|
||||
WHERE
|
||||
({})
|
||||
{{}}
|
||||
ORDER BY
|
||||
{_GET_JOB_ORDER}
|
||||
;
|
||||
"""
|
||||
|
||||
_POLL_SQL = f"""\
|
||||
UPDATE `job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(state), 'new', json(:state), 'timestamp', CURRENT_TIMESTAMP)))
|
||||
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(state), 'new', json(:state), 'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:state)
|
||||
, `modified` = CURRENT_TIMESTAMP
|
||||
, `modified` = strftime('%s', 'now')
|
||||
WHERE
|
||||
`id` IN (
|
||||
SELECT
|
||||
|
@ -129,9 +132,9 @@ SELECT
|
|||
FROM
|
||||
`job` AS `j`
|
||||
WHERE
|
||||
({{}})
|
||||
{{}}
|
||||
ORDER BY
|
||||
`modified` ASC
|
||||
{_GET_JOB_ORDER}
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING
|
||||
|
@ -154,20 +157,21 @@ def compile_query(query):
|
|||
Query ops join under `AND`
|
||||
"""
|
||||
|
||||
def compile_term(term):
|
||||
if term is None:
|
||||
return "NULL"
|
||||
else:
|
||||
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
|
||||
return term
|
||||
if isinstance(query, list):
|
||||
terms = query
|
||||
elif isinstance(query, str):
|
||||
terms = [query]
|
||||
|
||||
def compile_op(op):
|
||||
op, qexpr, val = op
|
||||
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
|
||||
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
|
||||
assert not any(keyword in query.lower() for keyword in ["select", "update", "delete", ";"])
|
||||
return " AND ".join(terms)
|
||||
|
||||
ops = [compile_op(op) for op in query]
|
||||
return " AND ".join(ops)
|
||||
|
||||
class Job(NamedTuple):
|
||||
id: int
|
||||
payload: object
|
||||
events: object
|
||||
state: object
|
||||
modified: datetime
|
||||
|
||||
|
||||
class JobQueue(object):
|
||||
|
@ -186,6 +190,25 @@ class JobQueue(object):
|
|||
def __exit__(self, *args, **kwargs):
|
||||
self.close()
|
||||
|
||||
def _from_tuple(self, result) -> Job:
|
||||
assert isinstance(result, tuple)
|
||||
id, payload, events, state, modified = result
|
||||
return Job(
|
||||
int(id),
|
||||
json.loads(payload),
|
||||
json.loads(events),
|
||||
json.loads(state),
|
||||
datetime.fromtimestamp(int(modified))
|
||||
)
|
||||
|
||||
def _from_result(self, result) -> Job:
|
||||
assert isinstance(result, list)
|
||||
assert len(result) == 1
|
||||
return self._from_tuple(result[0])
|
||||
|
||||
def _from_results(self, results):
|
||||
return [self._from_tuple(t) for t in results]
|
||||
|
||||
def close(self):
|
||||
if self._db:
|
||||
self._db.commit()
|
||||
|
@ -215,55 +238,63 @@ class JobQueue(object):
|
|||
break
|
||||
jobs = lf(jobs)
|
||||
|
||||
return list(jobs)
|
||||
return self._from_results(jobs)
|
||||
|
||||
def create(self, job, new_state=None):
|
||||
def create(self, job, new_state=None) -> Job:
|
||||
"""Create a new job on the queue, optionally specifying its state."""
|
||||
|
||||
with self._db as db:
|
||||
(id, state), = self._queries.job_create(
|
||||
db,
|
||||
payload=json.dumps(job),
|
||||
state=json.dumps(new_state),
|
||||
return self._from_result(
|
||||
self._queries.job_create(
|
||||
db,
|
||||
payload=json.dumps(job),
|
||||
state=json.dumps(new_state),
|
||||
)
|
||||
)
|
||||
return id
|
||||
|
||||
def poll(self, query, new_state):
|
||||
def poll(self, query, new_state) -> Maybe[Job]:
|
||||
"""Query for the longest-untouched job matching, advancing it to new_state."""
|
||||
|
||||
with self._db as db:
|
||||
cur = db.cursor()
|
||||
cur.execute(_POLL_SQL.format(compile_query(query)),
|
||||
{"state": json.dumps(new_state)})
|
||||
statement = _POLL_SQL.format(compile_query(query))
|
||||
print(statement)
|
||||
cur.execute(statement, {"state": json.dumps(new_state)})
|
||||
results = cur.fetchall()
|
||||
if results:
|
||||
return results
|
||||
return self._from_result(results)
|
||||
|
||||
def get(self, job_id):
|
||||
"""Fetch all available data about a given job by ID."""
|
||||
|
||||
with self._db as db:
|
||||
return self._queries.job_get(db, id=job_id)
|
||||
return self._from_result(
|
||||
self._queries.job_get(db, id=job_id)
|
||||
)
|
||||
|
||||
def cas_state(self, job_id, old_state, new_state):
|
||||
"""CAS update a job's state, returning the updated job or indicating a conflict."""
|
||||
|
||||
with self._db as db:
|
||||
return self._queries.job_cas_state(
|
||||
result = self._queries.job_cas_state(
|
||||
db,
|
||||
id=job_id,
|
||||
old_state=json.dumps(old_state),
|
||||
new_state=json.dumps(new_state),
|
||||
)
|
||||
if result:
|
||||
return self._from_result(result)
|
||||
|
||||
def append_event(self, job_id, event):
|
||||
"""Append a user-defined event to the job's log."""
|
||||
|
||||
with self._db as db:
|
||||
return self._queries.job_append_event(
|
||||
db,
|
||||
id=job_id,
|
||||
event=json.dumps(event)
|
||||
return self._from_result(
|
||||
self._queries.job_append_event(
|
||||
db,
|
||||
id=job_id,
|
||||
event=json.dumps(event)
|
||||
)
|
||||
)
|
||||
|
||||
def delete_job(self, job_id):
|
||||
|
|
105
projects/jobq/test/python/test_jobq.py
Normal file
105
projects/jobq/test/python/test_jobq.py
Normal file
|
@ -0,0 +1,105 @@
|
|||
"""
|
||||
Tests covering the jobq API
|
||||
"""
|
||||
|
||||
import logging
|
||||
from time import sleep
|
||||
|
||||
from jobq import Job, JobQueue
|
||||
import pytest
|
||||
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db():
|
||||
return JobQueue(":memory:")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def payload():
|
||||
return "a basic payload"
|
||||
|
||||
|
||||
def test_create(db, payload):
|
||||
"""Assert that create does the thing."""
|
||||
|
||||
j = db.create(payload)
|
||||
|
||||
assert j
|
||||
assert isinstance(j, Job)
|
||||
assert j.id == 1
|
||||
assert j.payload == payload
|
||||
|
||||
|
||||
def test_create_get(db, payload):
|
||||
"""Assert that get-after-create returns the same value."""
|
||||
|
||||
j = db.create(payload)
|
||||
|
||||
assert j == db.get(j.id)
|
||||
|
||||
|
||||
def test_poll(db):
|
||||
"""Test that we can poll a job, and the oldest wins."""
|
||||
|
||||
j1 = db.create("payload 1")
|
||||
j2 = db.create("payload 2")
|
||||
assert j1.modified == j2.modified, "Two within the second to force the `rowid` ASC"
|
||||
sleep(1) # And a side-effect for the third one
|
||||
j3 = db.create("payload 3")
|
||||
|
||||
j = db.poll('true', ["assigned"])
|
||||
|
||||
assert isinstance(j, Job)
|
||||
assert j.id == j1.id, "j1 is the oldest in the system and should poll first."
|
||||
assert j.state == ["assigned"]
|
||||
|
||||
|
||||
def test_poll_not_found(db):
|
||||
"""Test that poll can return nothing."""
|
||||
|
||||
j1 = db.create("payload 1")
|
||||
j = db.poll('false', ["assigned"])
|
||||
assert j is None
|
||||
|
||||
|
||||
def test_append(db, payload):
|
||||
"""Test that appending an event to the log does append and preserves invariants."""
|
||||
|
||||
j = db.create(payload)
|
||||
sleep(1) ## side-effect so that sqlite3 gets a different commit timestamp
|
||||
j_prime = db.append_event(j.id, "some user-defined event")
|
||||
|
||||
assert isinstance(j_prime, Job)
|
||||
assert j != j_prime
|
||||
assert j_prime.id == j.id
|
||||
assert j_prime.state == j.state
|
||||
assert j_prime.modified > j.modified
|
||||
assert j_prime.events != j.events
|
||||
assert j_prime.events[:-1] == j.events
|
||||
|
||||
|
||||
def test_cas_ok(db):
|
||||
"""Test that we can CAS a job from one state to the 'next'."""
|
||||
|
||||
j = db.create("job2", ["state", 2])
|
||||
sleep(1) # side-effect so that sqlite3 gets a different commit timestamp
|
||||
j_prime = db.cas_state(j.id, ["state", 2], ["state", 3])
|
||||
|
||||
assert isinstance(j_prime, Job), "\n".join(db._db.iterdump())
|
||||
assert j != j_prime
|
||||
assert j_prime.id == j.id
|
||||
assert j_prime.state != j.state
|
||||
assert j_prime.modified > j.modified
|
||||
assert j_prime.events != j.events
|
||||
assert j_prime.events[:-1] == j.events
|
||||
|
||||
|
||||
def test_cas_fail(db):
|
||||
"""Test that if we have a 'stale' old state CAS fails."""
|
||||
|
||||
j = db.create("job2", ["state", 2])
|
||||
j_prime = db.cas_state(j.id, ["state", 1], ["state", 2])
|
||||
|
||||
assert j_prime is None
|
Loading…
Reference in a new issue