Split the jobq into a library and a daemon

Reid 'arrdem' McKenzie 2021-08-19 23:28:33 -06:00
parent 2f59f91991
commit 49157000b7
10 changed files with 718 additions and 516 deletions


@@ -1,26 +1,13 @@
-zapp_binary(
+py_project(
     name = "jobq",
-    main = "src/python/jobq/__main__.py",
-    imports = [
-        "src/python",
-    ],
-    deps = [
+    lib_deps = [
         "//projects/anosql",
         "//projects/anosql-migrations",
-        py_requirement("flask"),
-        py_requirement("pyyaml"),
     ]
 )
 
-py_library(
-    name = "client",
-    srcs = [
-        "src/python/jobq/rest/api.py",
-    ],
-    imports = [
-        "src/python",
-    ],
-    deps = [
-        py_requirement("requests"),
-    ],
-)
+zapp_binary(
+    name = "benchmark",
+    main = "benchmark.py",
+    deps = [":jobq"],
+)


@@ -1,153 +1,3 @@
 # Jobq
 
-Jobq is an event-oriented framework for recording _jobs_.
+Abusing sqlite3 as a job queue.
Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
Users may manually add _events_ to the log.
Note that, while we strongly suggest that _state_ should always be a tagged tuple, no constraints are placed on its contents.
## HTTP API
### GET /api/v0/job
Return an enumeration of jobs active in the system.
**Response** -
```shell
$ curl -X GET $JOBQ/api/v0/job | jq .
{"jobs": [
{"id": 1, "state": ["CREATED"]},
{"id": 2, "state": ["ASSIGNED"]},
{"id": 3, "state": ["SLEEPING"]},
{"id": 3, "state": ["FINISHED"]}
]}
```
### POST /api/v0/job
Perform a point-in-time query for jobs.
The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
Valid exprs are any SQLite expression not containing sub-queries.
Here, we're looking for jobs tagged as in the `["CREATED"]` state.
``` shell
$ curl -X POST $JOBQ/api/v0/job --data "{\"query\": [[\"IS\", \"json_extract(state, '\$[0]')\", \"'CREATED'\"]]}" | jq .
{"jobs": [
{"id": 1, "state": ["CREATED"]},
]}
```
### POST /api/v0/job/create
Given a JSON document as the POST body, create a new job with the given payload in the given state.
If state is not provided, the state `null` is used.
```
$ curl -X POST $JOBQ/api/v0/job/create --data '{"state": ["CREATED"], "payload": {"msg": "Hello, world!"}}' | jq .
{
"id": 1
}
```
### POST /api/v0/job/poll
Poll for at most one job matching the given query, atomically advancing it to the given state.
Uses the same query format as the /job endpoint.
Here, we're polling for jobs in the null (initial) state, and assigning the first such job to this host.
Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
``` shell
$ curl -X POST $JOBQ/api/v0/job/poll --data '{"query": [["IS", "j.state", null]], "state":["ASSIGNED", {"node": 1}]}' | jq .
{
"id": 3,
"state": [
"ASSIGNED",
{
"node": 1
}
]
}
```
### GET /api/v0/job/<job_id>
Return all available data about a given job, including the payload, event log and current state.
```shell
$ curl -X GET $JOBQ/api/v0/job/1 | jq .
{
"id": 1,
"payload": {
"msg": "Hello, world"
},
"state": [
"CREATED"
],
"events": [
[
"job_created",
{
"timestamp": "1628909303"
}
]
]
}
```
### POST /api/v0/job/<job_id>/state
POST the 'current' state, and a proposed new state, attempting to update the state of the job using CAS.
If the state of the job updates successfully, a new event will be appended to the job's log and the resulting job will be returned.
Otherwise a conflict will be signaled.
``` shell
$ curl -X POST $JOBQ/api/v0/job/1/state --data '{"new": ["ASSIGNED"], "old": ["CREATED"]}' | jq .
{
"id": 1,
"payload": {
"msg": "Hello, world"
},
"state": [
"ASSIGNED"
],
"events": [
[
"job_created",
{
"timestamp": "1628911153"
}
],
[
"job_state_advanced",
{
"new": [
"ASSIGNED"
],
"old": [
"CREATED"
],
"timestamp": "1628911184"
}
]
]
}
```
### POST /api/v0/job/<job_id>/event
Append an arbitrary event to the log.
User-defined events will be wrapped in a `"user_event"` tag and have `"timestamp"` metadata inserted.
``` shell
$ curl -X POST $JOBQ/api/v0/job/1/event --data '["my cool event"]' | jq .events[-1]
[
"user_event",
{
"event": [
"my cool event"
],
"timestamp": "1628911503"
}
]
```
### DELETE /api/v0/job/<job_id>
Expunge a given job from the system by ID.


@@ -0,0 +1,96 @@
"""
Benchmarking the jobq.
"""
from contextlib import contextmanager
import os
from random import choice, randint
from statistics import mean, median, stdev
import string
from time import perf_counter_ns

from jobq import JobQueue
def randstr(n):
    return ''.join(choice(string.ascii_uppercase + string.digits) for _ in range(n))
class Timing(object):
def __init__(self, start):
self.start = start
self.end = None
@property
def duration(self):
if self.end:
return self.end - self.start
@contextmanager
def timing():
"""A context manager that produces a semi-mutable timing record."""
obj = Timing(perf_counter_ns())
yield obj
obj.end = perf_counter_ns()
def bench(callable, reps):
timings = []
with timing() as run_t:
for _ in range(reps):
with timing() as t:
callable()
timings.append(t.duration)
print(f"""Ran {callable.__name__!r} {reps} times, total time {run_t.duration / 1e9} (s)
mean: {mean(timings) / 1e9} (s)
median: {median(timings) / 1e9} (s)
stddev: {stdev(timings) / 1e9} (s)
test overhead: {(run_t.duration - sum(timings)) / reps / 1e9} (s)
""")
def test_insert(q, reps):
# Measuring insertion time
jobs = [
{"user_id": randint(0, 1<<32), "msg": randstr(256)}
for _ in range(reps)
]
jobs_i = iter(jobs)
def insert():
q.create(next(jobs_i), new_state=["CREATED"])
bench(insert, reps)
def test_poll(q, reps):
def poll():
q.poll([["=", "json_extract(j.state, '$[0]')", "'CREATED'"]], ["POLLED"])
bench(poll, reps)
if __name__ == "__main__":
# Test params
reps = 10000
path = "/tmp/jobq-bench.sqlite3"
# Ensuring a clean-ish run env.
if os.path.exists(path):
os.remove(path)
# And the tests
print(f"Testing with {path}")
q = JobQueue(path)
test_insert(q, reps)
test_poll(q, reps)
print(f"Testing with :memory:")
q = JobQueue(":memory:")
test_insert(q, reps)
test_poll(q, reps)


@@ -0,0 +1,261 @@
"""
A job queue library teetering atop sqlite3.
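
A minimal usage sketch (an in-memory database; the state values and query
are illustrative, not normative):

    from jobq import JobQueue

    q = JobQueue(":memory:")
    job_id = q.create({"msg": "hello"}, new_state=["CREATED"])
    rows = q.poll([["=", "json_extract(j.state, '$[0]')", "'CREATED'"]],
                  ["ASSIGNED"])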
"""
import logging
import os
import sys
import sqlite3
import json
import anosql
from anosql_migrations import run_migrations, with_migrations
_GET_JOB_FIELDS = """\
`id`
, `payload`
, `events`
, `state`
, `modified`
"""
_SQL = f"""\
-- name: migration-0000-create-jobq
CREATE TABLE `job` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
, `payload` TEXT -- JSON payload
, `events` TEXT DEFAULT '[]' -- append log of JSON events
, `state` TEXT -- JSON state of the job
, `modified` INTEGER DEFAULT CURRENT_TIMESTAMP -- last modified
-- note the `rowid` field is defaulted
);
-- name: migration-0001-index-modified
-- Enable efficient queries ordered by job modification
CREATE INDEX IF NOT EXISTS `job_modified` ON `job` (
`modified`
);
-- name: job-create<!
INSERT INTO `job` (
`payload`
, `state`
, `events`
) VALUES (
:payload
, :state
, json_array(json_array('job_created', json_object('timestamp', CURRENT_TIMESTAMP)))
)
RETURNING
`id`
, `state`
;
-- name: job-get?
SELECT
{_GET_JOB_FIELDS}
FROM `job`
WHERE
`id` = :id
;
-- name: job-delete!
DELETE FROM `job`
WHERE
`id` = :id
;
-- name: job-list
SELECT
`id`
, `state`
FROM `job`
ORDER BY
`id` ASC
;
-- name: job-filter-state
SELECT
`id`
, `state`
FROM `job`
WHERE `state` = :state
;
-- name: job-append-event!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]', json_array('user_event', json_object('event', json(:event), 'timestamp', CURRENT_TIMESTAMP)))
, `modified` = CURRENT_TIMESTAMP
WHERE
`id` = :id
RETURNING
{_GET_JOB_FIELDS}
;
-- name: job-cas-state<!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(:old_state), 'new', json(:new_state), 'timestamp', CURRENT_TIMESTAMP)))
, `state` = json(:new_state)
, `modified` = CURRENT_TIMESTAMP
WHERE
`id` = :id
AND `state` = json(:old_state)
RETURNING
{_GET_JOB_FIELDS}
;
"""
# Anosql even as forked doesn't quite support inserting formatted sub-queries.
# It's not generally safe, etc. So we have to do it ourselves :/
# These two are broken out because they use computed `WHERE` clauses.
_QUERY_SQL = """\
SELECT
`id`
, `state`
FROM
`job` AS `j`
WHERE
({})
;
"""
_POLL_SQL = """\
UPDATE `job`
SET
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(state), 'new', json(:state), 'timestamp', CURRENT_TIMESTAMP)))
, `state` = json(:state)
, `modified` = CURRENT_TIMESTAMP
WHERE
`id` IN (
SELECT
`id`
FROM
`job` AS `j`
WHERE
({{}})
ORDER BY
`modified` ASC
LIMIT 1
)
RETURNING
{_GET_JOB_FIELDS}
;
"""
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
def compile_query(query):
"""Compile a query to a SELECT over jobs.
    The query is a sequence of `[op, expr, expr]` triples.
Supported ops are:
- `<`, `>`, `<=`, `>=`, `=`
- `LIKE`
Query ops join under `AND`
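
    For example, the illustrative query

        [["=", "json_extract(j.state, '$[0]')", "'CREATED'"]]

    compiles to

        json_extract(j.state, '$[0]') = 'CREATED'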
"""
def compile_term(term):
if term is None:
return "NULL"
else:
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
return term
def compile_op(op):
op, qexpr, val = op
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
ops = [compile_op(op) for op in query]
return " AND ".join(ops)
class JobQueue(object):
def __init__(self, path):
self._db = sqlite3.connect(path)
self._queries = anosql.from_str(_SQL, "sqlite3")
with self._db as db:
self._queries = with_migrations("sqlite3", self._queries, db)
run_migrations(self._queries, db)
def query(self, query, limit=None):
with self._db as db:
query = compile_query(query)
def qf():
cur = db.cursor()
cur.execute(_QUERY_SQL.format(query))
yield from cur.fetchall()
cur.close()
jobs = qf()
if limit:
limit = int(limit)
def lf(iterable):
iterable = iter(iterable)
for i in range(limit):
try:
yield next(iterable)
except StopIteration:
break
jobs = lf(jobs)
return list(jobs)
def create(self, job, new_state=None):
"""Create a new job on the queue, optionally specifying its state."""
with self._db as db:
(id, state), = self._queries.job_create(
db,
payload=json.dumps(job),
state=json.dumps(new_state),
)
return id
def poll(self, query, new_state):
"""Query for the longest-untouched job matching, advancing it to new_state."""
with self._db as db:
cur = db.cursor()
cur.execute(_POLL_SQL.format(compile_query(query)),
{"state": json.dumps(new_state)})
results = cur.fetchall()
if results:
return results
def get(self, job_id):
"""Fetch all available data about a given job by ID."""
with self._db as db:
return self._queries.job_get(db, id=job_id)
def cas_state(self, job_id, old_state, new_state):
"""CAS update a job's state, returning the updated job or indicating a conflict."""
with self._db as db:
return self._queries.job_cas_state(
db,
id=job_id,
old_state=json.dumps(old_state),
new_state=json.dumps(new_state),
)
def append_event(self, job_id, event):
"""Append a user-defined event to the job's log."""
with self._db as db:
return self._queries.job_append_event(
db,
id=job_id,
event=json.dumps(event)
)
def delete_job(self, job_id):
"""Delete a job by ID, regardless of state."""
with self._db as db:
return self._queries.job_delete(db, id=job_id)


@@ -1,346 +0,0 @@
"""
A mock job queue.
"""
import argparse
from functools import wraps
import json
import logging
import os
import sys
import sqlite3
import anosql
from anosql_migrations import run_migrations, with_migrations
from flask import abort, current_app, Flask, jsonify, request
_SQL = """\
-- name: migration-0000-create-jobq
CREATE TABLE `job` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
, `payload` TEXT -- JSON payload
, `events` TEXT DEFAULT '[]' -- append log of JSON events
, `state` TEXT -- JSON state of the job
);
-- name: job-create<!
INSERT INTO `job` (
`payload`
, `state`
, `events`
) VALUES (
:payload
, :state
, json_array(json_array('job_created',
json_object('timestamp', strftime('%s', 'now'))))
)
RETURNING
`id`
, `state`
;
-- name: job-get?
SELECT
`id`
, `payload`
, `events`
, `state`
FROM `job`
WHERE
`id` = :id
;
-- name: job-delete!
DELETE FROM `job`
WHERE
`id` = :id
;
-- name: job-list
SELECT
`id`
, `state`
FROM `job`
ORDER BY
`id` ASC
;
-- name: job-filter-state
SELECT
`id`
, `state`
FROM `job`
WHERE `state` = :state
;
-- name: job-append-event!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]',
json_array('user_event',
json_object('event', json(:event),
'timestamp', strftime('%s', 'now'))))
WHERE
`id` = :id
;
-- name: job-advance-state!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]',
json_array('job_state_advanced',
json_object('old', json(state),
'new', json(:state),
'timestamp', strftime('%s', 'now'))))
, `state` = json(:state)
WHERE
`id` = :id
;
-- name: job-cas-state<!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]',
json_array('job_state_advanced',
json_object('old', json(:old_state),
'new', json(:new_state),
'timestamp', strftime('%s', 'now'))))
, `state` = json(:new_state)
WHERE
`id` = :id
AND `state` = json(:old_state)
RETURNING
`id`
;
"""
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8080)
parser.add_argument("--host", default="localhost")
parser.add_argument("--db", default="~/jobq.sqlite3")
@app.before_first_request
def setup_queries():
# Load the queries
queries = anosql.from_str(_SQL, "sqlite3")
# Do migrations
with sqlite3.connect(current_app.config["db"]) as db:
queries = with_migrations("sqlite3", queries, db)
run_migrations(queries, db)
current_app.queries = queries
@app.before_request
def before_request():
request.db = sqlite3.connect(current_app.config["db"])
request.db.set_trace_callback(logging.getLogger("sqlite3").debug)
@app.teardown_request
def teardown_request(exception):
request.db.commit()
request.db.close()
def compile_query(query):
"""Compile a query to a SELECT over jobs.
The query is a sequence of ordered pairs [op, path, val].
Supported ops are:
- `<`, `>`, `<=`, `>=`, `=`
- `LIKE`
Query ops join under `AND`
"""
def compile_term(term):
if term is None:
return "NULL"
else:
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
return term
def compile_op(op):
op, qexpr, val = op
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
ops = [compile_op(op) for op in query]
return " AND ".join(ops)
@app.route("/api/v0/job", methods=["GET", "POST"])
def get_jobs():
"""Return jobs in the system."""
if request.method == "POST":
blob = request.get_json(force=True)
else:
blob = {}
if query := blob.get("query"):
query = compile_query(query)
print(query)
def qf():
cur = request.db.cursor()
cur.execute(f"SELECT `id`, `state` FROM `job` AS `j` WHERE {query};")
yield from cur.fetchall()
cur.close()
jobs = qf()
else:
jobs = current_app.queries.job_list(request.db)
if limit := blob.get("limit", 100):
limit = int(limit)
def lf(iterable):
iterable = iter(iterable)
for i in range(limit):
try:
yield next(iterable)
except StopIteration:
break
jobs = lf(jobs)
return jsonify({
"jobs": [
{
"id": id,
"state": json.loads(state) if state is not None else state
}
for id, state in jobs
]
}), 200
@app.route("/api/v0/job/create", methods=["POST"])
def create_job():
"""Create a job."""
blob = request.get_json(force=True)
payload = blob["payload"]
state = blob.get("state", None)
id, state = current_app.queries.job_create(
request.db,
payload=json.dumps(payload),
state=json.dumps(state),
)
return jsonify({"id": id, "state": state}), 200
@app.route("/api/v0/job/poll", methods=["POST"])
def poll_job():
"""Using a query, attempt to poll for the next job matching criteria."""
blob = request.get_json(force=True)
query = compile_query(blob["query"])
cur = request.db.cursor()
cur.execute(f"""\
UPDATE `job`
SET
`events` = json_insert(events, '$[#]',
json_array('job_state_advanced',
json_object('old', json(state),
'new', json(:state),
'timestamp', strftime('%s', 'now'))))
, `state` = json(:state)
WHERE
`id` IN (
SELECT
`id`
FROM
`job` AS `j`
WHERE
{query}
LIMIT 1
)
RETURNING
`id`
, `state`
;""", {"state": json.dumps(blob["state"])})
results = cur.fetchall()
cur.close()
if results:
(id, state), = results
return jsonify({"id": id, "state": json.loads(state)}), 200
else:
abort(404)
@app.route("/api/v0/job/<job_id>", methods=["GET"])
def get_job(job_id):
"""Return a job by ID."""
r = current_app.queries.job_get(request.db, id=job_id)
if not r:
abort(404)
# Unpack the response tuple
id, payload, events, state = r
return jsonify({
"id": id,
"payload": json.loads(payload),
"events": json.loads(events),
"state": json.loads(state) if state is not None else state,
}), 200
@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
def update_state(job_id):
"""CAS update a job's state, returning the updated job or indicating a conflict."""
document = request.get_json(force=True)
old = document["old"]
new = document["new"]
if current_app.queries.job_cas_state(
request.db,
id=job_id,
old_state=json.dumps(old),
new_state=json.dumps(new),
):
return get_job(job_id)
else:
abort(409)
@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
def append_event(job_id):
"""Append a user-defined event to the job's log."""
current_app.queries.job_append_event(
request.db,
id=job_id,
event=json.dumps(request.get_json(force=True))
)
return get_job(job_id)
@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
def delete_job(job_id):
"""Delete a given job."""
current_app.queries.job_delete(request.db, id=job_id)
return jsonify({}), 200
def main():
"""Run the mock server."""
opts, args = parser.parse_known_args()
app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
app.config["host"] = opts.host
app.config["port"] = opts.port
app.run(
threaded=True,
)
if __name__ == "__main__":
main()

projects/jobqd/BUILD (new file, 24 lines)

@@ -0,0 +1,24 @@
zapp_binary(
name = "jobqd",
main = "src/python/jobqd/__main__.py",
imports = [
"src/python",
],
deps = [
"//projects/jobq",
py_requirement("flask"),
]
)
py_library(
name = "client",
srcs = [
"src/python/jobq/rest/api.py",
],
imports = [
"src/python",
],
deps = [
py_requirement("requests"),
],
)

projects/jobqd/README.md (new file, 58 lines)

@@ -0,0 +1,58 @@
# Jobq
Jobq is an event-oriented framework for recording _jobs_.
Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
Users may manually add _events_ to the log.
Note that, while we strongly suggest that _state_ should always be a tagged tuple, no constraints are placed on its contents.
## HTTP API
### GET /api/v0/queue
Enumerate all the queues in the system.
### POST /api/v0/queue
Create a new queue of jobs.
### DELETE /api/v0/queue/<q_id>
Expunge a queue, deleting all jobs in it regardless of state.
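These endpoints can be exercised with Python's `requests`; a hedged sketch, assuming a daemon at the address below and that queue ids are small integers (neither is specified by this commit):

```python
import requests

JOBQ = "http://localhost:8080"  # assumed daemon address

requests.post(f"{JOBQ}/api/v0/queue")      # create a queue
queues = requests.get(f"{JOBQ}/api/v0/queue").json()
requests.delete(f"{JOBQ}/api/v0/queue/0")  # expunge queue 0 and its jobs
```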
### GET /api/v0/queue/<q_id>/job
Return an enumeration of jobs active in the given queue.
### POST /api/v0/queue/<q_id>/query_jobs
Perform a point-in-time query for jobs.
The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
Valid exprs are any SQLite expression not containing sub-queries.
For example, one might look for jobs in the `["CREATED"]` state, as sketched below.
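Reusing `JOBQ` from the sketch above:

```python
resp = requests.post(
    f"{JOBQ}/api/v0/queue/0/query_jobs",
    # Exprs are raw SQLite expressions, hence the SQL string literal 'CREATED'.
    json={"query": [["IS", "json_extract(j.state, '$[0]')", "'CREATED'"]]},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"jobs": [{"id": 1, "state": ["CREATED"]}]}
```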
### POST /api/v0/queue/<q_id>/poll_job
Poll for at most one job matching the given query, atomically advancing it to the given state.
Uses the same query format as the `query_jobs` endpoint.
For example, a worker might poll for jobs in the null (initial) state, assigning the first such job to itself; see the sketch below.
Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
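A sketch of polling, again reusing `JOBQ` from above:

```python
resp = requests.post(
    f"{JOBQ}/api/v0/queue/0/poll_job",
    json={"query": [["IS", "j.state", None]],
          "state": ["ASSIGNED", {"node": 1}]},
)
if resp.status_code == 200:
    job = resp.json()  # {"id": ..., "state": ["ASSIGNED", {"node": 1}]}
else:
    job = None         # 404: no job matched the query
```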
### POST /api/v0/queue/<q_id>/job
Given a JSON document as the POST body, create a new job with the given payload in the given state.
If state is not provided, the state `null` is used.
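For instance, creating a job could look like:

```python
resp = requests.post(
    f"{JOBQ}/api/v0/queue/0/job",
    json={"state": ["CREATED"], "payload": {"msg": "Hello, world!"}},
)
job_id = resp.json()["id"]
```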
### GET /api/v0/queue/<q_id>/job/<job_id>
Return all available data about a given job, including the payload, event log and current state.
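A sketch of fetching a job:

```python
job = requests.get(f"{JOBQ}/api/v0/queue/0/job/{job_id}").json()
# job["payload"], job["state"] and the append-only job["events"] log
```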
### DELETE /api/v0/queue/<q_id>/job/<job_id>
Expunge a given job from the system by ID.
### POST /api/v0/queue/<q_id>/job/<job_id>/state
POST the 'current' state, and a proposed new state, attempting to update the state of the job using CAS.
If the state of the job updates successfully, a new event will be appended to the job's log and the resulting job will be returned.
Otherwise a conflict will be signaled.
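A sketch of a CAS update; a 409 means another writer advanced the job first:

```python
resp = requests.post(
    f"{JOBQ}/api/v0/queue/0/job/{job_id}/state",
    json={"old": ["CREATED"], "new": ["ASSIGNED"]},
)
if resp.status_code == 409:
    pass  # conflict: re-read the job, then retry or give up
```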
### POST /api/v0/queue/<q_id>/job/<job_id>/event
Append an arbitrary event to the log.
User-defined events will be wrapped in a `"user_event"` tag and have `"timestamp"` metadata inserted.
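For example:

```python
requests.post(f"{JOBQ}/api/v0/queue/0/job/{job_id}/event",
              json=["my cool event"])
```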


@@ -0,0 +1,146 @@
"""
An HTTP daemon fronting the jobq library.
"""
import argparse
import json
import logging
import os

from flask import abort, current_app, Flask, jsonify, request
from jobq import JobQueue
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8080)
parser.add_argument("--host", default="localhost")
parser.add_argument("--db", default="~/jobq.sqlite3")
@app.before_first_request
def setup_queries():
current_app.q = JobQueue(current_app.config["db"])
@app.route("/api/v0/job", methods=["GET", "POST"])
def get_jobs():
"""Return jobs in the system."""
if request.method == "POST":
blob = request.get_json(force=True)
else:
blob = {}
query = blob.get("query", [["true"]])
return jsonify({
"jobs": [
{
"id": id,
"state": json.loads(state) if state is not None else state
}
for id, state in current_app.q.query(query)
]
}), 200
@app.route("/api/v0/job/create", methods=["POST"])
def create_job():
"""Create a job."""
blob = request.get_json(force=True)
payload = blob["payload"]
state = blob.get("state", None)
    id = current_app.q.create(payload, state)
return jsonify({"id": id, "state": state}), 200
@app.route("/api/v0/job/poll", methods=["POST"])
def poll_job():
"""Using a query, attempt to poll for the next job matching criteria."""
blob = request.get_json(force=True)
query = blob["query"]
state = blob["state"]
results = current_app.q.poll(query, state)
if results:
        (id, _payload, _events, state, _modified), = results
return jsonify({"id": id, "state": json.loads(state)}), 200
else:
abort(404)
@app.route("/api/v0/job/<job_id>", methods=["GET"])
def get_job(job_id):
"""Return a job by ID."""
    r = current_app.q.get(job_id)
if not r:
abort(404)
# Unpack the response tuple
id, payload, events, state, modified = r
return jsonify({
"id": id,
"payload": json.loads(payload),
"events": json.loads(events),
"state": json.loads(state) if state is not None else state,
"modified": modified,
}), 200
@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
def update_state(job_id):
"""CAS update a job's state, returning the updated job or indicating a conflict."""
document = request.get_json(force=True)
old = document["old"]
new = document["new"]
if current_app.q.cas_state(job_id, old, new):
return get_job(job_id)
else:
abort(409)
@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
def append_event(job_id):
"""Append a user-defined event to the job's log."""
    current_app.q.append_event(job_id, event=request.get_json(force=True))
    return get_job(job_id)
@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
def delete_job(job_id):
"""Delete a given job."""
    current_app.q.delete_job(job_id)
return jsonify({}), 200
def main():
"""Run the mock server."""
opts, args = parser.parse_known_args()
app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
app.config["host"] = opts.host
app.config["port"] = opts.port
    app.run(
        host=app.config["host"],
        port=app.config["port"],
        threaded=True,
    )
if __name__ == "__main__":
main()


@@ -0,0 +1,126 @@
---
openapi: 3.0.1
info:
title: Jobq
description: A trivial job queue API
tags:
- name: queue
- name: job
definitions:
parameters:
q_id:
in: path
name: q_id
required: true
description: The ID of a given queue
j_id:
in: path
name: j_id
required: true
description: The ID of a given job
responses:
id: {}
queue: {}
queues: {}
job: {}
jobs:
"200":
description: A list of jobs
content:
application/json:
schema:
$ref: "#/definitions/types/jobs"
types:
queue: {}
queues:
type: object
properties:
jobs:
        type: array
items:
$ref: "#/definitions/types/queue"
job: {}
dehydrated_job: {}
jobs:
type: object
properties:
jobs:
        type: array
items:
$ref: "#/definitions/types/dehydrated_job"
paths:
"/api/v0/queue":
get:
description: Fetch a list of queues in the system
responses:
post:
description: Create a new job queue
responses:
"200":
description: Created
"409":
description: Conflict
"/api/v0/queue/{q_id}":
get:
description: ""
parameters:
- $ref: "#/definitions/parameters/q_id"
responses:
$ref: "#/definitions/responses/jobs"
"/api/v0/queue/{q_id}/job":
post:
description: "Create a job within a given queue."
parameters:
- $ref: "#/definitions/parameters/q_id"
"/api/v0/queue/{q_id}/query_jobs":
post:
description: "Query the jobs in a queue."
parameters:
- $ref: "#/definitions/parameters/q_id"
responses:
"200": {}
"/api/v0/queue/{q_id}/poll_job":
post:
description: "Poll zero or one jobs off the queue."
parameters:
- $ref: "#/definitions/parameters/q_id"
"/api/v0/job/{j_id}":
get:
description: "Return all available data about the job"
parameters:
- $ref: "#/definitions/parameters/j_id"
delete:
description: "Expunge the job"
parameters:
- $ref: "#/definitions/parameters/j_id"
"/api/v0/job/{j_id}/state":
post:
description: "Alter the job's state, appending an event"
parameters:
- $ref: "#/definitions/parameters/j_id"
"/api/v0/job/{j_id}/event":
post:
description: "Append an event to a given job without modifying state"
parameters:
- $ref: "#/definitions/parameters/j_d"