From 99590ae53426301f38b51b8e7cb5849100d0ced3 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Fri, 20 Aug 2021 01:12:50 -0600 Subject: [PATCH] Tap out test coverage of the jobq --- projects/jobq/src/python/jobq/__init__.py | 151 +++++++++++++--------- projects/jobq/test/python/test_jobq.py | 105 +++++++++++++++ 2 files changed, 196 insertions(+), 60 deletions(-) create mode 100644 projects/jobq/test/python/test_jobq.py diff --git a/projects/jobq/src/python/jobq/__init__.py b/projects/jobq/src/python/jobq/__init__.py index dab40be..f23fbb4 100644 --- a/projects/jobq/src/python/jobq/__init__.py +++ b/projects/jobq/src/python/jobq/__init__.py @@ -7,10 +7,13 @@ import os import sys import sqlite3 import json +from typing import NamedTuple, Optional as Maybe +from datetime import datetime import anosql from anosql_migrations import run_migrations, with_migrations + _GET_JOB_FIELDS = """\ `id` , `payload` @@ -19,14 +22,19 @@ _GET_JOB_FIELDS = """\ , `modified` """ +_GET_JOB_ORDER = """\ + `modified` ASC +, `rowid` ASC +""" + _SQL = f"""\ -- name: migration-0000-create-jobq CREATE TABLE `job` ( - `id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key -, `payload` TEXT -- JSON payload -, `events` TEXT DEFAULT '[]' -- append log of JSON events -, `state` TEXT -- JSON state of the job -, `modified` INTEGER DEFAULT CURRENT_TIMESTAMP -- last modified + `id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key +, `payload` TEXT -- JSON payload +, `events` TEXT DEFAULT '[]' -- append log of JSON events +, `state` TEXT -- JSON state of the job +, `modified` INTEGER -- last modified -- note the `rowid` field is defaulted ); -- name: migration-0001-index-modified @@ -39,16 +47,17 @@ INSERT INTO `job` ( `payload` , `state` , `events` +, `modified` ) VALUES ( :payload -, :state -, json_array(json_array('job_created', json_object('timestamp', CURRENT_TIMESTAMP))) +, json(:state) +, json_array(json_array('job_created', json_object('timestamp', strftime('%s', 'now')))) +, strftime('%s','now') ) RETURNING - `id` -, `state` +{_GET_JOB_FIELDS} ; --- name: job-get? +-- name: job-get SELECT {_GET_JOB_FIELDS} FROM `job` @@ -66,21 +75,14 @@ SELECT , `state` FROM `job` ORDER BY - `id` ASC +{_GET_JOB_ORDER} ; --- name: job-filter-state -SELECT - `id` -, `state` -FROM `job` -WHERE `state` = :state -; --- name: job-append-event! +-- name: job-append-event=", ">", "LIKE", "IS"] - return f"{compile_term(qexpr)} {op} {compile_term(val)}" + assert not any(keyword in query.lower() for keyword in ["select", "update", "delete", ";"]) + return " AND ".join(terms) - ops = [compile_op(op) for op in query] - return " AND ".join(ops) + +class Job(NamedTuple): + id: int + payload: object + events: object + state: object + modified: datetime class JobQueue(object): @@ -186,6 +190,25 @@ class JobQueue(object): def __exit__(self, *args, **kwargs): self.close() + def _from_tuple(self, result) -> Job: + assert isinstance(result, tuple) + id, payload, events, state, modified = result + return Job( + int(id), + json.loads(payload), + json.loads(events), + json.loads(state), + datetime.fromtimestamp(int(modified)) + ) + + def _from_result(self, result) -> Job: + assert isinstance(result, list) + assert len(result) == 1 + return self._from_tuple(result[0]) + + def _from_results(self, results): + return [self._from_tuple(t) for t in results] + def close(self): if self._db: self._db.commit() @@ -215,55 +238,63 @@ class JobQueue(object): break jobs = lf(jobs) - return list(jobs) + return self._from_results(jobs) - def create(self, job, new_state=None): + def create(self, job, new_state=None) -> Job: """Create a new job on the queue, optionally specifying its state.""" with self._db as db: - (id, state), = self._queries.job_create( - db, - payload=json.dumps(job), - state=json.dumps(new_state), + return self._from_result( + self._queries.job_create( + db, + payload=json.dumps(job), + state=json.dumps(new_state), + ) ) - return id - def poll(self, query, new_state): + def poll(self, query, new_state) -> Maybe[Job]: """Query for the longest-untouched job matching, advancing it to new_state.""" with self._db as db: cur = db.cursor() - cur.execute(_POLL_SQL.format(compile_query(query)), - {"state": json.dumps(new_state)}) + statement = _POLL_SQL.format(compile_query(query)) + print(statement) + cur.execute(statement, {"state": json.dumps(new_state)}) results = cur.fetchall() if results: - return results + return self._from_result(results) def get(self, job_id): """Fetch all available data about a given job by ID.""" with self._db as db: - return self._queries.job_get(db, id=job_id) + return self._from_result( + self._queries.job_get(db, id=job_id) + ) def cas_state(self, job_id, old_state, new_state): """CAS update a job's state, returning the updated job or indicating a conflict.""" with self._db as db: - return self._queries.job_cas_state( + result = self._queries.job_cas_state( db, id=job_id, old_state=json.dumps(old_state), new_state=json.dumps(new_state), ) + if result: + return self._from_result(result) def append_event(self, job_id, event): """Append a user-defined event to the job's log.""" with self._db as db: - return self._queries.job_append_event( - db, - id=job_id, - event=json.dumps(event) + return self._from_result( + self._queries.job_append_event( + db, + id=job_id, + event=json.dumps(event) + ) ) def delete_job(self, job_id): diff --git a/projects/jobq/test/python/test_jobq.py b/projects/jobq/test/python/test_jobq.py new file mode 100644 index 0000000..6026467 --- /dev/null +++ b/projects/jobq/test/python/test_jobq.py @@ -0,0 +1,105 @@ +""" +Tests covering the jobq API +""" + +import logging +from time import sleep + +from jobq import Job, JobQueue +import pytest + +logging.getLogger().setLevel(logging.DEBUG) + + +@pytest.fixture +def db(): + return JobQueue(":memory:") + + +@pytest.fixture +def payload(): + return "a basic payload" + + +def test_create(db, payload): + """Assert that create does the thing.""" + + j = db.create(payload) + + assert j + assert isinstance(j, Job) + assert j.id == 1 + assert j.payload == payload + + +def test_create_get(db, payload): + """Assert that get-after-create returns the same value.""" + + j = db.create(payload) + + assert j == db.get(j.id) + + +def test_poll(db): + """Test that we can poll a job, and the oldest wins.""" + + j1 = db.create("payload 1") + j2 = db.create("payload 2") + assert j1.modified == j2.modified, "Two within the second to force the `rowid` ASC" + sleep(1) # And a side-effect for the third one + j3 = db.create("payload 3") + + j = db.poll('true', ["assigned"]) + + assert isinstance(j, Job) + assert j.id == j1.id, "j1 is the oldest in the system and should poll first." + assert j.state == ["assigned"] + + +def test_poll_not_found(db): + """Test that poll can return nothing.""" + + j1 = db.create("payload 1") + j = db.poll('false', ["assigned"]) + assert j is None + + +def test_append(db, payload): + """Test that appending an event to the log does append and preserves invariants.""" + + j = db.create(payload) + sleep(1) ## side-effect so that sqlite3 gets a different commit timestamp + j_prime = db.append_event(j.id, "some user-defined event") + + assert isinstance(j_prime, Job) + assert j != j_prime + assert j_prime.id == j.id + assert j_prime.state == j.state + assert j_prime.modified > j.modified + assert j_prime.events != j.events + assert j_prime.events[:-1] == j.events + + +def test_cas_ok(db): + """Test that we can CAS a job from one state to the 'next'.""" + + j = db.create("job2", ["state", 2]) + sleep(1) # side-effect so that sqlite3 gets a different commit timestamp + j_prime = db.cas_state(j.id, ["state", 2], ["state", 3]) + + assert isinstance(j_prime, Job), "\n".join(db._db.iterdump()) + assert j != j_prime + assert j_prime.id == j.id + assert j_prime.state != j.state + assert j_prime.modified > j.modified + assert j_prime.events != j.events + assert j_prime.events[:-1] == j.events + + +def test_cas_fail(db): + """Test that if we have a 'stale' old state CAS fails.""" + + j = db.create("job2", ["state", 2]) + j_prime = db.cas_state(j.id, ["state", 1], ["state", 2]) + + assert j_prime is None