From edf5e4d2317bf5079d76be5c1742b50b9c9f4118 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Fri, 20 Aug 2021 01:37:20 -0600 Subject: [PATCH] Overhaul client and server --- projects/jobqd/BUILD | 2 +- projects/jobqd/src/python/jobq/rest/api.py | 89 ------------------ .../jobqd/src/{ => python/jobqd}/__main__.py | 76 ++++++++------- .../src/python/{jobq => jobqd}/openapi.yaml | 72 +++++--------- projects/jobqd/src/python/jobqd/rest/api.py | 94 +++++++++++++++++++ 5 files changed, 161 insertions(+), 172 deletions(-) delete mode 100644 projects/jobqd/src/python/jobq/rest/api.py rename projects/jobqd/src/{ => python/jobqd}/__main__.py (64%) rename projects/jobqd/src/python/{jobq => jobqd}/openapi.yaml (67%) create mode 100644 projects/jobqd/src/python/jobqd/rest/api.py diff --git a/projects/jobqd/BUILD b/projects/jobqd/BUILD index a95a684..71d56e8 100644 --- a/projects/jobqd/BUILD +++ b/projects/jobqd/BUILD @@ -13,7 +13,7 @@ zapp_binary( py_library( name = "client", srcs = [ - "src/python/jobq/rest/api.py", + "src/python/jobqd/rest/api.py", ], imports = [ "src/python", diff --git a/projects/jobqd/src/python/jobq/rest/api.py b/projects/jobqd/src/python/jobq/rest/api.py deleted file mode 100644 index 45fb038..0000000 --- a/projects/jobqd/src/python/jobq/rest/api.py +++ /dev/null @@ -1,89 +0,0 @@ -"""A quick and dirty Python driver for the jobq API.""" - -import typing as t - -import requests - - -class DehydratedJob(t.NamedTuple): - """The 'important' bits of a given job.""" - - id: int - url: str - state: object - - -class HydratedJob(t.NamedTuple): - """The full state of a given job.""" - - id: int - url: str - state: object - payload: object - events: object - - -Job = t.Union[DehydratedJob, HydratedJob] - - -class JobqClient(object): - def __init__(self, url, session=None): - self._url = url - self._session = session or requests.Session() - - def _job(self, json): - return DehydratedJob(id=json["id"], - url=f"{self._url}/api/v0/job/{json.get('id')}", - state=json["state"]) - - def jobs(self, query=None, limit=10) -> t.Iterable[DehydratedJob]: - """Enumerate jobs on the queue.""" - - for job_frag in self._session.post(self._url + "/api/v0/job", - json={"query": query or [], - "limit": limit})\ - .json()\ - .get("jobs"): - yield self._job(job_frag) - - def poll(self, query, state) -> DehydratedJob: - """Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job.""" - - return self._job(self._session.post(self._url + "/api/v0/job/poll", - json={"query": query, - "state": state}).json()) - - def create(self, payload: object, state=None) -> DehydratedJob: - """Create a new job in the system.""" - - job_frag = self._session.post(self._url + "/api/v0/job/create", - json={"payload": payload, - "state": state})\ - .json() - return self._job(job_frag) - - def fetch(self, job: Job) -> HydratedJob: - """Fetch the current state of a job.""" - - return HydratedJob(url=job.url, **self._session.get(job.url).json()) - - def advance(self, job: Job, state: object) -> Job: - """Attempt to advance a job to a subsequent state.""" - - return HydratedJob(url=job.url, - **self._session.post(job.url + "/state", - json={"old": job.state, - "new": state}).json()) - - def event(self, job: Job, event: object) -> HydratedJob: - """Attempt to record an event against a job.""" - - return HydratedJob(url=job.url, - **self._session.post(job.url + "/event", - json=event).json()) - - def delete(self, job: Job) -> None: - """Delete a remote job.""" - - return self._session.delete(job.url)\ - .raise_for_status() diff --git a/projects/jobqd/src/__main__.py b/projects/jobqd/src/python/jobqd/__main__.py similarity index 64% rename from projects/jobqd/src/__main__.py rename to projects/jobqd/src/python/jobqd/__main__.py index 7241414..57e2d15 100644 --- a/projects/jobqd/src/__main__.py +++ b/projects/jobqd/src/python/jobqd/__main__.py @@ -1,5 +1,5 @@ """ -A mock job queue. +A job queue over HTTP. """ import argparse @@ -10,7 +10,7 @@ import os import sys import sqlite3 -from jobq import JobQueue +from jobq import Job, JobQueue from flask import abort, current_app, Flask, jsonify, request @@ -26,9 +26,24 @@ parser.add_argument("--host", default="localhost") parser.add_argument("--db", default="~/jobq.sqlite3") -@app.before_first_request -def setup_queries(): - current_app.q = JobQueue(current_app.config["db"]) +@app.before_request +def setup_q(): + request.q = JobQueue(current_app.config["db"]) + + +@app.after_request +def teardown_q(): + request.q.close() + + +def job_as_json(job: Job) -> dict: + return { + "id": job.id, + "payload": job.payload, + "events": job.events, + "state": job.state, + "modified": int(job.modified), + } @app.route("/api/v0/job", methods=["GET", "POST"]) @@ -40,16 +55,10 @@ def get_jobs(): else: blob = {} - query = blob.get("query", [["true"]]) + query = blob.get("query", "true") return jsonify({ - "jobs": [ - { - "id": id, - "state": json.loads(state) if state is not None else state - } - for id, state in current_app.q.query(query) - ] + "jobs": [job_as_json(j) for j in request.q.query(query)] }), 200 @@ -60,10 +69,10 @@ def create_job(): blob = request.get_json(force=True) payload = blob["payload"] state = blob.get("state", None) - id, state = current_app.q.create( + job = request.q.create( payload, state ) - return jsonify({"id": id, "state": state}), 200 + return jsonify(job_as_json(job)), 200 @app.route("/api/v0/job/poll", methods=["POST"]) @@ -73,10 +82,9 @@ def poll_job(): blob = request.get_json(force=True) query = blob["query"] state = blob["state"] - results = current_app.q.poll(query, state) - if results: - (id, state), = results - return jsonify({"id": id, "state": json.loads(state)}), 200 + r = request.q.poll(query, state) + if r: + return jsonify(job_as_json(r)), 200 else: abort(404) @@ -85,20 +93,12 @@ def poll_job(): def get_job(job_id): """Return a job by ID.""" - r = current_app.q.get(id=job_id) - if not r: + r = request.q.get(id=job_id) + if r: + return jsonify(job_as_json(r)), 200 + else: abort(404) - # Unpack the response tuple - id, payload, events, state, modified = r - return jsonify({ - "id": id, - "payload": json.loads(payload), - "events": json.loads(events), - "state": json.loads(state) if state is not None else state, - "modified": modified, - }), 200 - @app.route("/api/v0/job//state", methods=["POST"]) def update_state(job_id): @@ -107,8 +107,9 @@ def update_state(job_id): document = request.get_json(force=True) old = document["old"] new = document["new"] - if current_app.q.cas_state(job_id, old, new): - return get_job(job_id) + r = request.q.cas_state(job_id, old, new) + if r: + return jsonify(job_as_json(r)), 200 else: abort(409) @@ -117,17 +118,22 @@ def update_state(job_id): def append_event(job_id): """Append a user-defined event to the job's log.""" - return current_app.q.append_event(job_id, event=request.get_json(force=True)) + r = request.q.append_event(job_id, event=request.get_json(force=True)) + if r: + return jsonify(job_as_json(r)), 200 + else: + abort(404) @app.route("/api/v0/job/", methods=["DELETE"]) def delete_job(job_id): """Delete a given job.""" - current_app.queries.job_delete(request.db, id=job_id) + request.q.job_delete(request.db, id=job_id) return jsonify({}), 200 + def main(): """Run the mock server.""" diff --git a/projects/jobqd/src/python/jobq/openapi.yaml b/projects/jobqd/src/python/jobqd/openapi.yaml similarity index 67% rename from projects/jobqd/src/python/jobq/openapi.yaml rename to projects/jobqd/src/python/jobqd/openapi.yaml index 15d1c4d..3000a6d 100644 --- a/projects/jobqd/src/python/jobq/openapi.yaml +++ b/projects/jobqd/src/python/jobqd/openapi.yaml @@ -15,16 +15,18 @@ definitions: name: q_id required: true description: The ID of a given queue + schema: + $ref: "#/definitions/types/id" + j_id: in: path name: j_id required: true description: The ID of a given job + schema: + $ref: "#/definitions/types/id" responses: - id: {} - queue: {} - queues: {} job: {} jobs: @@ -36,58 +38,27 @@ definitions: $ref: "#/definitions/types/jobs" types: - queue: {} + id: + type: int - queues: + job: type: object properties: - jobs: - type: list - items: - $ref: "#/definitions/types/queue" - - job: {} - - dehydrated_job: {} - - jobs: - type: object - properties: - jobs: - type: list - items: - $ref: "#/definitions/types/dehydrated_job" + id: + $ref: "#/definitions/types/id" + payload: {} + events: {} + state: {} + modified: + type: int paths: - "/api/v0/queue": + "/api/v0/job": get: - description: Fetch a list of queues in the system - responses: - - post: - description: Create a new job queue - responses: - "200": - description: Created - "409": - description: Conflict - - "/api/v0/queue/{q_id}": - get: - description: "" + description: "Query the jobs in a queue." parameters: - $ref: "#/definitions/parameters/q_id" - responses: - $ref: "#/definitions/responses/jobs" - - "/api/v0/queue/{q_id}/job": - post: - description: "Create a job within a given queue." - parameters: - - $ref: "#/definitions/parameters/q_id" - - "/api/v0/queue/{q_id}/query_jobs": post: description: "Query the jobs in a queue." parameters: @@ -96,7 +67,14 @@ paths: responses: "200": {} - "/api/v0/queue/{q_id}/poll_job": + "/api/v0/job/create": + post: + description: "Create a job within a given queue." + parameters: + - $ref: "#/definitions/parameters/q_id" + + + "/api/v0/job/poll": post: description: "Poll zero or one jobs off the queue." parameters: diff --git a/projects/jobqd/src/python/jobqd/rest/api.py b/projects/jobqd/src/python/jobqd/rest/api.py new file mode 100644 index 0000000..e7f70f5 --- /dev/null +++ b/projects/jobqd/src/python/jobqd/rest/api.py @@ -0,0 +1,94 @@ +"""A quick and dirty Python driver for the jobqd API.""" + +from datetime import datetime +from typing import NamedTuple + +import requests + + +class Job(NamedTuple): + id: int + payload: object + events: object + state: object + modified: datetime + + @classmethod + def from_json(cls, obj): + return cls( + id = int(obj["id"]), + payload = obj["payload"], + events = obj["events"], + state = obj["state"], + modified = datetime.fromtimestamp(obj["modified"]) + ) + + +class JobqClient(object): + def __init__(self, url, session=None): + self._url = url + self._session = session or requests.Session() + + def jobs(self, query=None, limit=10) -> t.Iterable[Job]: + """Enumerate jobs on the queue.""" + + for job in self._session.post(self._url + "/api/v0/job", + json={"query": query or [], + "limit": limit})\ + .json()\ + .get("jobs"): + yield Job.from_json(job) + + def poll(self, query, state) -> DehydratedJob: + """Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job.""" + + return Job.from_json( + self._session + .post(self._url + "/api/v0/job/poll", + json={"query": query, + "state": state}) + .json()) + + def create(self, payload: object, state=None) -> DehydratedJob: + """Create a new job in the system.""" + + return Job.from_json( + self._session + .post(self._url + "/api/v0/job/create", + json={"payload": payload, + "state": state}) + .json()) + + def fetch(self, job: Job) -> HydratedJob: + """Fetch the current state of a job.""" + + return Job.from_json( + self._session + .get(self._url + "/api/v0/job/" + job.id) + .json()) + + def advance(self, job: Job, state: object) -> Job: + """Attempt to advance a job to a subsequent state.""" + + return Job.from_json( + self._session + .post(job.url + "/state", + json={"old": job.state, + "new": state}) + .json()) + + def event(self, job: Job, event: object) -> HydratedJob: + """Attempt to record an event against a job.""" + + return Job.from_json( + self._session + .post(self._url + f"/api/v0/job/{job.id}/event", + json=event) + .json()) + + def delete(self, job: Job) -> None: + """Delete a remote job.""" + + return (self._session + .delete(self._url + f"/api/v0/job/{job.id}") + .raise_for_status())