From 49157000b7e71c5bb696b67d851292270f14db8b Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Thu, 19 Aug 2021 23:28:33 -0600 Subject: [PATCH] Split the jobq into a library and a daemon --- projects/jobq/BUILD | 25 +- projects/jobq/README.md | 152 +------- projects/jobq/benchmark.py | 96 +++++ projects/jobq/src/python/jobq/__init__.py | 261 +++++++++++++ projects/jobq/src/python/jobq/__main__.py | 346 ------------------ projects/jobqd/BUILD | 24 ++ projects/jobqd/README.md | 58 +++ projects/jobqd/src/__main__.py | 146 ++++++++ projects/jobqd/src/python/jobq/openapi.yaml | 126 +++++++ .../src/python/jobq/rest/api.py | 0 10 files changed, 718 insertions(+), 516 deletions(-) create mode 100644 projects/jobq/benchmark.py create mode 100644 projects/jobq/src/python/jobq/__init__.py delete mode 100644 projects/jobq/src/python/jobq/__main__.py create mode 100644 projects/jobqd/BUILD create mode 100644 projects/jobqd/README.md create mode 100644 projects/jobqd/src/__main__.py create mode 100644 projects/jobqd/src/python/jobq/openapi.yaml rename projects/{jobq => jobqd}/src/python/jobq/rest/api.py (100%) diff --git a/projects/jobq/BUILD b/projects/jobq/BUILD index 70cc7dc..f3226f5 100644 --- a/projects/jobq/BUILD +++ b/projects/jobq/BUILD @@ -1,26 +1,13 @@ -zapp_binary( +py_project( name = "jobq", - main = "src/python/jobq/__main__.py", - imports = [ - "src/python", - ], - deps = [ + lib_deps = [ "//projects/anosql", "//projects/anosql-migrations", - py_requirement("flask"), - py_requirement("pyyaml"), ] ) -py_library( - name = "client", - srcs = [ - "src/python/jobq/rest/api.py", - ], - imports = [ - "src/python", - ], - deps = [ - py_requirement("requests"), - ], +zapp_binary( + name = "benchmark", + main = "benchmark.py", + deps = [":jobq"], ) diff --git a/projects/jobq/README.md b/projects/jobq/README.md index fa0891d..d9b5a96 100644 --- a/projects/jobq/README.md +++ b/projects/jobq/README.md @@ -1,153 +1,3 @@ # Jobq -Jobq is an event-oriented framework for 
recording _jobs_. -Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_. -Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change. -Users may manually add _events_ to the log. - -Note that, while we strongly suggest that _state_ should always be a tagged tuple, no constraints are placed on its contents. - -## HTTP API - -### GET /api/v0/job -Return an enumeration of jobs active in the system. - -**Response** - -```shell -$ curl -X POST $JOBQ/api/v0/job | jq . -{"jobs": [ - {"id": 1, "state": ["CREATED"]}, - {"id": 2, "state": ["ASSIGNED"]}, - {"id": 3, "state": ["SLEEPING"]}, - {"id": 3, "state": ["FINISHED"]} -]} -``` - -### POST /api/v0/job -Perform a point-in-time query for jobs. - -The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query. -Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends). -Valid exprs are any SQLite expression not containing sub-queries. - -Here, we're looking for jobs tagged as in the `["CREATED"]` state. -``` shell -$ curl -X POST $JOBQ/api/v0/job --data '{"query": [["IS", "json_extract(state, '$[0]')", "CREATED"]]}' | jq . -{"jobs": [ - {"id": 1, "state": ["CREATED"]}, -]} -``` - -### POST /api/v0/job/create -Given a JSON document as the POST body, create a new job with a payload in the given state. -If state is not provided, the state `null` is used. - -``` -$ curl -X POST $JOBQ/api/v0/job/create --data '{"state": ["CREATED"], "payload": {"msg": "Hello, world!"}}' | jq . -{ - "id": 1 -} -``` - -### POST /api/v0/job/poll -Poll for at most one job matching the given query, atomically advancing it to the given state. - -Uses the same query format as the /job endpoint. - -Here, we're polling for hosts which are in the null (initial) state, and assigning the first such job to this host. 
-Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria. - -``` shell -$ curl -X POST $JOBQ/api/v0/job/poll --data '{"query": [["IS", "j.state", null]], "state":["ASSIGNED", {"node": 1}]}' | jq . -{ - "id": 3, - "state": [ - "ASSIGNED", - { - "node": 1 - } - ] -} -``` - -### GET /api/v0/job/ -Return all available data about a given job, including the payload, event log and current state. - -```shell -$ curl -X GET $JOBQ/api/v0/job/1 | jq . -{ - "id": 1, - "payload": { - "msg": "Hello, world" - }, - "state": [ - "CREATED" - ], - "events": [ - [ - "job_created", - { - "timestamp": "1628909303" - } - ] - ] -} -``` - -### POST /api/v0/job//state -POST the 'current' state, and a proposed new state, attempting to update the state of the job using CAS. -If the state of the job updates successfully, a new event will be appended to the job's log and the resulting job will be returned. -Otherwise a conflict will be signaled. - -``` shell -$ curl -X POST $JOBQ/api/v0/job/1/state --data '{"new": ["ASSIGNED"], "old": ["CREATED"]}' | jq . -{ - "id": 1, - "payload": { - "msg": "Hello, world" - }, - "state": [ - "ASSIGNED" - ], - "events": [ - [ - "job_created", - { - "timestamp": "1628911153" - } - ], - [ - "job_state_advanced", - { - "new": [ - "ASSIGNED" - ], - "old": [ - "CREATED" - ], - "timestamp": "1628911184" - } - ] - ] -} -``` - -### POST /api/v0/job//event -Append an arbitrary event to the log. -User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted. - -``` shell -$ curl -X POST $JOBQ/api/v0/job/1/event --data '["my cool event"]' | jq .events[-1] -[ - "user_event", - { - "event": [ - "my cool event" - ], - "timestamp": "1628911503" - } -] -``` - -### DELETE /api/v0/job/ -Expunge a given job from the system by ID. +Abusing sqlite3 as a job queue. 
diff --git a/projects/jobq/benchmark.py b/projects/jobq/benchmark.py new file mode 100644 index 0000000..582ae18 --- /dev/null +++ b/projects/jobq/benchmark.py @@ -0,0 +1,96 @@ +""" +Benchmarking the jobq. +""" + +from contextlib import contextmanager +from time import perf_counter_ns +from abc import abstractclassmethod +import os +from random import randint, choice +import string +from statistics import mean, median, stdev +import tempfile + +from jobq import JobQueue + + +def randstr(len): + return ''.join(choice(string.ascii_uppercase + string.digits) for _ in range(len)) + + +class Timing(object): + def __init__(self, start): + self.start = start + self.end = None + + @property + def duration(self): + if self.end: + return self.end - self.start + + +@contextmanager +def timing(): + """A context manager that produces a semi-mutable timing record.""" + + obj = Timing(perf_counter_ns()) + yield obj + obj.end = perf_counter_ns() + + +def bench(callable, reps): + timings = [] + with timing() as run_t: + for _ in range(reps): + with timing() as t: + callable() + timings.append(t.duration) + print(f"""Ran {callable.__name__!r} {reps} times, total time {run_t.duration / 1e9} (s) + mean: {mean(timings) / 1e9} (s) + median: {median(timings) / 1e9} (s) + stddev: {stdev(timings) / 1e9} (s) + test overhead: {(run_t.duration - sum(timings)) / reps / 1e9} (s) +""") + + +def test_insert(q, reps): + # Measuring insertion time + jobs = [ + {"user_id": randint(0, 1<<32), "msg": randstr(256)} + for _ in range(reps) + ] + jobs_i = iter(jobs) + + def insert(): + q.create(next(jobs_i), new_state=["CREATED"]) + + bench(insert, reps) + + +def test_poll(q, reps): + + def poll(): + q.poll([["=", "json_extract(j.state, '$[0]')", "'CREATED'"]], ["POLLED"]) + + bench(poll, reps) + + +if __name__ == "__main__": + # Test params + reps = 10000 + path = "/tmp/jobq-bench.sqlite3" + + # Ensuring a clean-ish run env. 
+ if os.path.exists(path): + os.remove(path) + + # And the tests + print(f"Testing with {path}") + q = JobQueue(path) + test_insert(q, reps) + test_poll(q, reps) + + print(f"Testing with :memory:") + q = JobQueue(":memory:") + test_insert(q, reps) + test_poll(q, reps) diff --git a/projects/jobq/src/python/jobq/__init__.py b/projects/jobq/src/python/jobq/__init__.py new file mode 100644 index 0000000..e0ac59a --- /dev/null +++ b/projects/jobq/src/python/jobq/__init__.py @@ -0,0 +1,261 @@ +""" +A job queue library teetering atop sqlite3. +""" + +import logging +import os +import sys +import sqlite3 +import json + +import anosql +from anosql_migrations import run_migrations, with_migrations + +_GET_JOB_FIELDS = """\ + `id` +, `payload` +, `events` +, `state` +, `modified` +""" + +_SQL = f"""\ +-- name: migration-0000-create-jobq +CREATE TABLE `job` ( + `id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key +, `payload` TEXT -- JSON payload +, `events` TEXT DEFAULT '[]' -- append log of JSON events +, `state` TEXT -- JSON state of the job +, `modified` INTEGER DEFAULT CURRENT_TIMESTAMP -- last modified +-- note the `rowid` field is defaulted +); +-- name: migration-0001-index-modified +-- Enable efficient queries ordered by job modification +CREATE INDEX IF NOT EXISTS `job_modified` ON `job` ( + `modified` +); +-- name: job-create`, `<=`, `>=`, `=` + - `LIKE` + + Query ops join under `AND` + """ + + def compile_term(term): + if term is None: + return "NULL" + else: + assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"]) + return term + + def compile_op(op): + op, qexpr, val = op + assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"] + return f"{compile_term(qexpr)} {op} {compile_term(val)}" + + ops = [compile_op(op) for op in query] + return " AND ".join(ops) + + +class JobQueue(object): + + def __init__(self, path): + self._db = sqlite3.connect(path) + self._queries = anosql.from_str(_SQL, "sqlite3") + + with self._db as db: 
+ self._queries = with_migrations("sqlite3", self._queries, db) + run_migrations(self._queries, db) + + def query(self, query, limit=None): + with self._db as db: + query = compile_query(query) + + def qf(): + cur = db.cursor() + cur.execute(_QUERY_SQL.format(query)) + yield from cur.fetchall() + cur.close() + + jobs = qf() + + if limit: + limit = int(limit) + def lf(iterable): + iterable = iter(iterable) + for i in range(limit): + try: + yield next(iterable) + except StopIteration: + break + jobs = lf(jobs) + + return list(jobs) + + def create(self, job, new_state=None): + """Create a new job on the queue, optionally specifying its state.""" + + with self._db as db: + (id, state), = self._queries.job_create( + db, + payload=json.dumps(job), + state=json.dumps(new_state), + ) + return id + + def poll(self, query, new_state): + """Query for the longest-untouched job matching, advancing it to new_state.""" + + with self._db as db: + cur = db.cursor() + cur.execute(_POLL_SQL.format(compile_query(query)), + {"state": json.dumps(new_state)}) + results = cur.fetchall() + if results: + return results + + def get(self, job_id): + """Fetch all available data about a given job by ID.""" + + with self._db as db: + return self._queries.job_get(db, id=job_id) + + def cas_state(self, job_id, old_state, new_state): + """CAS update a job's state, returning the updated job or indicating a conflict.""" + + with self._db as db: + return self._queries.job_cas_state( + db, + id=job_id, + old_state=json.dumps(old_state), + new_state=json.dumps(new_state), + ) + + def append_event(self, job_id, event): + """Append a user-defined event to the job's log.""" + + with self._db as db: + return self._queries.job_append_event( + db, + id=job_id, + event=json.dumps(event) + ) + + def delete_job(self, job_id): + """Delete a job by ID, regardless of state.""" + + with self._db as db: + return self._queries.job_delete(db, id=job_id) diff --git a/projects/jobq/src/python/jobq/__main__.py 
b/projects/jobq/src/python/jobq/__main__.py deleted file mode 100644 index 818fef1..0000000 --- a/projects/jobq/src/python/jobq/__main__.py +++ /dev/null @@ -1,346 +0,0 @@ -""" -A mock job queue. -""" - -import argparse -from functools import wraps -import json -import logging -import os -import sys -import sqlite3 - -import anosql -from anosql_migrations import run_migrations, with_migrations -from flask import abort, current_app, Flask, jsonify, request - - -_SQL = """\ --- name: migration-0000-create-jobq -CREATE TABLE `job` ( - `id` INTEGER PRIMARY KEY AUTOINCREMENT -- primary key -, `payload` TEXT -- JSON payload -, `events` TEXT DEFAULT '[]' -- append log of JSON events -, `state` TEXT -- JSON state of the job -); --- name: job-create`, `<=`, `>=`, `=` - - `LIKE` - - Query ops join under `AND` - """ - - def compile_term(term): - if term is None: - return "NULL" - else: - assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"]) - return term - - def compile_op(op): - op, qexpr, val = op - assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"] - return f"{compile_term(qexpr)} {op} {compile_term(val)}" - - ops = [compile_op(op) for op in query] - return " AND ".join(ops) - - -@app.route("/api/v0/job", methods=["GET", "POST"]) -def get_jobs(): - """Return jobs in the system.""" - - if request.method == "POST": - blob = request.get_json(force=True) - else: - blob = {} - - if query := blob.get("query"): - query = compile_query(query) - print(query) - def qf(): - cur = request.db.cursor() - cur.execute(f"SELECT `id`, `state` FROM `job` AS `j` WHERE {query};") - yield from cur.fetchall() - cur.close() - jobs = qf() - - else: - jobs = current_app.queries.job_list(request.db) - - if limit := blob.get("limit", 100): - limit = int(limit) - def lf(iterable): - iterable = iter(iterable) - for i in range(limit): - try: - yield next(iterable) - except StopIteration: - break - jobs = lf(jobs) - - return jsonify({ - "jobs": [ - { - "id": 
id, - "state": json.loads(state) if state is not None else state - } - for id, state in jobs - ] - }), 200 - - -@app.route("/api/v0/job/create", methods=["POST"]) -def create_job(): - """Create a job.""" - - blob = request.get_json(force=True) - payload = blob["payload"] - state = blob.get("state", None) - id, state = current_app.queries.job_create( - request.db, - payload=json.dumps(payload), - state=json.dumps(state), - ) - return jsonify({"id": id, "state": state}), 200 - - -@app.route("/api/v0/job/poll", methods=["POST"]) -def poll_job(): - """Using a query, attempt to poll for the next job matching criteria.""" - - blob = request.get_json(force=True) - query = compile_query(blob["query"]) - cur = request.db.cursor() - cur.execute(f"""\ - UPDATE `job` - SET - `events` = json_insert(events, '$[#]', - json_array('job_state_advanced', - json_object('old', json(state), - 'new', json(:state), - 'timestamp', strftime('%s', 'now')))) - , `state` = json(:state) - WHERE - `id` IN ( - SELECT - `id` - FROM - `job` AS `j` - WHERE - {query} - LIMIT 1 - ) - RETURNING - `id` - , `state` - ;""", {"state": json.dumps(blob["state"])}) - results = cur.fetchall() - cur.close() - if results: - (id, state), = results - return jsonify({"id": id, "state": json.loads(state)}), 200 - else: - abort(404) - - -@app.route("/api/v0/job/", methods=["GET"]) -def get_job(job_id): - """Return a job by ID.""" - - r = current_app.queries.job_get(request.db, id=job_id) - if not r: - abort(404) - - # Unpack the response tuple - id, payload, events, state = r - return jsonify({ - "id": id, - "payload": json.loads(payload), - "events": json.loads(events), - "state": json.loads(state) if state is not None else state, - }), 200 - - -@app.route("/api/v0/job//state", methods=["POST"]) -def update_state(job_id): - """CAS update a job's state, returning the updated job or indicating a conflict.""" - - document = request.get_json(force=True) - old = document["old"] - new = document["new"] - if 
current_app.queries.job_cas_state( - request.db, - id=job_id, - old_state=json.dumps(old), - new_state=json.dumps(new), - ): - return get_job(job_id) - else: - abort(409) - - -@app.route("/api/v0/job//event", methods=["POST"]) -def append_event(job_id): - """Append a user-defined event to the job's log.""" - - current_app.queries.job_append_event( - request.db, - id=job_id, - event=json.dumps(request.get_json(force=True)) - ) - - return get_job(job_id) - - -@app.route("/api/v0/job/", methods=["DELETE"]) -def delete_job(job_id): - """Delete a given job.""" - - current_app.queries.job_delete(request.db, id=job_id) - - return jsonify({}), 200 - -def main(): - """Run the mock server.""" - - opts, args = parser.parse_known_args() - - app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db)) - app.config["host"] = opts.host - app.config["port"] = opts.port - - app.run( - threaded=True, - ) - - -if __name__ == "__main__": - main() diff --git a/projects/jobqd/BUILD b/projects/jobqd/BUILD new file mode 100644 index 0000000..a95a684 --- /dev/null +++ b/projects/jobqd/BUILD @@ -0,0 +1,24 @@ +zapp_binary( + name = "jobqd", + main = "src/python/jobqd/__main__.py", + imports = [ + "src/python", + ], + deps = [ + "//projects/jobq", + py_requirement("flask"), + ] +) + +py_library( + name = "client", + srcs = [ + "src/python/jobq/rest/api.py", + ], + imports = [ + "src/python", + ], + deps = [ + py_requirement("requests"), + ], +) diff --git a/projects/jobqd/README.md b/projects/jobqd/README.md new file mode 100644 index 0000000..7491437 --- /dev/null +++ b/projects/jobqd/README.md @@ -0,0 +1,58 @@ +# Jobq + +Jobq is an event-oriented framework for recording _jobs_. +Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_. +Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change. +Users may manually add _events_ to the log. 
+ +Note that, while we strongly suggest that _state_ should always be a tagged tuple, no constraints are placed on its contents. + +## HTTP API + +### GET /api/v0/queue +Enumerate all the queues in the system. + +### POST /api/v0/queue +Create a new queue of jobs. + +### DELETE /api/v0/queue/ +Expunge a queue, deleting all jobs in it regardless of state. + +### GET /api/v0/queue//job +Return an enumeration of jobs active in the system. + +### POST /api/v0/queue//query_jobs +Perform a point-in-time query for jobs. + +The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query. +Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends). +Valid exprs are any SQLite expression not containing sub-queries. + +Here, we're looking for jobs tagged as in the `["CREATED"]` state. + +### POST /api/v0/queue//poll_job +Poll for at most one job matching the given query, atomically advancing it to the given state. + +Uses the same query format as the /job endpoint. + +Here, we're polling for hosts which are in the null (initial) state, and assigning the first such job to this host. +Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria. + +### POST /api/v0/queue//job +Given a JSON document as the POST body, create a new job with a payload in the given state. +If state is not provided, the state `null` is used. + +### GET /api/v0/queue//job/ +Return all available data about a given job, including the payload, event log and current state. + +### DELETE /api/v0/queue//job/ +Expunge a given job from the system by ID. + +### POST /api/v0/queue//job//state +POST the 'current' state, and a proposed new state, attempting to update the state of the job using CAS. +If the state of the job updates successfully, a new event will be appended to the job's log and the resulting job will be returned. +Otherwise a conflict will be signaled. 
+ +### POST /api/v0/queue//job//event +Append an arbitrary event to the log. +User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted. diff --git a/projects/jobqd/src/__main__.py b/projects/jobqd/src/__main__.py new file mode 100644 index 0000000..7241414 --- /dev/null +++ b/projects/jobqd/src/__main__.py @@ -0,0 +1,146 @@ +""" +A mock job queue. +""" + +import argparse +from functools import wraps +import json +import logging +import os +import sys +import sqlite3 + +from jobq import JobQueue + +from flask import abort, current_app, Flask, jsonify, request + + +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +app = Flask(__name__) + +parser = argparse.ArgumentParser() +parser.add_argument("--port", type=int, default=8080) +parser.add_argument("--host", default="localhost") +parser.add_argument("--db", default="~/jobq.sqlite3") + + +@app.before_first_request +def setup_queries(): + current_app.q = JobQueue(current_app.config["db"]) + + +@app.route("/api/v0/job", methods=["GET", "POST"]) +def get_jobs(): + """Return jobs in the system.""" + + if request.method == "POST": + blob = request.get_json(force=True) + else: + blob = {} + + query = blob.get("query", [["true"]]) + + return jsonify({ + "jobs": [ + { + "id": id, + "state": json.loads(state) if state is not None else state + } + for id, state in current_app.q.query(query) + ] + }), 200 + + +@app.route("/api/v0/job/create", methods=["POST"]) +def create_job(): + """Create a job.""" + + blob = request.get_json(force=True) + payload = blob["payload"] + state = blob.get("state", None) + id, state = current_app.q.create( + payload, state + ) + return jsonify({"id": id, "state": state}), 200 + + +@app.route("/api/v0/job/poll", methods=["POST"]) +def poll_job(): + """Using a query, attempt to poll for the next job matching criteria.""" + + blob = request.get_json(force=True) + query = blob["query"] + state = blob["state"] + results = 
current_app.q.poll(query, state) + if results: + (id, state), = results + return jsonify({"id": id, "state": json.loads(state)}), 200 + else: + abort(404) + + +@app.route("/api/v0/job/", methods=["GET"]) +def get_job(job_id): + """Return a job by ID.""" + + r = current_app.q.get(id=job_id) + if not r: + abort(404) + + # Unpack the response tuple + id, payload, events, state, modified = r + return jsonify({ + "id": id, + "payload": json.loads(payload), + "events": json.loads(events), + "state": json.loads(state) if state is not None else state, + "modified": modified, + }), 200 + + +@app.route("/api/v0/job//state", methods=["POST"]) +def update_state(job_id): + """CAS update a job's state, returning the updated job or indicating a conflict.""" + + document = request.get_json(force=True) + old = document["old"] + new = document["new"] + if current_app.q.cas_state(job_id, old, new): + return get_job(job_id) + else: + abort(409) + + +@app.route("/api/v0/job//event", methods=["POST"]) +def append_event(job_id): + """Append a user-defined event to the job's log.""" + + return current_app.q.append_event(job_id, event=request.get_json(force=True)) + + +@app.route("/api/v0/job/", methods=["DELETE"]) +def delete_job(job_id): + """Delete a given job.""" + + current_app.queries.job_delete(request.db, id=job_id) + + return jsonify({}), 200 + +def main(): + """Run the mock server.""" + + opts, args = parser.parse_known_args() + + app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db)) + app.config["host"] = opts.host + app.config["port"] = opts.port + + app.run( + threaded=True, + ) + + +if __name__ == "__main__": + main() diff --git a/projects/jobqd/src/python/jobq/openapi.yaml b/projects/jobqd/src/python/jobq/openapi.yaml new file mode 100644 index 0000000..15d1c4d --- /dev/null +++ b/projects/jobqd/src/python/jobq/openapi.yaml @@ -0,0 +1,126 @@ +--- +openapi: 3.0.1 +info: + title: Jobq + description: A trivial job queue API + +tags: + - name: queue + - name: job + 
+definitions: + parameters: + q_id: + in: path + name: q_id + required: true + description: The ID of a given queue + j_id: + in: path + name: j_id + required: true + description: The ID of a given job + + responses: + id: {} + queue: {} + queues: {} + job: {} + + jobs: + "200": + description: A list of jobs + content: + application/json: + schema: + $ref: "#/definitions/types/jobs" + + types: + queue: {} + + queues: + type: object + properties: + jobs: + type: list + items: + $ref: "#/definitions/types/queue" + + job: {} + + dehydrated_job: {} + + jobs: + type: object + properties: + jobs: + type: list + items: + $ref: "#/definitions/types/dehydrated_job" + +paths: + "/api/v0/queue": + get: + description: Fetch a list of queues in the system + responses: + + post: + description: Create a new job queue + responses: + "200": + description: Created + "409": + description: Conflict + + "/api/v0/queue/{q_id}": + get: + description: "" + parameters: + - $ref: "#/definitions/parameters/q_id" + + responses: + $ref: "#/definitions/responses/jobs" + + "/api/v0/queue/{q_id}/job": + post: + description: "Create a job within a given queue." + parameters: + - $ref: "#/definitions/parameters/q_id" + + "/api/v0/queue/{q_id}/query_jobs": + post: + description: "Query the jobs in a queue." + parameters: + - $ref: "#/definitions/parameters/q_id" + + responses: + "200": {} + + "/api/v0/queue/{q_id}/poll_job": + post: + description: "Poll zero or one jobs off the queue." 
+      parameters:
+        - $ref: "#/definitions/parameters/q_id"
+
+  "/api/v0/job/{j_id}":
+    get:
+      description: "Return all available data about the job"
+      parameters:
+        - $ref: "#/definitions/parameters/j_id"
+
+    delete:
+      description: "Expunge the job"
+      parameters:
+        - $ref: "#/definitions/parameters/j_id"
+
+  "/api/v0/job/{j_id}/state":
+    post:
+      description: "Alter the job's state, appending an event"
+      parameters:
+        - $ref: "#/definitions/parameters/j_id"
+
+  "/api/v0/job/{j_id}/event":
+    post:
+      description: "Append an event to a given job without modifying state"
+      parameters:
+        - $ref: "#/definitions/parameters/j_id"
diff --git a/projects/jobq/src/python/jobq/rest/api.py b/projects/jobqd/src/python/jobq/rest/api.py
similarity index 100%
rename from projects/jobq/src/python/jobq/rest/api.py
rename to projects/jobqd/src/python/jobq/rest/api.py