Split the jobq into a library and a daemon
This commit is contained in:
parent
8c98338782
commit
fe0fd3fdb1
10 changed files with 718 additions and 516 deletions
|
@ -1,26 +1,13 @@
|
|||
zapp_binary(
|
||||
py_project(
|
||||
name = "jobq",
|
||||
main = "src/python/jobq/__main__.py",
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
deps = [
|
||||
lib_deps = [
|
||||
"//projects/anosql",
|
||||
"//projects/anosql-migrations",
|
||||
py_requirement("flask"),
|
||||
py_requirement("pyyaml"),
|
||||
]
|
||||
)
|
||||
|
||||
py_library(
|
||||
name = "client",
|
||||
srcs = [
|
||||
"src/python/jobq/rest/api.py",
|
||||
],
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
deps = [
|
||||
py_requirement("requests"),
|
||||
],
|
||||
zapp_binary(
|
||||
name = "benchmark",
|
||||
main = "benchmark.py",
|
||||
deps = [":jobq"],
|
||||
)
|
||||
|
|
|
@ -1,153 +1,3 @@
|
|||
# Jobq
|
||||
|
||||
Jobq is an event-oriented framework for recording _jobs_.
|
||||
Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
|
||||
Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
|
||||
Users may manually add _events_ to the log.
|
||||
|
||||
Note that, while we strongly suggest that _state_ should always be a tagged tuple, no constraints are placed on its contents.
|
||||
|
||||
## HTTP API
|
||||
|
||||
### GET /api/v0/job
|
||||
Return an enumeration of jobs active in the system.
|
||||
|
||||
**Response** -
|
||||
```shell
|
||||
$ curl -X GET $JOBQ/api/v0/job | jq .
|
||||
{"jobs": [
|
||||
{"id": 1, "state": ["CREATED"]},
|
||||
{"id": 2, "state": ["ASSIGNED"]},
|
||||
{"id": 3, "state": ["SLEEPING"]},
|
||||
{"id": 4, "state": ["FINISHED"]}
|
||||
]}
|
||||
```
|
||||
|
||||
### POST /api/v0/job
|
||||
Perform a point-in-time query for jobs.
|
||||
|
||||
The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
|
||||
Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
|
||||
Valid exprs are any SQLite expression not containing sub-queries.
|
||||
|
||||
Here, we're looking for jobs tagged as in the `["CREATED"]` state.
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job --data '{"query": [["IS", "json_extract(state, '$[0]')", "CREATED"]]}' | jq .
|
||||
{"jobs": [
|
||||
{"id": 1, "state": ["CREATED"]}
|
||||
]}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/create
|
||||
Given a JSON document as the POST body, create a new job with a payload in the given state.
|
||||
If state is not provided, the state `null` is used.
|
||||
|
||||
```
|
||||
$ curl -X POST $JOBQ/api/v0/job/create --data '{"state": ["CREATED"], "payload": {"msg": "Hello, world!"}}' | jq .
|
||||
{
|
||||
"id": 1
|
||||
}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/poll
|
||||
Poll for at most one job matching the given query, atomically advancing it to the given state.
|
||||
|
||||
Uses the same query format as the /job endpoint.
|
||||
|
||||
Here, we're polling for jobs which are in the null (initial) state, and assigning the first such job to this host.
|
||||
Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
|
||||
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job/poll --data '{"query": [["IS", "j.state", null]], "state":["ASSIGNED", {"node": 1}]}' | jq .
|
||||
{
|
||||
"id": 3,
|
||||
"state": [
|
||||
"ASSIGNED",
|
||||
{
|
||||
"node": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### GET /api/v0/job/<job_id>
|
||||
Return all available data about a given job, including the payload, event log and current state.
|
||||
|
||||
```shell
|
||||
$ curl -X GET $JOBQ/api/v0/job/1 | jq .
|
||||
{
|
||||
"id": 1,
|
||||
"payload": {
|
||||
"msg": "Hello, world"
|
||||
},
|
||||
"state": [
|
||||
"CREATED"
|
||||
],
|
||||
"events": [
|
||||
[
|
||||
"job_created",
|
||||
{
|
||||
"timestamp": "1628909303"
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/<job_id>/state
|
||||
POST the 'current' state, and a proposed new state, attempting to update the state of the job using CAS.
|
||||
If the state of the job updates successfully, a new event will be appended to the job's log and the resulting job will be returned.
|
||||
Otherwise a conflict will be signaled.
|
||||
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job/1/state --data '{"new": ["ASSIGNED"], "old": ["CREATED"]}' | jq .
|
||||
{
|
||||
"id": 1,
|
||||
"payload": {
|
||||
"msg": "Hello, world"
|
||||
},
|
||||
"state": [
|
||||
"ASSIGNED"
|
||||
],
|
||||
"events": [
|
||||
[
|
||||
"job_created",
|
||||
{
|
||||
"timestamp": "1628911153"
|
||||
}
|
||||
],
|
||||
[
|
||||
"job_state_advanced",
|
||||
{
|
||||
"new": [
|
||||
"ASSIGNED"
|
||||
],
|
||||
"old": [
|
||||
"CREATED"
|
||||
],
|
||||
"timestamp": "1628911184"
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/<job_id>/event
|
||||
Append an arbitrary event to the log.
|
||||
User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted.
|
||||
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job/1/event --data '["my cool event"]' | jq .events[-1]
|
||||
[
|
||||
"user_event",
|
||||
{
|
||||
"event": [
|
||||
"my cool event"
|
||||
],
|
||||
"timestamp": "1628911503"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
### DELETE /api/v0/job/<job_id>
|
||||
Expunge a given job from the system by ID.
|
||||
Abusing sqlite3 as a job queue.
|
||||
|
|
96
projects/jobq/benchmark.py
Normal file
96
projects/jobq/benchmark.py
Normal file
|
@ -0,0 +1,96 @@
|
|||
"""
|
||||
Benchmarking the jobq.
|
||||
"""
|
||||
|
||||
from contextlib import contextmanager
|
||||
from time import perf_counter_ns
|
||||
from abc import abstractclassmethod
|
||||
import os
|
||||
from random import randint, choice
|
||||
import string
|
||||
from statistics import mean, median, stdev
|
||||
import tempfile
|
||||
|
||||
from jobq import JobQueue
|
||||
|
||||
|
||||
def randstr(len):
    """Return a random string of ``len`` uppercase ASCII letters and digits."""

    alphabet = string.ascii_uppercase + string.digits
    picked = [choice(alphabet) for _ in range(len)]
    return ''.join(picked)
|
||||
|
||||
|
||||
class Timing(object):
    """A timing record: a start stamp and, once finished, an end stamp.

    ``end`` stays ``None`` until whoever owns the record fills it in.
    """

    def __init__(self, start):
        self.start = start
        self.end = None

    @property
    def duration(self):
        """Return ``end - start``, or ``None`` while ``end`` has not been set."""

        # Compare against None explicitly: an end stamp of 0 is a real value,
        # not "still running".
        if self.end is None:
            return None
        return self.end - self.start
|
||||
|
||||
|
||||
@contextmanager
def timing():
    """A context manager that produces a semi-mutable timing record.

    The yielded ``Timing`` has its ``start`` set on entry and its ``end`` set
    on exit, both from ``perf_counter_ns()``.
    """

    obj = Timing(perf_counter_ns())
    try:
        yield obj
    finally:
        # Stamp the end even when the timed body raises, so the record is
        # still usable by callers that catch the exception.
        obj.end = perf_counter_ns()
|
||||
|
||||
|
||||
def bench(callable, reps):
    """Call ``callable`` ``reps`` times and print per-call timing statistics.

    Each call is timed individually and the whole loop is timed as well; the
    difference between the two, divided by ``reps``, is reported as the
    per-call overhead of the harness. Durations are measured in nanoseconds
    and printed in seconds.
    """

    timings = []
    with timing() as run_t:
        for _ in range(reps):
            with timing() as t:
                callable()
            timings.append(t.duration)

    # stdev() needs at least two samples; a single-rep run has no spread.
    spread = stdev(timings) if len(timings) > 1 else 0.0

    print(f"""Ran {callable.__name__!r} {reps} times, total time {run_t.duration / 1e9} (s)
mean: {mean(timings) / 1e9} (s)
median: {median(timings) / 1e9} (s)
stddev: {spread / 1e9} (s)
test overhead: {(run_t.duration - sum(timings)) / reps / 1e9} (s)
""")
|
||||
|
||||
|
||||
def test_insert(q, reps):
    """Benchmark ``q.create`` by inserting ``reps`` jobs in the ``["CREATED"]`` state."""

    # Payloads are generated up front so that only the insert itself is timed.
    payloads = []
    for _ in range(reps):
        payloads.append({"user_id": randint(0, 1<<32), "msg": randstr(256)})
    pending = iter(payloads)

    def insert():
        q.create(next(pending), new_state=["CREATED"])

    bench(insert, reps)
|
||||
|
||||
|
||||
def test_poll(q, reps):
    """Benchmark ``q.poll`` by moving ``["CREATED"]`` jobs to ``["POLLED"]``, ``reps`` times."""

    created = [["=", "json_extract(j.state, '$[0]')", "'CREATED'"]]
    polled = ["POLLED"]

    def poll():
        q.poll(created, polled)

    bench(poll, reps)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Test params
    reps = 10000
    path = "/tmp/jobq-bench.sqlite3"

    # Start from a fresh database file so earlier runs don't skew the numbers.
    if os.path.exists(path):
        os.remove(path)

    # Same two benchmarks, first against the on-disk file and then in memory.
    for target in (path, ":memory:"):
        print(f"Testing with {target}")
        q = JobQueue(target)
        test_insert(q, reps)
        test_poll(q, reps)
|
261
projects/jobq/src/python/jobq/__init__.py
Normal file
261
projects/jobq/src/python/jobq/__init__.py
Normal file
|
@ -0,0 +1,261 @@
|
|||
"""
|
||||
A job queue library teetering atop sqlite3.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import sqlite3
|
||||
import json
|
||||
|
||||
import anosql
|
||||
from anosql_migrations import run_migrations, with_migrations
|
||||
|
||||
_GET_JOB_FIELDS = """\
|
||||
`id`
|
||||
, `payload`
|
||||
, `events`
|
||||
, `state`
|
||||
, `modified`
|
||||
"""
|
||||
|
||||
_SQL = f"""\
|
||||
-- name: migration-0000-create-jobq
|
||||
CREATE TABLE `job` (
|
||||
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
|
||||
, `payload` TEXT -- JSON payload
|
||||
, `events` TEXT DEFAULT '[]' -- append log of JSON events
|
||||
, `state` TEXT -- JSON state of the job
|
||||
, `modified` INTEGER DEFAULT CURRENT_TIMESTAMP -- last modified
|
||||
-- note the `rowid` field is defaulted
|
||||
);
|
||||
-- name: migration-0001-index-modified
|
||||
-- Enable efficient queries ordered by job modification
|
||||
CREATE INDEX IF NOT EXISTS `job_modified` ON `job` (
|
||||
`modified`
|
||||
);
|
||||
-- name: job-create<!
|
||||
INSERT INTO `job` (
|
||||
`payload`
|
||||
, `state`
|
||||
, `events`
|
||||
) VALUES (
|
||||
:payload
|
||||
, :state
|
||||
, json_array(json_array('job_created', json_object('timestamp', CURRENT_TIMESTAMP)))
|
||||
)
|
||||
RETURNING
|
||||
`id`
|
||||
, `state`
|
||||
;
|
||||
-- name: job-get?
|
||||
SELECT
|
||||
{_GET_JOB_FIELDS}
|
||||
FROM `job`
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-delete!
|
||||
DELETE FROM `job`
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-list
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
ORDER BY
|
||||
`id` ASC
|
||||
;
|
||||
-- name: job-filter-state
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
WHERE `state` = :state
|
||||
;
|
||||
-- name: job-append-event!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]', json_array('user_event', json_object('event', json(:event), 'timestamp', CURRENT_TIMESTAMP)))
|
||||
, `modified` = CURRENT_TIMESTAMP
|
||||
WHERE
|
||||
`id` = :id
|
||||
RETURNING
|
||||
{_GET_JOB_FIELDS}
|
||||
;
|
||||
-- name: job-cas-state<!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(:old_state), 'new', json(:new_state), 'timestamp', CURRENT_TIMESTAMP)))
|
||||
, `state` = json(:new_state)
|
||||
, `modified` = CURRENT_TIMESTAMP
|
||||
WHERE
|
||||
`id` = :id
|
||||
AND `state` = json(:old_state)
|
||||
RETURNING
|
||||
{_GET_JOB_FIELDS}
|
||||
;
|
||||
"""
|
||||
|
||||
# Anosql even as forked doesn't quite support inserting formatted sub-queries.
|
||||
# It's not generally safe, etc. So we have to do it ourselves :/
|
||||
# These two are broken out because they use computed `WHERE` clauses.
|
||||
|
||||
_QUERY_SQL = """\
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM
|
||||
`job` AS `j`
|
||||
WHERE
|
||||
({})
|
||||
;
|
||||
"""
|
||||
|
||||
# Template for the poll statement. This must be an f-string: `{_GET_JOB_FIELDS}`
# is interpolated here, while the doubled braces collapse to a single `{}`
# placeholder that `poll` later fills with the compiled WHERE clause via
# `.format()`. As a plain string, `.format()` raised KeyError on
# `_GET_JOB_FIELDS` and left the placeholder unfilled.
_POLL_SQL = f"""\
UPDATE `job`
SET
    `events` = json_insert(events, '$[#]', json_array('job_state_advanced', json_object('old', json(state), 'new', json(:state), 'timestamp', CURRENT_TIMESTAMP)))
  , `state` = json(:state)
  , `modified` = CURRENT_TIMESTAMP
WHERE
  `id` IN (
    SELECT
      `id`
    FROM
      `job` AS `j`
    WHERE
      ({{}})
    ORDER BY
      `modified` ASC
    LIMIT 1
  )
RETURNING
{_GET_JOB_FIELDS}
;
"""
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
|
||||
def compile_query(query):
    """Compile a query to the body of a SQL ``WHERE`` clause over jobs.

    The query is a sequence of ordered triples ``[op, expr, expr]``.
    Supported ops are:
     - `<`, `<=`, `=`, `!=`, `>=`, `>`
     - `LIKE`
     - `IS`

    Each expr is either ``None``, which compiles to ``NULL``, or a string
    which is inserted into the SQL verbatim.

    Query ops join under `AND`. An empty query compiles to the empty string.

    Raises ``AssertionError`` if an op is not supported, or if an expr
    contains ``select``, ``update``, ``delete`` or ``;``.
    """

    allowed_ops = ("<", "<=", "=", "!=", ">=", ">", "LIKE", "IS")
    forbidden = ("select", "update", "delete", ";")

    # The checks below raise explicitly rather than using `assert`: they are
    # the only guard on text spliced into SQL, and `assert` statements are
    # stripped when Python runs with -O. AssertionError is kept so callers see
    # the same exception type as before.

    def compile_term(term):
        if term is None:
            return "NULL"
        lowered = term.lower()
        if any(keyword in lowered for keyword in forbidden):
            raise AssertionError(f"forbidden keyword in query term {term!r}")
        return term

    def compile_op(op):
        op, qexpr, val = op
        if op not in allowed_ops:
            raise AssertionError(f"unsupported query op {op!r}")
        return f"{compile_term(qexpr)} {op} {compile_term(val)}"

    return " AND ".join(compile_op(op) for op in query)
|
||||
|
||||
|
||||
class JobQueue(object):
    """A queue of jobs stored in a single sqlite3 database.

    Every method runs inside ``with self._db``, i.e. sqlite3's connection
    context manager, which commits on success and rolls back on error.
    """

    def __init__(self, path):
        """Open (or create) the database at ``path`` and run pending migrations."""

        self._db = sqlite3.connect(path)
        self._queries = anosql.from_str(_SQL, "sqlite3")

        with self._db as db:
            self._queries = with_migrations("sqlite3", self._queries, db)
            run_migrations(self._queries, db)

    def query(self, query, limit=None):
        """Return a list of ``(id, state)`` rows for the jobs matching ``query``.

        ``query`` uses the format accepted by ``compile_query``. If ``limit``
        is truthy, at most ``int(limit)`` rows are returned.
        """

        with self._db as db:
            where = compile_query(query)
            # Validate the limit before touching the database.
            cap = int(limit) if limit else None

            cur = db.cursor()
            try:
                cur.execute(_QUERY_SQL.format(where))
                rows = cur.fetchall()
            finally:
                # Release the cursor even if the statement fails.
                cur.close()

        if cap is None:
            return rows
        # A negative limit yields no rows rather than slicing from the end.
        return rows[:max(cap, 0)]

    def create(self, job, new_state=None):
        """Create a new job on the queue, optionally specifying its state.

        ``job`` and ``new_state`` are stored JSON-encoded. Returns the new
        job's id.
        """

        with self._db as db:
            (job_id, _state), = self._queries.job_create(
                db,
                payload=json.dumps(job),
                state=json.dumps(new_state),
            )
            return job_id

    def poll(self, query, new_state):
        """Query for the longest-untouched job matching, advancing it to new_state.

        Returns the rows produced by the update, or ``None`` when no job
        matched.
        """

        with self._db as db:
            cur = db.cursor()
            try:
                cur.execute(_POLL_SQL.format(compile_query(query)),
                            {"state": json.dumps(new_state)})
                results = cur.fetchall()
            finally:
                # The cursor used to be left open; always release it.
                cur.close()

        return results or None

    def get(self, job_id):
        """Fetch all available data about a given job by ID."""

        with self._db as db:
            return self._queries.job_get(db, id=job_id)

    def cas_state(self, job_id, old_state, new_state):
        """CAS update a job's state, returning the updated job or indicating a conflict.

        The update only applies while the stored state still equals
        ``old_state``; both states are passed JSON-encoded.
        """

        with self._db as db:
            return self._queries.job_cas_state(
                db,
                id=job_id,
                old_state=json.dumps(old_state),
                new_state=json.dumps(new_state),
            )

    def append_event(self, job_id, event):
        """Append a user-defined event to the job's log."""

        with self._db as db:
            return self._queries.job_append_event(
                db,
                id=job_id,
                event=json.dumps(event),
            )

    def delete_job(self, job_id):
        """Delete a job by ID, regardless of state."""

        with self._db as db:
            return self._queries.job_delete(db, id=job_id)
|
|
@ -1,346 +0,0 @@
|
|||
"""
|
||||
A mock job queue.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from functools import wraps
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import sqlite3
|
||||
|
||||
import anosql
|
||||
from anosql_migrations import run_migrations, with_migrations
|
||||
from flask import abort, current_app, Flask, jsonify, request
|
||||
|
||||
|
||||
_SQL = """\
|
||||
-- name: migration-0000-create-jobq
|
||||
CREATE TABLE `job` (
|
||||
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
|
||||
, `payload` TEXT -- JSON payload
|
||||
, `events` TEXT DEFAULT '[]' -- append log of JSON events
|
||||
, `state` TEXT -- JSON state of the job
|
||||
);
|
||||
-- name: job-create<!
|
||||
INSERT INTO `job` (
|
||||
`payload`
|
||||
, `state`
|
||||
, `events`
|
||||
) VALUES (
|
||||
:payload
|
||||
, :state
|
||||
, json_array(json_array('job_created',
|
||||
json_object('timestamp', strftime('%s', 'now'))))
|
||||
)
|
||||
RETURNING
|
||||
`id`
|
||||
, `state`
|
||||
;
|
||||
-- name: job-get?
|
||||
SELECT
|
||||
`id`
|
||||
, `payload`
|
||||
, `events`
|
||||
, `state`
|
||||
FROM `job`
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-delete!
|
||||
DELETE FROM `job`
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-list
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
ORDER BY
|
||||
`id` ASC
|
||||
;
|
||||
-- name: job-filter-state
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
WHERE `state` = :state
|
||||
;
|
||||
-- name: job-append-event!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('user_event',
|
||||
json_object('event', json(:event),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-advance-state!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('job_state_advanced',
|
||||
json_object('old', json(state),
|
||||
'new', json(:state),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:state)
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-cas-state<!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('job_state_advanced',
|
||||
json_object('old', json(:old_state),
|
||||
'new', json(:new_state),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:new_state)
|
||||
WHERE
|
||||
`id` = :id
|
||||
AND `state` = json(:old_state)
|
||||
RETURNING
|
||||
`id`
|
||||
;
|
||||
"""
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--port", type=int, default=8080)
|
||||
parser.add_argument("--host", default="localhost")
|
||||
parser.add_argument("--db", default="~/jobq.sqlite3")
|
||||
|
||||
|
||||
@app.before_first_request
|
||||
def setup_queries():
|
||||
# Load the queries
|
||||
queries = anosql.from_str(_SQL, "sqlite3")
|
||||
|
||||
# Do migrations
|
||||
with sqlite3.connect(current_app.config["db"]) as db:
|
||||
queries = with_migrations("sqlite3", queries, db)
|
||||
run_migrations(queries, db)
|
||||
|
||||
current_app.queries = queries
|
||||
|
||||
|
||||
@app.before_request
|
||||
def before_request():
|
||||
request.db = sqlite3.connect(current_app.config["db"])
|
||||
request.db.set_trace_callback(logging.getLogger("sqlite3").debug)
|
||||
|
||||
|
||||
@app.teardown_request
|
||||
def teardown_request(exception):
|
||||
request.db.commit()
|
||||
request.db.close()
|
||||
|
||||
|
||||
def compile_query(query):
|
||||
"""Compile a query to a SELECT over jobs.
|
||||
|
||||
The query is a sequence of ordered pairs [op, path, val].
|
||||
Supported ops are:
|
||||
- `<`, `>`, `<=`, `>=`, `=`
|
||||
- `LIKE`
|
||||
|
||||
Query ops join under `AND`
|
||||
"""
|
||||
|
||||
def compile_term(term):
|
||||
if term is None:
|
||||
return "NULL"
|
||||
else:
|
||||
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
|
||||
return term
|
||||
|
||||
def compile_op(op):
|
||||
op, qexpr, val = op
|
||||
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
|
||||
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
|
||||
|
||||
ops = [compile_op(op) for op in query]
|
||||
return " AND ".join(ops)
|
||||
|
||||
|
||||
@app.route("/api/v0/job", methods=["GET", "POST"])
|
||||
def get_jobs():
|
||||
"""Return jobs in the system."""
|
||||
|
||||
if request.method == "POST":
|
||||
blob = request.get_json(force=True)
|
||||
else:
|
||||
blob = {}
|
||||
|
||||
if query := blob.get("query"):
|
||||
query = compile_query(query)
|
||||
print(query)
|
||||
def qf():
|
||||
cur = request.db.cursor()
|
||||
cur.execute(f"SELECT `id`, `state` FROM `job` AS `j` WHERE {query};")
|
||||
yield from cur.fetchall()
|
||||
cur.close()
|
||||
jobs = qf()
|
||||
|
||||
else:
|
||||
jobs = current_app.queries.job_list(request.db)
|
||||
|
||||
if limit := blob.get("limit", 100):
|
||||
limit = int(limit)
|
||||
def lf(iterable):
|
||||
iterable = iter(iterable)
|
||||
for i in range(limit):
|
||||
try:
|
||||
yield next(iterable)
|
||||
except StopIteration:
|
||||
break
|
||||
jobs = lf(jobs)
|
||||
|
||||
return jsonify({
|
||||
"jobs": [
|
||||
{
|
||||
"id": id,
|
||||
"state": json.loads(state) if state is not None else state
|
||||
}
|
||||
for id, state in jobs
|
||||
]
|
||||
}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/create", methods=["POST"])
|
||||
def create_job():
|
||||
"""Create a job."""
|
||||
|
||||
blob = request.get_json(force=True)
|
||||
payload = blob["payload"]
|
||||
state = blob.get("state", None)
|
||||
id, state = current_app.queries.job_create(
|
||||
request.db,
|
||||
payload=json.dumps(payload),
|
||||
state=json.dumps(state),
|
||||
)
|
||||
return jsonify({"id": id, "state": state}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/poll", methods=["POST"])
|
||||
def poll_job():
|
||||
"""Using a query, attempt to poll for the next job matching criteria."""
|
||||
|
||||
blob = request.get_json(force=True)
|
||||
query = compile_query(blob["query"])
|
||||
cur = request.db.cursor()
|
||||
cur.execute(f"""\
|
||||
UPDATE `job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('job_state_advanced',
|
||||
json_object('old', json(state),
|
||||
'new', json(:state),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:state)
|
||||
WHERE
|
||||
`id` IN (
|
||||
SELECT
|
||||
`id`
|
||||
FROM
|
||||
`job` AS `j`
|
||||
WHERE
|
||||
{query}
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING
|
||||
`id`
|
||||
, `state`
|
||||
;""", {"state": json.dumps(blob["state"])})
|
||||
results = cur.fetchall()
|
||||
cur.close()
|
||||
if results:
|
||||
(id, state), = results
|
||||
return jsonify({"id": id, "state": json.loads(state)}), 200
|
||||
else:
|
||||
abort(404)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>", methods=["GET"])
|
||||
def get_job(job_id):
|
||||
"""Return a job by ID."""
|
||||
|
||||
r = current_app.queries.job_get(request.db, id=job_id)
|
||||
if not r:
|
||||
abort(404)
|
||||
|
||||
# Unpack the response tuple
|
||||
id, payload, events, state = r
|
||||
return jsonify({
|
||||
"id": id,
|
||||
"payload": json.loads(payload),
|
||||
"events": json.loads(events),
|
||||
"state": json.loads(state) if state is not None else state,
|
||||
}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
|
||||
def update_state(job_id):
|
||||
"""CAS update a job's state, returning the updated job or indicating a conflict."""
|
||||
|
||||
document = request.get_json(force=True)
|
||||
old = document["old"]
|
||||
new = document["new"]
|
||||
if current_app.queries.job_cas_state(
|
||||
request.db,
|
||||
id=job_id,
|
||||
old_state=json.dumps(old),
|
||||
new_state=json.dumps(new),
|
||||
):
|
||||
return get_job(job_id)
|
||||
else:
|
||||
abort(409)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
|
||||
def append_event(job_id):
|
||||
"""Append a user-defined event to the job's log."""
|
||||
|
||||
current_app.queries.job_append_event(
|
||||
request.db,
|
||||
id=job_id,
|
||||
event=json.dumps(request.get_json(force=True))
|
||||
)
|
||||
|
||||
return get_job(job_id)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
|
||||
def delete_job(job_id):
|
||||
"""Delete a given job."""
|
||||
|
||||
current_app.queries.job_delete(request.db, id=job_id)
|
||||
|
||||
return jsonify({}), 200
|
||||
|
||||
def main():
|
||||
"""Run the mock server."""
|
||||
|
||||
opts, args = parser.parse_known_args()
|
||||
|
||||
app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
|
||||
app.config["host"] = opts.host
|
||||
app.config["port"] = opts.port
|
||||
|
||||
app.run(
|
||||
threaded=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
24
projects/jobqd/BUILD
Normal file
24
projects/jobqd/BUILD
Normal file
|
@ -0,0 +1,24 @@
|
|||
zapp_binary(
|
||||
name = "jobqd",
|
||||
main = "src/python/jobqd/__main__.py",
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
deps = [
|
||||
"//projects/jobq",
|
||||
py_requirement("flask"),
|
||||
]
|
||||
)
|
||||
|
||||
py_library(
|
||||
name = "client",
|
||||
srcs = [
|
||||
"src/python/jobq/rest/api.py",
|
||||
],
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
deps = [
|
||||
py_requirement("requests"),
|
||||
],
|
||||
)
|
58
projects/jobqd/README.md
Normal file
58
projects/jobqd/README.md
Normal file
|
@ -0,0 +1,58 @@
|
|||
# Jobq
|
||||
|
||||
Jobq is an event-oriented framework for recording _jobs_.
|
||||
Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
|
||||
Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
|
||||
Users may manually add _events_ to the log.
|
||||
|
||||
Note that, while we strongly suggest that _state_ should always be a tagged tuple, no constraints are placed on its contents.
|
||||
|
||||
## HTTP API
|
||||
|
||||
### GET /api/v0/queue
|
||||
Enumerate all the queues in the system.
|
||||
|
||||
### POST /api/v0/queue
|
||||
Create a new queue of jobs.
|
||||
|
||||
### DELETE /api/v0/queue/<q_id>
|
||||
Expunge a queue, deleting all jobs in it regardless of state.
|
||||
|
||||
### GET /api/v0/queue/<q_id>/job
|
||||
Return an enumeration of jobs active in the given queue.
|
||||
|
||||
### POST /api/v0/queue/<q_id>/query_jobs
|
||||
Perform a point-in-time query for jobs.
|
||||
|
||||
The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
|
||||
Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
|
||||
Valid exprs are any SQLite expression not containing sub-queries.
|
||||
|
||||
Here, we're looking for jobs tagged as in the `["CREATED"]` state.
|
||||
|
||||
### POST /api/v0/queue/<q_id>/poll_job
|
||||
Poll for at most one job matching the given query, atomically advancing it to the given state.
|
||||
|
||||
Uses the same query format as the `query_jobs` endpoint.
|
||||
|
||||
Here, we're polling for jobs which are in the null (initial) state, and assigning the first such job to this host.
|
||||
Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
|
||||
|
||||
### POST /api/v0/queue/<q_id>/job
|
||||
Given a JSON document as the POST body, create a new job with a payload in the given state.
|
||||
If state is not provided, the state `null` is used.
|
||||
|
||||
### GET /api/v0/queue/<q_id>/job/<job_id>
|
||||
Return all available data about a given job, including the payload, event log and current state.
|
||||
|
||||
### DELETE /api/v0/queue/<q_id>/job/<job_id>
|
||||
Expunge a given job from the system by ID.
|
||||
|
||||
### POST /api/v0/queue/<q_id>/job/<job_id>/state
|
||||
POST the 'current' state, and a proposed new state, attempting to update the state of the job using CAS.
|
||||
If the state of the job updates successfully, a new event will be appended to the job's log and the resulting job will be returned.
|
||||
Otherwise a conflict will be signaled.
|
||||
|
||||
### POST /api/v0/queue/<q_id>/job/<job_id>/event
|
||||
Append an arbitrary event to the log.
|
||||
User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted.
|
146
projects/jobqd/src/__main__.py
Normal file
146
projects/jobqd/src/__main__.py
Normal file
|
@ -0,0 +1,146 @@
|
|||
"""
|
||||
A mock job queue.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from functools import wraps
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import sqlite3
|
||||
|
||||
from jobq import JobQueue
|
||||
|
||||
from flask import abort, current_app, Flask, jsonify, request
|
||||
|
||||
|
||||
# Module logger; logging is configured at DEBUG level on import.
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

app = Flask(__name__)

# Command-line options read by main().
parser = argparse.ArgumentParser()
parser.add_argument(
    "--port",
    type=int,
    default=8080,
)
parser.add_argument(
    "--host",
    default="localhost",
)
parser.add_argument(
    "--db",
    default="~/jobq.sqlite3",
)
|
||||
|
||||
|
||||
@app.before_first_request
def setup_queries():
    """Open the job queue named by the ``db`` config entry and store it on the app as ``q``."""

    db_path = current_app.config["db"]
    current_app.q = JobQueue(db_path)
|
||||
|
||||
|
||||
@app.route("/api/v0/job", methods=["GET", "POST"])
def get_jobs():
    """Return jobs in the system.

    A POST body may carry a ``query`` (a list of ``[op, expr, expr]`` triples)
    and a ``limit``. With no query, every job is returned.
    """

    if request.method == "POST":
        blob = request.get_json(force=True)
    else:
        blob = {}

    # The queue compiles every term as an [op, expr, expr] triple, so the
    # "match everything" default must be a well-formed tautology; the previous
    # one-element default [["true"]] could not be unpacked.
    query = blob.get("query") or [["=", "1", "1"]]

    # Honour a client-supplied limit; with none given, all matches are returned.
    rows = current_app.q.query(query, limit=blob.get("limit"))

    return jsonify({
        "jobs": [
            {
                "id": job_id,
                "state": json.loads(state) if state is not None else state,
            }
            for job_id, state in rows
        ]
    }), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/create", methods=["POST"])
def create_job():
    """Create a job.

    The POST body must contain ``payload`` and may contain ``state``, which
    defaults to ``None``. Responds with the new job's id and the requested
    state.
    """

    blob = request.get_json(force=True)
    payload = blob["payload"]
    state = blob.get("state", None)

    # JobQueue.create() returns only the new job's id, so there is no
    # (id, state) pair to unpack; echo back the state that was requested.
    job_id = current_app.q.create(payload, state)
    return jsonify({"id": job_id, "state": state}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/poll", methods=["POST"])
def poll_job():
    """Using a query, attempt to poll for the next job matching criteria.

    The POST body must contain ``query`` and the ``state`` to advance the
    matched job to. Responds 404 when no job matches.
    """

    blob = request.get_json(force=True)
    query = blob["query"]
    state = blob["state"]

    results = current_app.q.poll(query, state)
    if not results:
        abort(404)

    # The poll statement returns whole job rows
    # (id, payload, events, state, modified), not just (id, state).
    (job_id, _payload, _events, new_state, _modified), = results
    return jsonify({"id": job_id, "state": json.loads(new_state)}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>", methods=["GET"])
def get_job(job_id):
    """Return a job by ID, or respond 404 if the queue has no such job."""

    # JobQueue.get() names its parameter `job_id`, so the old `id=` keyword
    # raised TypeError; pass the id positionally.
    r = current_app.q.get(job_id)
    if not r:
        abort(404)

    # Unpack the response tuple
    found_id, payload, events, state, modified = r
    return jsonify({
        "id": found_id,
        "payload": json.loads(payload),
        "events": json.loads(events),
        "state": json.loads(state) if state is not None else state,
        "modified": modified,
    }), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
def update_state(job_id):
    """Compare-and-swap a job's state.

    The POST body carries the expected ``old`` state and the proposed ``new``
    one. Responds with the job on success and 409 otherwise.
    """

    document = request.get_json(force=True)
    expected = document["old"]
    proposed = document["new"]

    swapped = current_app.q.cas_state(job_id, expected, proposed)
    if not swapped:
        abort(409)
    return get_job(job_id)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
def append_event(job_id):
    """Append a user-defined event to the job's log and respond with the job."""

    current_app.q.append_event(job_id, event=request.get_json(force=True))

    # Respond with the job as rendered by get_job() rather than handing the
    # library's raw return value to Flask, which is not an HTTP response.
    return get_job(job_id)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
def delete_job(job_id):
    """Delete a given job."""

    # This module no longer sets `current_app.queries` or `request.db`;
    # go through the JobQueue instead.
    current_app.q.delete_job(job_id)

    return jsonify({}), 200
|
||||
|
||||
def main():
    """Run the mock server.

    Reads ``--db``, ``--host`` and ``--port`` from the command line, records
    them in the app config and starts Flask's built-in server.
    """

    opts, args = parser.parse_known_args()

    app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
    app.config["host"] = opts.host
    app.config["port"] = opts.port

    # NOTE(review): the app's JobQueue wraps one sqlite3 connection while the
    # server runs threaded; sqlite3 connections reject cross-thread use by
    # default -- confirm this is safe under concurrent requests.
    app.run(
        # Pass the parsed address through; previously --host/--port were
        # stored in the config but never reached the server.
        host=opts.host,
        port=opts.port,
        threaded=True,
    )


if __name__ == "__main__":
    main()
|
126
projects/jobqd/src/python/jobq/openapi.yaml
Normal file
126
projects/jobqd/src/python/jobq/openapi.yaml
Normal file
|
@ -0,0 +1,126 @@
|
|||
---
|
||||
openapi: 3.0.1
|
||||
info:
|
||||
title: Jobq
|
||||
description: A trivial job queue API
|
||||
|
||||
tags:
|
||||
- name: queue
|
||||
- name: job
|
||||
|
||||
definitions:
|
||||
parameters:
|
||||
q_id:
|
||||
in: path
|
||||
name: q_id
|
||||
required: true
|
||||
description: The ID of a given queue
|
||||
j_id:
|
||||
in: path
|
||||
name: j_id
|
||||
required: true
|
||||
description: The ID of a given job
|
||||
|
||||
responses:
|
||||
id: {}
|
||||
queue: {}
|
||||
queues: {}
|
||||
job: {}
|
||||
|
||||
jobs:
|
||||
"200":
|
||||
description: A list of jobs
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/definitions/types/jobs"
|
||||
|
||||
types:
|
||||
queue: {}
|
||||
|
||||
queues:
|
||||
type: object
|
||||
properties:
|
||||
jobs:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/definitions/types/queue"
|
||||
|
||||
job: {}
|
||||
|
||||
dehydrated_job: {}
|
||||
|
||||
jobs:
|
||||
type: object
|
||||
properties:
|
||||
jobs:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/definitions/types/dehydrated_job"
|
||||
|
||||
paths:
|
||||
"/api/v0/queue":
|
||||
get:
|
||||
description: Fetch a list of queues in the system
|
||||
responses:
|
||||
|
||||
post:
|
||||
description: Create a new job queue
|
||||
responses:
|
||||
"200":
|
||||
description: Created
|
||||
"409":
|
||||
description: Conflict
|
||||
|
||||
"/api/v0/queue/{q_id}":
|
||||
get:
|
||||
description: ""
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/q_id"
|
||||
|
||||
responses:
|
||||
$ref: "#/definitions/responses/jobs"
|
||||
|
||||
"/api/v0/queue/{q_id}/job":
|
||||
post:
|
||||
description: "Create a job within a given queue."
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/q_id"
|
||||
|
||||
"/api/v0/queue/{q_id}/query_jobs":
|
||||
post:
|
||||
description: "Query the jobs in a queue."
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/q_id"
|
||||
|
||||
responses:
|
||||
"200": {}
|
||||
|
||||
"/api/v0/queue/{q_id}/poll_job":
|
||||
post:
|
||||
description: "Poll zero or one jobs off the queue."
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/q_id"
|
||||
|
||||
"/api/v0/job/{j_id}":
|
||||
get:
|
||||
description: "Return all available data about the job"
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/j_id"
|
||||
|
||||
delete:
|
||||
description: "Expunge the job"
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/j_id"
|
||||
|
||||
"/api/v0/job/{j_id}/state":
|
||||
post:
|
||||
description: "Alter the job's state, appending an event"
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/j_id"
|
||||
|
||||
"/api/v0/job/{j_id}/event":
|
||||
post:
|
||||
description: "Append an event to a given job without modifying state"
|
||||
parameters:
|
||||
- $ref: "#/definitions/parameters/j_id"
|
Loading…
Reference in a new issue