An initial crack at a jobq

This commit is contained in:
Reid 'arrdem' McKenzie 2021-08-14 00:31:07 -06:00
parent 2c2d169ab7
commit d27e7e6df6
5 changed files with 628 additions and 6 deletions

View file

@ -1,4 +1,12 @@
"""
A driver object implementing support for SQLite3
"""
from contextlib import contextmanager
import logging
import sqlite3
log = logging.getLogger(__name__)
class SQLite3DriverAdapter(object):
@ -21,6 +29,7 @@ class SQLite3DriverAdapter(object):
@staticmethod
def select(conn, _query_name, sql, parameters):
cur = conn.cursor()
log.debug({'sql': sql, 'parameters': parameters})
cur.execute(sql, parameters)
results = cur.fetchall()
cur.close()
@ -28,8 +37,9 @@ class SQLite3DriverAdapter(object):
@staticmethod
@contextmanager
def select_cursor(conn, _query_name, sql, parameters):
def select_cursor(conn: sqlite3.Connection, _query_name, sql, parameters):
cur = conn.cursor()
log.debug({'sql': sql, 'parameters': parameters})
cur.execute(sql, parameters)
try:
yield cur
@ -37,21 +47,26 @@ class SQLite3DriverAdapter(object):
cur.close()
@staticmethod
def insert_update_delete(conn, _query_name, sql, parameters):
def insert_update_delete(conn: sqlite3.Connection, _query_name, sql, parameters):
log.debug({'sql': sql, 'parameters': parameters})
conn.execute(sql, parameters)
@staticmethod
def insert_update_delete_many(conn, _query_name, sql, parameters):
def insert_update_delete_many(conn: sqlite3.Connection, _query_name, sql, parameters):
log.debug({'sql': sql, 'parameters': parameters})
conn.executemany(sql, parameters)
@staticmethod
def insert_returning(conn, _query_name, sql, parameters):
def insert_returning(conn: sqlite3.Connection, _query_name, sql, parameters):
cur = conn.cursor()
log.debug({'sql': sql, 'parameters': parameters})
cur.execute(sql, parameters)
results = cur.lastrowid
results = cur.fetchall()
log.debug({"results": results})
cur.close()
return results
@staticmethod
def execute_script(conn, sql):
def execute_script(conn: sqlite3.Connection, sql):
log.debug({'sql': sql, 'parameters': None})
conn.executescript(sql)

26
projects/jobq/BUILD Normal file
View file

@ -0,0 +1,26 @@
zapp_binary(
name = "jobq",
main = "src/python/jobq/__main__.py",
imports = [
"src/python",
],
deps = [
"//projects/anosql",
"//projects/anosql-migrations",
py_requirement("flask"),
py_requirement("pyyaml"),
]
)
py_library(
name = "client",
srcs = [
"src/python/jobq/rest/api.py",
],
imports = [
"src/python",
],
deps = [
py_requirement("requests"),
],
)

150
projects/jobq/README.md Normal file
View file

@ -0,0 +1,150 @@
# Jobq
Jobq is an event-oriented framework for recording _jobs_.
Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
Users may manually add _events_ to the log.
Note that, while we strongly suggest that _state_ should always be a tagged tuple and the default is `["CREATED"]`, no constraints are placed on its contents.
## HTTP API
### GET /api/v0/job
Return an enumeration of jobs active in the system.
**Response** -
```shell
$ curl -X POST $JOBQ/api/v0/job | jq .
{"jobs": [
{"id": 1, "state": ["CREATED"]},
{"id": 2, "state": ["ASSIGNED"]},
{"id": 3, "state": ["SLEEPING"]},
{"id": 3, "state": ["FINISHED"]}
]}
```
### POST /api/v0/job
Perform a point-in-time query for jobs.
The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
Valid ops any SQLite expression not containing sub-queries.
Here, we're looking for jobs tagged as in the `["CREATED"]` state.
``` shell
$ curl -X POST $JOBQ/api/v0/job --data '{"query": [["IS", "json_extract(state, '$[0]')", "CREATED"]]}' | jq .
{"jobs": [
{"id": 1, "state": ["CREATED"]},
]}
```
### POST /api/v0/job/create
Given a JSON document as the POST body, create a new job.
```
$ curl -X POST $JOBQ/api/v0/job/create --data '{"msg": "Hello, world!"}' | jq .
{
"id": 1
}
```
### POST /api/v0/job/poll
Poll for at most one job matching the given query, atomically advancing it to the given state.
Uses the same query format as the /job endpoint.
Here, we're polling for hosts which are in the null (initial) state, and assigning the first such job to this host.
Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
``` shell
$ curl -X POST $JOBQ/api/v0/job/poll --data '{"query": [["IS", "j.state", null]], "state":["ASSIGNED", {"node": 1}]}' | jq .
{
"id": 3,
"state": [
"ASSIGNED",
{
"node": 1
}
]
}
```
### GET /api/v0/job/<job_id>
Return all available data about a given job, including the payload, event log and current state.
```shell
$ curl -X GET $JOBQ/api/v0/job/1 | jq .
{
"id": 1,
"payload": {
"msg": "Hello, world"
},
"state": [
"CREATED"
],
"events": [
[
"job_created",
{
"timestamp": "1628909303"
}
]
]
}
```
### POST /api/v0/job/<job_id>/state
Alter the state of a given job, appending a state change event to the log and returning the entire updated job.
``` shell
$ curl -X POST $JOBQ/api/v0/job/1/state --data '["ASSIGNED"]' | jq .
{
"id": 1,
"payload": {
"msg": "Hello, world"
},
"state": [
"ASSIGNED"
],
"events": [
[
"job_created",
{
"timestamp": "1628911153"
}
],
[
"job_state_advanced",
{
"new": [
"ASSIGNED"
],
"old": [
"CREATED"
],
"timestamp": "1628911184"
}
]
]
}
```
### POST /api/v0/job/<job_id>/event
Append an arbitrary event to the log.
User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted.
``` shell
$ curl -X POST $JOBQ/api/v0/job/1/event --data '["my cool event"]' | jq .events[-1]
[
"user_event",
{
"event": [
"my cool event"
],
"timestamp": "1628911503"
}
]
```
### DELETE /api/v0/job/<job_id>
Expunge a given job from the system by ID.

View file

@ -0,0 +1,343 @@
"""
A mock job queue.
"""
import argparse
from functools import wraps
import json
import logging
import os
import sys
import sqlite3
import anosql
from anosql_migrations import run_migrations, with_migrations
from flask import abort, current_app, Flask, jsonify, request
_SQL = """\
-- name: migration-0000-create-jobq
CREATE TABLE `job` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key
, `payload` TEXT -- JSON payload
, `events` TEXT DEFAULT '[]' -- append log of JSON events
, `state` TEXT -- JSON state of the job
);
-- name: job-create<!
INSERT INTO `job` (
`payload`
, `state`
, `events`
) VALUES (
:payload
, :state
, json_array(json_array('job_created',
json_object('timestamp', strftime('%s', 'now'))))
)
RETURNING
`id`
, `state`
;
-- name: job-get?
SELECT
`id`
, `payload`
, `events`
, `state`
FROM `job`
WHERE
`id` = :id
;
-- name: job-delete!
DELETE FROM `job`
WHERE
`id` = :id
;
-- name: job-list
SELECT
`id`
, `state`
FROM `job`
ORDER BY
`id` ASC
;
-- name: job-filter-state
SELECT
`id`
, `state`
FROM `job`
WHERE `state` = :state
;
-- name: job-append-event!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]',
json_array('user_event',
json_object('event', json(:event),
'timestamp', strftime('%s', 'now'))))
WHERE
`id` = :id
;
-- name: job-advance-state!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]',
json_array('job_state_advanced',
json_object('old', json(state),
'new', json(:state),
'timestamp', strftime('%s', 'now'))))
, `state` = json(:state)
WHERE
`id` = :id
;
-- name: job-cas-state<!
UPDATE
`job`
SET
`events` = json_insert(events, '$[#]',
json_array('job_state_advanced',
json_object('old', json(:old_state),
'new', json(:new_state),
'timestamp', strftime('%s', 'now'))))
, `state` = json(:new_state)
WHERE
`id` = :id
AND `state` = json(:old_state)
RETURNING
`id`
;
"""
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
app = Flask(__name__)
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=int, default=8080)
parser.add_argument("--host", default="localhost")
parser.add_argument("--db", default="~/jobq.sqlite3")
@app.before_first_request
def setup_queries():
# Load the queries
queries = anosql.from_str(_SQL, "sqlite3")
# Do migrations
with sqlite3.connect(current_app.config["db"]) as db:
queries = with_migrations("sqlite3", queries, db)
run_migrations(queries, db)
current_app.queries = queries
@app.before_request
def before_request():
request.db = sqlite3.connect(current_app.config["db"])
request.db.set_trace_callback(logging.getLogger("sqlite3").debug)
@app.teardown_request
def teardown_request(exception):
request.db.commit()
request.db.close()
def compile_query(query):
"""Compile a query to a SELECT over jobs.
The query is a sequence of ordered pairs [op, path, val].
Supported ops are:
- `<`, `>`, `<=`, `>=`, `=`
- `LIKE`
Query ops join under `AND`
"""
def compile_term(term):
if term is None:
return "NULL"
else:
assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
return term
def compile_op(op):
op, qexpr, val = op
assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
return f"{compile_term(qexpr)} {op} {compile_term(val)}"
ops = [compile_op(op) for op in query]
return " AND ".join(ops)
@app.route("/api/v0/job", methods=["GET", "POST"])
def get_jobs():
"""Return jobs in the system."""
if request.method == "POST":
blob = request.get_json(force=True)
else:
blob = {}
if query := blob.get("query"):
query = compile_query(query)
print(query)
def qf():
cur = request.db.cursor()
cur.execute(f"SELECT `id`, `state` FROM `job` AS `j` WHERE {query};")
yield from cur.fetchall()
cur.close()
jobs = qf()
else:
jobs = current_app.queries.job_list(request.db)
if limit := blob.get("limit", 100):
limit = int(limit)
def lf(iterable):
iterable = iter(iterable)
for i in range(limit):
try:
yield next(iterable)
except StopIteration:
break
jobs = lf(jobs)
return jsonify({
"jobs": [
{
"id": id,
"state": json.loads(state) if state is not None else state
}
for id, state in jobs
]
}), 200
@app.route("/api/v0/job/create", methods=["POST"])
def create_job():
"""Create a job."""
id, state = current_app.queries.job_create(
request.db,
payload=json.dumps(request.get_json(force=True)),
state=None,
)
return jsonify({"id": id, "state": state}), 200
@app.route("/api/v0/job/poll", methods=["POST"])
def poll_job():
"""Using a query, attempt to poll for the next job matching criteria."""
blob = request.get_json(force=True)
query = compile_query(blob["query"])
cur = request.db.cursor()
cur.execute(f"""\
UPDATE `job`
SET
`events` = json_insert(events, '$[#]',
json_array('job_state_advanced',
json_object('old', json(state),
'new', json(:state),
'timestamp', strftime('%s', 'now'))))
, `state` = json(:state)
WHERE
`id` IN (
SELECT
`id`
FROM
`job` AS `j`
WHERE
{query}
LIMIT 1
)
RETURNING
`id`
, `state`
;""", {"state": json.dumps(blob["state"])})
results = cur.fetchall()
cur.close()
if results:
(id, state), = results
return jsonify({"id": id, "state": json.loads(state)}), 200
else:
abort(404)
@app.route("/api/v0/job/<job_id>", methods=["GET"])
def get_job(job_id):
"""Return a job by ID."""
r = current_app.queries.job_get(request.db, id=job_id)
if not r:
abort(404)
# Unpack the response tuple
id, payload, events, state = r
return jsonify({
"id": id,
"payload": json.loads(payload),
"events": json.loads(events),
"state": json.loads(state) if state is not None else state,
}), 200
@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
def update_state(job_id):
"""CAS update a job's state, returning the updated job or indicating a conflict."""
document = request.get_json(force=True)
old = document["old"]
new = document["new"]
if current_app.queries.job_cas_state(
request.db,
id=job_id,
old_state=json.dumps(old),
new_state=json.dumps(new),
):
return get_job(job_id)
else:
abort(409)
@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
def append_event(job_id):
"""Append a user-defined event to the job's log."""
current_app.queries.job_append_event(
request.db,
id=job_id,
event=json.dumps(request.get_json(force=True))
)
return get_job(job_id)
@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
def delete_job(job_id):
"""Delete a given job."""
current_app.queries.job_delete(request.db, id=job_id)
return jsonify({}), 200
def main():
"""Run the mock server."""
opts, args = parser.parse_known_args()
app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
app.config["host"] = opts.host
app.config["port"] = opts.port
app.run(
threaded=True,
)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,88 @@
"""A quick and dirty Python driver for the jobq API."""
import typing as t
import requests
class DehydratedJob(t.NamedTuple):
"""The 'important' bits of a given job."""
id: int
url: str
state: object
class HydratedJob(t.NamedTuple):
"""The full state of a given job."""
id: int
url: str
state: object
payload: object
events: object
Job = t.Union[DehydratedJob, HydratedJob]
class JobqClient(object):
def __init__(self, url, session=None):
self._url = url
self._session = session or requests.Session()
def _job(self, json):
return DehydratedJob(id=json["id"],
url=f"{self._url}/api/v0/job/{json.get('id')}",
state=json["state"])
def jobs(self, query=None, limit=10) -> t.Iterable[DehydratedJob]:
"""Enumerate jobs on the queue."""
for job_frag in self._session.post(self._url + "/api/v0/job",
json={"query": query or [],
"limit": limit})\
.json()\
.get("jobs"):
yield self._job(job_frag)
def poll(self, query, state) -> DehydratedJob:
"""Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job."""
return self._job(self._session.post(self._url + "/api/v0/job/poll",
json={"query": query,
"state": state}).json())
def create(self, payload: object) -> DehydratedJob:
"""Create a new job in the system."""
job_frag = self._session.post(self._url + "/api/v0/job/create",
json=payload)\
.json()
return self._job(job_frag)
def fetch(self, job: Job) -> HydratedJob:
"""Fetch the current state of a job."""
return HydratedJob(url=job.url, **self._session.get(job.url).json())
def advance(self, job: Job, state: object) -> Job:
"""Attempt to advance a job to a subsequent state."""
return HydratedJob(url=job.url,
**self._session.post(job.url + "/state",
json={"old": job.state,
"new": state}).json())
def event(self, job: Job, event: object) -> HydratedJob:
"""Attempt to record an event against a job."""
return HydratedJob(url=job.url,
**self._session.post(job.url + "/event",
json=event).json())
def delete(self, job: Job) -> None:
"""Delete a remote job."""
return self._session.delete(job.url)\
.raise_for_status()