Overhaul client and server

This commit is contained in:
Reid 'arrdem' McKenzie 2021-08-20 01:37:20 -06:00
parent a57ebeb524
commit edf5e4d231
5 changed files with 161 additions and 172 deletions

View file

@ -13,7 +13,7 @@ zapp_binary(
py_library( py_library(
name = "client", name = "client",
srcs = [ srcs = [
"src/python/jobq/rest/api.py", "src/python/jobqd/rest/api.py",
], ],
imports = [ imports = [
"src/python", "src/python",

View file

@ -1,89 +0,0 @@
"""A quick and dirty Python driver for the jobq API."""
import typing as t
import requests
class DehydratedJob(t.NamedTuple):
"""The 'important' bits of a given job."""
id: int
url: str
state: object
class HydratedJob(t.NamedTuple):
"""The full state of a given job."""
id: int
url: str
state: object
payload: object
events: object
Job = t.Union[DehydratedJob, HydratedJob]
class JobqClient(object):
def __init__(self, url, session=None):
self._url = url
self._session = session or requests.Session()
def _job(self, json):
return DehydratedJob(id=json["id"],
url=f"{self._url}/api/v0/job/{json.get('id')}",
state=json["state"])
def jobs(self, query=None, limit=10) -> t.Iterable[DehydratedJob]:
"""Enumerate jobs on the queue."""
for job_frag in self._session.post(self._url + "/api/v0/job",
json={"query": query or [],
"limit": limit})\
.json()\
.get("jobs"):
yield self._job(job_frag)
def poll(self, query, state) -> DehydratedJob:
"""Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job."""
return self._job(self._session.post(self._url + "/api/v0/job/poll",
json={"query": query,
"state": state}).json())
def create(self, payload: object, state=None) -> DehydratedJob:
"""Create a new job in the system."""
job_frag = self._session.post(self._url + "/api/v0/job/create",
json={"payload": payload,
"state": state})\
.json()
return self._job(job_frag)
def fetch(self, job: Job) -> HydratedJob:
"""Fetch the current state of a job."""
return HydratedJob(url=job.url, **self._session.get(job.url).json())
def advance(self, job: Job, state: object) -> Job:
"""Attempt to advance a job to a subsequent state."""
return HydratedJob(url=job.url,
**self._session.post(job.url + "/state",
json={"old": job.state,
"new": state}).json())
def event(self, job: Job, event: object) -> HydratedJob:
"""Attempt to record an event against a job."""
return HydratedJob(url=job.url,
**self._session.post(job.url + "/event",
json=event).json())
def delete(self, job: Job) -> None:
"""Delete a remote job."""
return self._session.delete(job.url)\
.raise_for_status()

View file

@ -1,5 +1,5 @@
""" """
A mock job queue. A job queue over HTTP.
""" """
import argparse import argparse
@ -10,7 +10,7 @@ import os
import sys import sys
import sqlite3 import sqlite3
from jobq import JobQueue from jobq import Job, JobQueue
from flask import abort, current_app, Flask, jsonify, request from flask import abort, current_app, Flask, jsonify, request
@ -26,9 +26,24 @@ parser.add_argument("--host", default="localhost")
parser.add_argument("--db", default="~/jobq.sqlite3") parser.add_argument("--db", default="~/jobq.sqlite3")
@app.before_first_request @app.before_request
def setup_queries(): def setup_q():
current_app.q = JobQueue(current_app.config["db"]) request.q = JobQueue(current_app.config["db"])
@app.after_request
def teardown_q():
request.q.close()
def job_as_json(job: Job) -> dict:
return {
"id": job.id,
"payload": job.payload,
"events": job.events,
"state": job.state,
"modified": int(job.modified),
}
@app.route("/api/v0/job", methods=["GET", "POST"]) @app.route("/api/v0/job", methods=["GET", "POST"])
@ -40,16 +55,10 @@ def get_jobs():
else: else:
blob = {} blob = {}
query = blob.get("query", [["true"]]) query = blob.get("query", "true")
return jsonify({ return jsonify({
"jobs": [ "jobs": [job_as_json(j) for j in request.q.query(query)]
{
"id": id,
"state": json.loads(state) if state is not None else state
}
for id, state in current_app.q.query(query)
]
}), 200 }), 200
@ -60,10 +69,10 @@ def create_job():
blob = request.get_json(force=True) blob = request.get_json(force=True)
payload = blob["payload"] payload = blob["payload"]
state = blob.get("state", None) state = blob.get("state", None)
id, state = current_app.q.create( job = request.q.create(
payload, state payload, state
) )
return jsonify({"id": id, "state": state}), 200 return jsonify(job_as_json(job)), 200
@app.route("/api/v0/job/poll", methods=["POST"]) @app.route("/api/v0/job/poll", methods=["POST"])
@ -73,10 +82,9 @@ def poll_job():
blob = request.get_json(force=True) blob = request.get_json(force=True)
query = blob["query"] query = blob["query"]
state = blob["state"] state = blob["state"]
results = current_app.q.poll(query, state) r = request.q.poll(query, state)
if results: if r:
(id, state), = results return jsonify(job_as_json(r)), 200
return jsonify({"id": id, "state": json.loads(state)}), 200
else: else:
abort(404) abort(404)
@ -85,20 +93,12 @@ def poll_job():
def get_job(job_id): def get_job(job_id):
"""Return a job by ID.""" """Return a job by ID."""
r = current_app.q.get(id=job_id) r = request.q.get(id=job_id)
if not r: if r:
return jsonify(job_as_json(r)), 200
else:
abort(404) abort(404)
# Unpack the response tuple
id, payload, events, state, modified = r
return jsonify({
"id": id,
"payload": json.loads(payload),
"events": json.loads(events),
"state": json.loads(state) if state is not None else state,
"modified": modified,
}), 200
@app.route("/api/v0/job/<job_id>/state", methods=["POST"]) @app.route("/api/v0/job/<job_id>/state", methods=["POST"])
def update_state(job_id): def update_state(job_id):
@ -107,8 +107,9 @@ def update_state(job_id):
document = request.get_json(force=True) document = request.get_json(force=True)
old = document["old"] old = document["old"]
new = document["new"] new = document["new"]
if current_app.q.cas_state(job_id, old, new): r = request.q.cas_state(job_id, old, new)
return get_job(job_id) if r:
return jsonify(job_as_json(r)), 200
else: else:
abort(409) abort(409)
@ -117,17 +118,22 @@ def update_state(job_id):
def append_event(job_id): def append_event(job_id):
"""Append a user-defined event to the job's log.""" """Append a user-defined event to the job's log."""
return current_app.q.append_event(job_id, event=request.get_json(force=True)) r = request.q.append_event(job_id, event=request.get_json(force=True))
if r:
return jsonify(job_as_json(r)), 200
else:
abort(404)
@app.route("/api/v0/job/<job_id>", methods=["DELETE"]) @app.route("/api/v0/job/<job_id>", methods=["DELETE"])
def delete_job(job_id): def delete_job(job_id):
"""Delete a given job.""" """Delete a given job."""
current_app.queries.job_delete(request.db, id=job_id) request.q.job_delete(request.db, id=job_id)
return jsonify({}), 200 return jsonify({}), 200
def main(): def main():
"""Run the mock server.""" """Run the mock server."""

View file

@ -15,16 +15,18 @@ definitions:
name: q_id name: q_id
required: true required: true
description: The ID of a given queue description: The ID of a given queue
schema:
$ref: "#/definitions/types/id"
j_id: j_id:
in: path in: path
name: j_id name: j_id
required: true required: true
description: The ID of a given job description: The ID of a given job
schema:
$ref: "#/definitions/types/id"
responses: responses:
id: {}
queue: {}
queues: {}
job: {} job: {}
jobs: jobs:
@ -36,58 +38,27 @@ definitions:
$ref: "#/definitions/types/jobs" $ref: "#/definitions/types/jobs"
types: types:
queue: {} id:
type: int
queues: job:
type: object type: object
properties: properties:
jobs: id:
type: list $ref: "#/definitions/types/id"
items: payload: {}
$ref: "#/definitions/types/queue" events: {}
state: {}
job: {} modified:
type: int
dehydrated_job: {}
jobs:
type: object
properties:
jobs:
type: list
items:
$ref: "#/definitions/types/dehydrated_job"
paths: paths:
"/api/v0/queue": "/api/v0/job":
get: get:
description: Fetch a list of queues in the system description: "Query the jobs in a queue."
responses:
post:
description: Create a new job queue
responses:
"200":
description: Created
"409":
description: Conflict
"/api/v0/queue/{q_id}":
get:
description: ""
parameters: parameters:
- $ref: "#/definitions/parameters/q_id" - $ref: "#/definitions/parameters/q_id"
responses:
$ref: "#/definitions/responses/jobs"
"/api/v0/queue/{q_id}/job":
post:
description: "Create a job within a given queue."
parameters:
- $ref: "#/definitions/parameters/q_id"
"/api/v0/queue/{q_id}/query_jobs":
post: post:
description: "Query the jobs in a queue." description: "Query the jobs in a queue."
parameters: parameters:
@ -96,7 +67,14 @@ paths:
responses: responses:
"200": {} "200": {}
"/api/v0/queue/{q_id}/poll_job": "/api/v0/job/create":
post:
description: "Create a job within a given queue."
parameters:
- $ref: "#/definitions/parameters/q_id"
"/api/v0/job/poll":
post: post:
description: "Poll zero or one jobs off the queue." description: "Poll zero or one jobs off the queue."
parameters: parameters:

View file

@ -0,0 +1,94 @@
"""A quick and dirty Python driver for the jobqd API."""
from datetime import datetime
from typing import NamedTuple
import requests
class Job(NamedTuple):
id: int
payload: object
events: object
state: object
modified: datetime
@classmethod
def from_json(cls, obj):
return cls(
id = int(obj["id"]),
payload = obj["payload"],
events = obj["events"],
state = obj["state"],
modified = datetime.fromtimestamp(obj["modified"])
)
class JobqClient(object):
def __init__(self, url, session=None):
self._url = url
self._session = session or requests.Session()
def jobs(self, query=None, limit=10) -> t.Iterable[Job]:
"""Enumerate jobs on the queue."""
for job in self._session.post(self._url + "/api/v0/job",
json={"query": query or [],
"limit": limit})\
.json()\
.get("jobs"):
yield Job.from_json(job)
def poll(self, query, state) -> DehydratedJob:
"""Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job."""
return Job.from_json(
self._session
.post(self._url + "/api/v0/job/poll",
json={"query": query,
"state": state})
.json())
def create(self, payload: object, state=None) -> DehydratedJob:
"""Create a new job in the system."""
return Job.from_json(
self._session
.post(self._url + "/api/v0/job/create",
json={"payload": payload,
"state": state})
.json())
def fetch(self, job: Job) -> HydratedJob:
"""Fetch the current state of a job."""
return Job.from_json(
self._session
.get(self._url + "/api/v0/job/" + job.id)
.json())
def advance(self, job: Job, state: object) -> Job:
"""Attempt to advance a job to a subsequent state."""
return Job.from_json(
self._session
.post(job.url + "/state",
json={"old": job.state,
"new": state})
.json())
def event(self, job: Job, event: object) -> HydratedJob:
"""Attempt to record an event against a job."""
return Job.from_json(
self._session
.post(self._url + f"/api/v0/job/{job.id}/event",
json=event)
.json())
def delete(self, job: Job) -> None:
"""Delete a remote job."""
return (self._session
.delete(self._url + f"/api/v0/job/{job.id}")
.raise_for_status())