An initial crack at a jobq
This commit is contained in:
parent
477d3f1055
commit
d242f73dc4
5 changed files with 628 additions and 6 deletions
|
@ -1,4 +1,12 @@
|
|||
"""
|
||||
A driver object implementing support for SQLite3
|
||||
"""
|
||||
|
||||
from contextlib import contextmanager
|
||||
import logging
|
||||
import sqlite3
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SQLite3DriverAdapter(object):
|
||||
|
@ -21,6 +29,7 @@ class SQLite3DriverAdapter(object):
|
|||
@staticmethod
|
||||
def select(conn, _query_name, sql, parameters):
|
||||
cur = conn.cursor()
|
||||
log.debug({'sql': sql, 'parameters': parameters})
|
||||
cur.execute(sql, parameters)
|
||||
results = cur.fetchall()
|
||||
cur.close()
|
||||
|
@ -28,8 +37,9 @@ class SQLite3DriverAdapter(object):
|
|||
|
||||
@staticmethod
|
||||
@contextmanager
|
||||
def select_cursor(conn, _query_name, sql, parameters):
|
||||
def select_cursor(conn: sqlite3.Connection, _query_name, sql, parameters):
|
||||
cur = conn.cursor()
|
||||
log.debug({'sql': sql, 'parameters': parameters})
|
||||
cur.execute(sql, parameters)
|
||||
try:
|
||||
yield cur
|
||||
|
@ -37,21 +47,26 @@ class SQLite3DriverAdapter(object):
|
|||
cur.close()
|
||||
|
||||
@staticmethod
|
||||
def insert_update_delete(conn, _query_name, sql, parameters):
|
||||
def insert_update_delete(conn: sqlite3.Connection, _query_name, sql, parameters):
|
||||
log.debug({'sql': sql, 'parameters': parameters})
|
||||
conn.execute(sql, parameters)
|
||||
|
||||
@staticmethod
|
||||
def insert_update_delete_many(conn, _query_name, sql, parameters):
|
||||
def insert_update_delete_many(conn: sqlite3.Connection, _query_name, sql, parameters):
|
||||
log.debug({'sql': sql, 'parameters': parameters})
|
||||
conn.executemany(sql, parameters)
|
||||
|
||||
@staticmethod
|
||||
def insert_returning(conn, _query_name, sql, parameters):
|
||||
def insert_returning(conn: sqlite3.Connection, _query_name, sql, parameters):
|
||||
cur = conn.cursor()
|
||||
log.debug({'sql': sql, 'parameters': parameters})
|
||||
cur.execute(sql, parameters)
|
||||
results = cur.lastrowid
|
||||
results = cur.fetchall()
|
||||
log.debug({"results": results})
|
||||
cur.close()
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def execute_script(conn, sql):
|
||||
def execute_script(conn: sqlite3.Connection, sql):
|
||||
log.debug({'sql': sql, 'parameters': None})
|
||||
conn.executescript(sql)
|
||||
|
|
26
projects/jobq/BUILD
Normal file
26
projects/jobq/BUILD
Normal file
|
@ -0,0 +1,26 @@
|
|||
zapp_binary(
|
||||
name = "jobq",
|
||||
main = "src/python/jobq/__main__.py",
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
deps = [
|
||||
"//projects/anosql",
|
||||
"//projects/anosql-migrations",
|
||||
py_requirement("flask"),
|
||||
py_requirement("pyyaml"),
|
||||
]
|
||||
)
|
||||
|
||||
py_library(
|
||||
name = "client",
|
||||
srcs = [
|
||||
"src/python/jobq/rest/api.py",
|
||||
],
|
||||
imports = [
|
||||
"src/python",
|
||||
],
|
||||
deps = [
|
||||
py_requirement("requests"),
|
||||
],
|
||||
)
|
150
projects/jobq/README.md
Normal file
150
projects/jobq/README.md
Normal file
|
@ -0,0 +1,150 @@
|
|||
# Jobq
|
||||
|
||||
Jobq is an event-oriented framework for recording _jobs_.
|
||||
Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
|
||||
Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
|
||||
Users may manually add _events_ to the log.
|
||||
|
||||
Note that, while we strongly suggest that _state_ should always be a tagged tuple and the default is `["CREATED"]`, no constraints are placed on its contents.
|
||||
|
||||
## HTTP API
|
||||
|
||||
### GET /api/v0/job
|
||||
Return an enumeration of jobs active in the system.
|
||||
|
||||
**Response** -
|
||||
```shell
|
||||
$ curl -X POST $JOBQ/api/v0/job | jq .
|
||||
{"jobs": [
|
||||
{"id": 1, "state": ["CREATED"]},
|
||||
{"id": 2, "state": ["ASSIGNED"]},
|
||||
{"id": 3, "state": ["SLEEPING"]},
|
||||
{"id": 3, "state": ["FINISHED"]}
|
||||
]}
|
||||
```
|
||||
|
||||
### POST /api/v0/job
|
||||
Perform a point-in-time query for jobs.
|
||||
|
||||
The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
|
||||
Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
|
||||
Valid ops any SQLite expression not containing sub-queries.
|
||||
|
||||
Here, we're looking for jobs tagged as in the `["CREATED"]` state.
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job --data '{"query": [["IS", "json_extract(state, '$[0]')", "CREATED"]]}' | jq .
|
||||
{"jobs": [
|
||||
{"id": 1, "state": ["CREATED"]},
|
||||
]}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/create
|
||||
Given a JSON document as the POST body, create a new job.
|
||||
|
||||
```
|
||||
$ curl -X POST $JOBQ/api/v0/job/create --data '{"msg": "Hello, world!"}' | jq .
|
||||
{
|
||||
"id": 1
|
||||
}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/poll
|
||||
Poll for at most one job matching the given query, atomically advancing it to the given state.
|
||||
|
||||
Uses the same query format as the /job endpoint.
|
||||
|
||||
Here, we're polling for hosts which are in the null (initial) state, and assigning the first such job to this host.
|
||||
Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
|
||||
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job/poll --data '{"query": [["IS", "j.state", null]], "state":["ASSIGNED", {"node": 1}]}' | jq .
|
||||
{
|
||||
"id": 3,
|
||||
"state": [
|
||||
"ASSIGNED",
|
||||
{
|
||||
"node": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### GET /api/v0/job/<job_id>
|
||||
Return all available data about a given job, including the payload, event log and current state.
|
||||
|
||||
```shell
|
||||
$ curl -X GET $JOBQ/api/v0/job/1 | jq .
|
||||
{
|
||||
"id": 1,
|
||||
"payload": {
|
||||
"msg": "Hello, world"
|
||||
},
|
||||
"state": [
|
||||
"CREATED"
|
||||
],
|
||||
"events": [
|
||||
[
|
||||
"job_created",
|
||||
{
|
||||
"timestamp": "1628909303"
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/<job_id>/state
|
||||
Alter the state of a given job, appending a state change event to the log and returning the entire updated job.
|
||||
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job/1/state --data '["ASSIGNED"]' | jq .
|
||||
{
|
||||
"id": 1,
|
||||
"payload": {
|
||||
"msg": "Hello, world"
|
||||
},
|
||||
"state": [
|
||||
"ASSIGNED"
|
||||
],
|
||||
"events": [
|
||||
[
|
||||
"job_created",
|
||||
{
|
||||
"timestamp": "1628911153"
|
||||
}
|
||||
],
|
||||
[
|
||||
"job_state_advanced",
|
||||
{
|
||||
"new": [
|
||||
"ASSIGNED"
|
||||
],
|
||||
"old": [
|
||||
"CREATED"
|
||||
],
|
||||
"timestamp": "1628911184"
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### POST /api/v0/job/<job_id>/event
|
||||
Append an arbitrary event to the log.
|
||||
User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted.
|
||||
|
||||
``` shell
|
||||
$ curl -X POST $JOBQ/api/v0/job/1/event --data '["my cool event"]' | jq .events[-1]
|
||||
[
|
||||
"user_event",
|
||||
{
|
||||
"event": [
|
||||
"my cool event"
|
||||
],
|
||||
"timestamp": "1628911503"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
### DELETE /api/v0/job/<job_id>
|
||||
Expunge a given job from the system by ID.
|
343
projects/jobq/src/python/jobq/__main__.py
Normal file
343
projects/jobq/src/python/jobq/__main__.py
Normal file
|
@ -0,0 +1,343 @@
|
|||
"""
|
||||
A mock job queue.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
from functools import wraps
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import sqlite3
|
||||
|
||||
import anosql
|
||||
from anosql_migrations import run_migrations, with_migrations
|
||||
from flask import abort, current_app, Flask, jsonify, request
|
||||
|
||||
|
||||
_SQL = """\
|
||||
-- name: migration-0000-create-jobq
|
||||
CREATE TABLE `job` (
|
||||
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
|
||||
, `payload` TEXT -- JSON payload
|
||||
, `events` TEXT DEFAULT '[]' -- append log of JSON events
|
||||
, `state` TEXT -- JSON state of the job
|
||||
);
|
||||
-- name: job-create<!
|
||||
INSERT INTO `job` (
|
||||
`payload`
|
||||
, `state`
|
||||
, `events`
|
||||
) VALUES (
|
||||
:payload
|
||||
, :state
|
||||
, json_array(json_array('job_created',
|
||||
json_object('timestamp', strftime('%s', 'now'))))
|
||||
)
|
||||
RETURNING
|
||||
`id`
|
||||
, `state`
|
||||
;
|
||||
-- name: job-get?
|
||||
SELECT
|
||||
`id`
|
||||
, `payload`
|
||||
, `events`
|
||||
, `state`
|
||||
FROM `job`
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-delete!
|
||||
DELETE FROM `job`
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-list
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
ORDER BY
|
||||
`id` ASC
|
||||
;
|
||||
-- name: job-filter-state
|
||||
SELECT
|
||||
`id`
|
||||
, `state`
|
||||
FROM `job`
|
||||
WHERE `state` = :state
|
||||
;
|
||||
-- name: job-append-event!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('user_event',
|
||||
json_object('event', json(:event),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-advance-state!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('job_state_advanced',
|
||||
json_object('old', json(state),
|
||||
'new', json(:state),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:state)
|
||||
WHERE
|
||||
`id` = :id
|
||||
;
|
||||
-- name: job-cas-state<!
|
||||
UPDATE
|
||||
`job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('job_state_advanced',
|
||||
json_object('old', json(:old_state),
|
||||
'new', json(:new_state),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:new_state)
|
||||
WHERE
|
||||
`id` = :id
|
||||
AND `state` = json(:old_state)
|
||||
RETURNING
|
||||
`id`
|
||||
;
|
||||
"""
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--port", type=int, default=8080)
|
||||
parser.add_argument("--host", default="localhost")
|
||||
parser.add_argument("--db", default="~/jobq.sqlite3")
|
||||
|
||||
|
||||
@app.before_first_request
|
||||
def setup_queries():
|
||||
# Load the queries
|
||||
queries = anosql.from_str(_SQL, "sqlite3")
|
||||
|
||||
# Do migrations
|
||||
with sqlite3.connect(current_app.config["db"]) as db:
|
||||
queries = with_migrations("sqlite3", queries, db)
|
||||
run_migrations(queries, db)
|
||||
|
||||
current_app.queries = queries
|
||||
|
||||
|
||||
@app.before_request
|
||||
def before_request():
|
||||
request.db = sqlite3.connect(current_app.config["db"])
|
||||
request.db.set_trace_callback(logging.getLogger("sqlite3").debug)
|
||||
|
||||
|
||||
@app.teardown_request
|
||||
def teardown_request(exception):
|
||||
request.db.commit()
|
||||
request.db.close()
|
||||
|
||||
|
||||
def compile_query(query):
|
||||
"""Compile a query to a SELECT over jobs.
|
||||
|
||||
The query is a sequence of ordered pairs [op, path, val].
|
||||
Supported ops are:
|
||||
- `<`, `>`, `<=`, `>=`, `=`
|
||||
- `LIKE`
|
||||
|
||||
Query ops join under `AND`
|
||||
"""
|
||||
|
||||
def compile_term(term):
|
||||
if term is None:
|
||||
return "NULL"
|
||||
else:
|
||||
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
|
||||
return term
|
||||
|
||||
def compile_op(op):
|
||||
op, qexpr, val = op
|
||||
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
|
||||
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
|
||||
|
||||
ops = [compile_op(op) for op in query]
|
||||
return " AND ".join(ops)
|
||||
|
||||
|
||||
@app.route("/api/v0/job", methods=["GET", "POST"])
|
||||
def get_jobs():
|
||||
"""Return jobs in the system."""
|
||||
|
||||
if request.method == "POST":
|
||||
blob = request.get_json(force=True)
|
||||
else:
|
||||
blob = {}
|
||||
|
||||
if query := blob.get("query"):
|
||||
query = compile_query(query)
|
||||
print(query)
|
||||
def qf():
|
||||
cur = request.db.cursor()
|
||||
cur.execute(f"SELECT `id`, `state` FROM `job` AS `j` WHERE {query};")
|
||||
yield from cur.fetchall()
|
||||
cur.close()
|
||||
jobs = qf()
|
||||
|
||||
else:
|
||||
jobs = current_app.queries.job_list(request.db)
|
||||
|
||||
if limit := blob.get("limit", 100):
|
||||
limit = int(limit)
|
||||
def lf(iterable):
|
||||
iterable = iter(iterable)
|
||||
for i in range(limit):
|
||||
try:
|
||||
yield next(iterable)
|
||||
except StopIteration:
|
||||
break
|
||||
jobs = lf(jobs)
|
||||
|
||||
return jsonify({
|
||||
"jobs": [
|
||||
{
|
||||
"id": id,
|
||||
"state": json.loads(state) if state is not None else state
|
||||
}
|
||||
for id, state in jobs
|
||||
]
|
||||
}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/create", methods=["POST"])
|
||||
def create_job():
|
||||
"""Create a job."""
|
||||
|
||||
id, state = current_app.queries.job_create(
|
||||
request.db,
|
||||
payload=json.dumps(request.get_json(force=True)),
|
||||
state=None,
|
||||
)
|
||||
return jsonify({"id": id, "state": state}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/poll", methods=["POST"])
|
||||
def poll_job():
|
||||
"""Using a query, attempt to poll for the next job matching criteria."""
|
||||
|
||||
blob = request.get_json(force=True)
|
||||
query = compile_query(blob["query"])
|
||||
cur = request.db.cursor()
|
||||
cur.execute(f"""\
|
||||
UPDATE `job`
|
||||
SET
|
||||
`events` = json_insert(events, '$[#]',
|
||||
json_array('job_state_advanced',
|
||||
json_object('old', json(state),
|
||||
'new', json(:state),
|
||||
'timestamp', strftime('%s', 'now'))))
|
||||
, `state` = json(:state)
|
||||
WHERE
|
||||
`id` IN (
|
||||
SELECT
|
||||
`id`
|
||||
FROM
|
||||
`job` AS `j`
|
||||
WHERE
|
||||
{query}
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING
|
||||
`id`
|
||||
, `state`
|
||||
;""", {"state": json.dumps(blob["state"])})
|
||||
results = cur.fetchall()
|
||||
cur.close()
|
||||
if results:
|
||||
(id, state), = results
|
||||
return jsonify({"id": id, "state": json.loads(state)}), 200
|
||||
else:
|
||||
abort(404)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>", methods=["GET"])
|
||||
def get_job(job_id):
|
||||
"""Return a job by ID."""
|
||||
|
||||
r = current_app.queries.job_get(request.db, id=job_id)
|
||||
if not r:
|
||||
abort(404)
|
||||
|
||||
# Unpack the response tuple
|
||||
id, payload, events, state = r
|
||||
return jsonify({
|
||||
"id": id,
|
||||
"payload": json.loads(payload),
|
||||
"events": json.loads(events),
|
||||
"state": json.loads(state) if state is not None else state,
|
||||
}), 200
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
|
||||
def update_state(job_id):
|
||||
"""CAS update a job's state, returning the updated job or indicating a conflict."""
|
||||
|
||||
document = request.get_json(force=True)
|
||||
old = document["old"]
|
||||
new = document["new"]
|
||||
if current_app.queries.job_cas_state(
|
||||
request.db,
|
||||
id=job_id,
|
||||
old_state=json.dumps(old),
|
||||
new_state=json.dumps(new),
|
||||
):
|
||||
return get_job(job_id)
|
||||
else:
|
||||
abort(409)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
|
||||
def append_event(job_id):
|
||||
"""Append a user-defined event to the job's log."""
|
||||
|
||||
current_app.queries.job_append_event(
|
||||
request.db,
|
||||
id=job_id,
|
||||
event=json.dumps(request.get_json(force=True))
|
||||
)
|
||||
|
||||
return get_job(job_id)
|
||||
|
||||
|
||||
@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
|
||||
def delete_job(job_id):
|
||||
"""Delete a given job."""
|
||||
|
||||
current_app.queries.job_delete(request.db, id=job_id)
|
||||
|
||||
return jsonify({}), 200
|
||||
|
||||
def main():
|
||||
"""Run the mock server."""
|
||||
|
||||
opts, args = parser.parse_known_args()
|
||||
|
||||
app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
|
||||
app.config["host"] = opts.host
|
||||
app.config["port"] = opts.port
|
||||
|
||||
app.run(
|
||||
threaded=True,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
88
projects/jobq/src/python/jobq/rest/api.py
Normal file
88
projects/jobq/src/python/jobq/rest/api.py
Normal file
|
@ -0,0 +1,88 @@
|
|||
"""A quick and dirty Python driver for the jobq API."""
|
||||
|
||||
import typing as t
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class DehydratedJob(t.NamedTuple):
|
||||
"""The 'important' bits of a given job."""
|
||||
|
||||
id: int
|
||||
url: str
|
||||
state: object
|
||||
|
||||
|
||||
class HydratedJob(t.NamedTuple):
|
||||
"""The full state of a given job."""
|
||||
|
||||
id: int
|
||||
url: str
|
||||
state: object
|
||||
payload: object
|
||||
events: object
|
||||
|
||||
|
||||
Job = t.Union[DehydratedJob, HydratedJob]
|
||||
|
||||
|
||||
class JobqClient(object):
|
||||
def __init__(self, url, session=None):
|
||||
self._url = url
|
||||
self._session = session or requests.Session()
|
||||
|
||||
def _job(self, json):
|
||||
return DehydratedJob(id=json["id"],
|
||||
url=f"{self._url}/api/v0/job/{json.get('id')}",
|
||||
state=json["state"])
|
||||
|
||||
def jobs(self, query=None, limit=10) -> t.Iterable[DehydratedJob]:
|
||||
"""Enumerate jobs on the queue."""
|
||||
|
||||
for job_frag in self._session.post(self._url + "/api/v0/job",
|
||||
json={"query": query or [],
|
||||
"limit": limit})\
|
||||
.json()\
|
||||
.get("jobs"):
|
||||
yield self._job(job_frag)
|
||||
|
||||
def poll(self, query, state) -> DehydratedJob:
|
||||
"""Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job."""
|
||||
|
||||
return self._job(self._session.post(self._url + "/api/v0/job/poll",
|
||||
json={"query": query,
|
||||
"state": state}).json())
|
||||
|
||||
def create(self, payload: object) -> DehydratedJob:
|
||||
"""Create a new job in the system."""
|
||||
|
||||
job_frag = self._session.post(self._url + "/api/v0/job/create",
|
||||
json=payload)\
|
||||
.json()
|
||||
return self._job(job_frag)
|
||||
|
||||
def fetch(self, job: Job) -> HydratedJob:
|
||||
"""Fetch the current state of a job."""
|
||||
|
||||
return HydratedJob(url=job.url, **self._session.get(job.url).json())
|
||||
|
||||
def advance(self, job: Job, state: object) -> Job:
|
||||
"""Attempt to advance a job to a subsequent state."""
|
||||
|
||||
return HydratedJob(url=job.url,
|
||||
**self._session.post(job.url + "/state",
|
||||
json={"old": job.state,
|
||||
"new": state}).json())
|
||||
|
||||
def event(self, job: Job, event: object) -> HydratedJob:
|
||||
"""Attempt to record an event against a job."""
|
||||
|
||||
return HydratedJob(url=job.url,
|
||||
**self._session.post(job.url + "/event",
|
||||
json=event).json())
|
||||
|
||||
def delete(self, job: Job) -> None:
|
||||
"""Delete a remote job."""
|
||||
|
||||
return self._session.delete(job.url)\
|
||||
.raise_for_status()
|
Loading…
Reference in a new issue