From d27e7e6df6e56797d2b74c11ea42d8c969945dc8 Mon Sep 17 00:00:00 2001
From: Reid 'arrdem' McKenzie <me@arrdem.com>
Date: Sat, 14 Aug 2021 00:31:07 -0600
Subject: [PATCH] An initial crack at a jobq

---
 .../src/python/anosql/adapters/sqlite3.py     |  27 +-
 projects/jobq/BUILD                           |  26 ++
 projects/jobq/README.md                       | 150 ++++++++
 projects/jobq/src/python/jobq/__main__.py     | 343 ++++++++++++++++++
 projects/jobq/src/python/jobq/rest/api.py     |  88 +++++
 5 files changed, 628 insertions(+), 6 deletions(-)
 create mode 100644 projects/jobq/BUILD
 create mode 100644 projects/jobq/README.md
 create mode 100644 projects/jobq/src/python/jobq/__main__.py
 create mode 100644 projects/jobq/src/python/jobq/rest/api.py

diff --git a/projects/anosql/src/python/anosql/adapters/sqlite3.py b/projects/anosql/src/python/anosql/adapters/sqlite3.py
index db2dbd2..cd926a0 100644
--- a/projects/anosql/src/python/anosql/adapters/sqlite3.py
+++ b/projects/anosql/src/python/anosql/adapters/sqlite3.py
@@ -1,4 +1,12 @@
+"""
+A driver object implementing support for SQLite3
+"""
+
 from contextlib import contextmanager
+import logging
+import sqlite3
+
+log = logging.getLogger(__name__)
 
 
 class SQLite3DriverAdapter(object):
@@ -21,6 +29,7 @@ class SQLite3DriverAdapter(object):
     @staticmethod
     def select(conn, _query_name, sql, parameters):
         cur = conn.cursor()
+        log.debug({'sql': sql, 'parameters': parameters})
         cur.execute(sql, parameters)
         results = cur.fetchall()
         cur.close()
@@ -28,8 +37,9 @@ class SQLite3DriverAdapter(object):
 
     @staticmethod
     @contextmanager
-    def select_cursor(conn, _query_name, sql, parameters):
+    def select_cursor(conn: sqlite3.Connection, _query_name, sql, parameters):
         cur = conn.cursor()
+        log.debug({'sql': sql, 'parameters': parameters})
         cur.execute(sql, parameters)
         try:
             yield cur
@@ -37,21 +47,26 @@ class SQLite3DriverAdapter(object):
             cur.close()
 
     @staticmethod
-    def insert_update_delete(conn, _query_name, sql, parameters):
+    def insert_update_delete(conn: sqlite3.Connection, _query_name, sql, parameters):
+        log.debug({'sql': sql, 'parameters': parameters})
         conn.execute(sql, parameters)
 
     @staticmethod
-    def insert_update_delete_many(conn, _query_name, sql, parameters):
+    def insert_update_delete_many(conn: sqlite3.Connection, _query_name, sql, parameters):
+        log.debug({'sql': sql, 'parameters': parameters})
         conn.executemany(sql, parameters)
 
     @staticmethod
-    def insert_returning(conn, _query_name, sql, parameters):
+    def insert_returning(conn: sqlite3.Connection, _query_name, sql, parameters):
         cur = conn.cursor()
+        log.debug({'sql': sql, 'parameters': parameters})
         cur.execute(sql, parameters)
-        results = cur.lastrowid
+        results = cur.fetchall()
+        log.debug({"results": results})
         cur.close()
         return results
 
     @staticmethod
-    def execute_script(conn, sql):
+    def execute_script(conn: sqlite3.Connection, sql):
+        log.debug({'sql': sql, 'parameters': None})
         conn.executescript(sql)
diff --git a/projects/jobq/BUILD b/projects/jobq/BUILD
new file mode 100644
index 0000000..70cc7dc
--- /dev/null
+++ b/projects/jobq/BUILD
@@ -0,0 +1,26 @@
+zapp_binary(
+    name = "jobq",
+    main = "src/python/jobq/__main__.py",
+    imports = [
+        "src/python",
+    ],
+    deps = [
+        "//projects/anosql",
+        "//projects/anosql-migrations",
+        py_requirement("flask"),
+        py_requirement("pyyaml"),
+    ]
+)
+
+py_library(
+    name = "client",
+    srcs = [
+        "src/python/jobq/rest/api.py",
+    ],
+    imports = [
+        "src/python",
+    ],
+    deps = [
+        py_requirement("requests"),
+    ],
+)
diff --git a/projects/jobq/README.md b/projects/jobq/README.md
new file mode 100644
index 0000000..163fecf
--- /dev/null
+++ b/projects/jobq/README.md
@@ -0,0 +1,150 @@
+# Jobq
+
+Jobq is an event-oriented framework for recording _jobs_.
+Each _job_ (a JSON request blob POSTed by the user) has an attached log of _events_, and a _state_.
+Users may change the _state_ of a job, and doing so automatically produces a new _event_ recording the change.
+Users may manually add _events_ to the log.
+
+Note that, while we strongly suggest that _state_ should always be a tagged tuple and the default is `["CREATED"]`, no constraints are placed on its contents.
+
+## HTTP API
+
+### GET /api/v0/job
+Return an enumeration of jobs active in the system.
+
+**Response** -
+```shell
+$ curl -X GET $JOBQ/api/v0/job | jq .
+{"jobs": [
+  {"id": 1, "state": ["CREATED"]},
+  {"id": 2, "state": ["ASSIGNED"]},
+  {"id": 3, "state": ["SLEEPING"]},
+  {"id": 3, "state": ["FINISHED"]}
+]}
+```
+
+### POST /api/v0/job
+Perform a point-in-time query for jobs.
+
+The query is a list of `[OP, EXPR, EXPR]` triples, which are combined under `AND` to produce a server-side query.
+Valid ops are `IS`, `LIKE` and binary comparisons (`<`, `=` and friends).
+Valid exprs are any SQLite expression not containing sub-queries.
+
+Here, we're looking for jobs tagged as in the `["CREATED"]` state.
+``` shell
+$ curl -X POST $JOBQ/api/v0/job --data '{"query": [["IS", "json_extract(state, '\''$[0]'\'')", "CREATED"]]}' | jq .
+{"jobs": [
+  {"id": 1, "state": ["CREATED"]},
+]}
+```
+
+### POST /api/v0/job/create
+Given a JSON document as the POST body, create a new job.
+
+```
+$ curl -X POST $JOBQ/api/v0/job/create --data '{"msg": "Hello, world!"}' | jq .
+{
+  "id": 1
+}
+```
+
+### POST /api/v0/job/poll
+Poll for at most one job matching the given query, atomically advancing it to the given state.
+
+Uses the same query format as the /job endpoint.
+
+Here, we're polling for jobs which are in the null (initial) state, and assigning the first such job to this host.
+Note that this assignment strategy is likely unsound as it lacks a time-to-live or other validity criteria.
+
+``` shell
+$ curl -X POST $JOBQ/api/v0/job/poll --data '{"query": [["IS", "j.state", null]], "state":["ASSIGNED", {"node": 1}]}' | jq .
+{
+  "id": 3,
+  "state": [
+    "ASSIGNED",
+    {
+      "node": 1
+    }
+  ]
+}
+```
+
+### GET /api/v0/job/<job_id>
+Return all available data about a given job, including the payload, event log and current state.
+
+```shell
+$ curl -X GET $JOBQ/api/v0/job/1 | jq .
+{
+  "id": 1,
+  "payload": {
+    "msg": "Hello, world"
+  },
+  "state": [
+    "CREATED"
+  ],
+  "events": [
+    [
+      "job_created",
+      {
+        "timestamp": "1628909303"
+      }
+    ]
+  ]
+}
+```
+
+### POST /api/v0/job/<job_id>/state
+Atomically compare-and-swap the state of a given job from `"old"` to `"new"`, appending a state change event to the log and returning the entire updated job (or a 409 if the current state does not match `"old"`).
+
+``` shell
+$ curl -X POST $JOBQ/api/v0/job/1/state --data '{"old": ["CREATED"], "new": ["ASSIGNED"]}' | jq .
+{
+  "id": 1,
+  "payload": {
+    "msg": "Hello, world"
+  },
+  "state": [
+    "ASSIGNED"
+  ],
+  "events": [
+    [
+      "job_created",
+      {
+        "timestamp": "1628911153"
+      }
+    ],
+    [
+      "job_state_advanced",
+      {
+        "new": [
+          "ASSIGNED"
+        ],
+        "old": [
+          "CREATED"
+        ],
+        "timestamp": "1628911184"
+      }
+    ]
+  ]
+}
+```
+
+### POST /api/v0/job/<job_id>/event
+Append an arbitrary event to the log.
+User-defined events will be coded in a `"user_event"` tag, and have `"timestamp"` metadata inserted.
+
+``` shell
+$ curl -X POST $JOBQ/api/v0/job/1/event --data '["my cool event"]' | jq .events[-1]
+[
+  "user_event",
+  {
+    "event": [
+      "my cool event"
+    ],
+    "timestamp": "1628911503"
+  }
+]
+```
+
+### DELETE /api/v0/job/<job_id>
+Expunge a given job from the system by ID.
diff --git a/projects/jobq/src/python/jobq/__main__.py b/projects/jobq/src/python/jobq/__main__.py
new file mode 100644
index 0000000..1cfe090
--- /dev/null
+++ b/projects/jobq/src/python/jobq/__main__.py
@@ -0,0 +1,343 @@
+"""
+A mock job queue.
+"""
+
+import argparse
+from functools import wraps
+import json
+import logging
+import os
+import sys
+import sqlite3
+
+import anosql
+from anosql_migrations import run_migrations, with_migrations
+from flask import abort, current_app, Flask, jsonify, request
+
+
+_SQL = """\
+-- name: migration-0000-create-jobq
+CREATE TABLE `job` (
+   `id` INTEGER PRIMARY KEY AUTOINCREMENT  -- primary key
+,  `payload` TEXT                          -- JSON payload
+,  `events` TEXT DEFAULT '[]'              -- append log of JSON events
+,  `state` TEXT                            -- JSON state of the job
+);
+-- name: job-create<!
+INSERT INTO `job` (
+    `payload`
+,   `state`
+,   `events`
+) VALUES (
+    :payload
+,   :state
+,   json_array(json_array('job_created',
+                          json_object('timestamp', strftime('%s', 'now'))))
+)
+RETURNING
+    `id`
+,   `state`
+;
+-- name: job-get?
+SELECT
+    `id`
+,   `payload`
+,   `events`
+,   `state`
+FROM `job`
+WHERE
+    `id` = :id
+;
+-- name: job-delete!
+DELETE FROM `job`
+WHERE
+    `id` = :id
+;
+-- name: job-list
+SELECT
+    `id`
+,   `state`
+FROM `job`
+ORDER BY
+    `id` ASC
+;
+-- name: job-filter-state
+SELECT
+    `id`
+,   `state`
+FROM `job`
+WHERE `state` = :state
+;
+-- name: job-append-event!
+UPDATE
+    `job`
+SET
+    `events` = json_insert(events, '$[#]',
+                           json_array('user_event',
+                                      json_object('event', json(:event),
+                                                  'timestamp', strftime('%s', 'now'))))
+WHERE
+    `id` = :id
+;
+-- name: job-advance-state!
+UPDATE
+    `job`
+SET
+    `events` = json_insert(events, '$[#]',
+                           json_array('job_state_advanced',
+                                      json_object('old', json(state),
+                                                  'new', json(:state),
+                                                  'timestamp', strftime('%s', 'now'))))
+,   `state` = json(:state)
+WHERE
+    `id` = :id
+;
+-- name: job-cas-state<!
+UPDATE
+   `job`
+SET
+    `events` = json_insert(events, '$[#]',
+                           json_array('job_state_advanced',
+                                      json_object('old', json(:old_state),
+                                                  'new', json(:new_state),
+                                                  'timestamp', strftime('%s', 'now'))))
+,   `state` = json(:new_state)
+WHERE
+    `id` = :id
+AND `state` = json(:old_state)
+RETURNING
+    `id`
+;
+"""
+
+log = logging.getLogger(__name__)
+logging.basicConfig(level=logging.DEBUG)
+
+app = Flask(__name__)
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--port", type=int, default=8080)
+parser.add_argument("--host", default="localhost")
+parser.add_argument("--db", default="~/jobq.sqlite3")
+
+
+@app.before_first_request
+def setup_queries():
+    # Load the queries
+    queries = anosql.from_str(_SQL, "sqlite3")
+
+    # Do migrations
+    with sqlite3.connect(current_app.config["db"]) as db:
+        queries = with_migrations("sqlite3", queries, db)
+        run_migrations(queries, db)
+
+    current_app.queries = queries
+
+
+@app.before_request
+def before_request():
+    request.db = sqlite3.connect(current_app.config["db"])
+    request.db.set_trace_callback(logging.getLogger("sqlite3").debug)
+
+
+@app.teardown_request
+def teardown_request(exception):
+    request.db.commit()
+    request.db.close()
+
+
+def compile_query(query):
+    """Compile a query to a SELECT over jobs.
+
+    The query is a sequence of ordered pairs [op, path, val].
+    Supported ops are:
+     - `<`, `>`, `<=`, `>=`, `=`
+     - `LIKE`
+
+    Query ops join under `AND`
+    """
+
+    def compile_term(term):
+        if term is None:
+            return "NULL"
+        else:
+            assert not any(keyword in term.lower() for keyword in ["select", "update", "delete", ";"])
+            return term
+
+    def compile_op(op):
+        op, qexpr, val = op
+        assert op in ["<", "<=", "=", "!=", ">=", ">", "LIKE", "IS"]
+        return f"{compile_term(qexpr)} {op} {compile_term(val)}"
+
+    ops = [compile_op(op) for op in query]
+    return " AND ".join(ops)
+
+
+@app.route("/api/v0/job", methods=["GET", "POST"])
+def get_jobs():
+    """Return jobs in the system."""
+
+    if request.method == "POST":
+        blob = request.get_json(force=True)
+    else:
+        blob = {}
+
+    if query := blob.get("query"):
+        query = compile_query(query)
+        print(query)
+        def qf():
+            cur = request.db.cursor()
+            cur.execute(f"SELECT `id`, `state` FROM `job` AS `j` WHERE {query};")
+            yield from cur.fetchall()
+            cur.close()
+        jobs = qf()
+
+    else:
+        jobs = current_app.queries.job_list(request.db)
+
+    if limit := blob.get("limit", 100):
+        limit = int(limit)
+        def lf(iterable):
+            iterable = iter(iterable)
+            for i in range(limit):
+                try:
+                    yield next(iterable)
+                except StopIteration:
+                    break
+        jobs = lf(jobs)
+
+    return jsonify({
+        "jobs": [
+            {
+                "id": id,
+                "state": json.loads(state) if state is not None else state
+            }
+            for id, state in jobs
+        ]
+    }), 200
+
+
+@app.route("/api/v0/job/create", methods=["POST"])
+def create_job():
+    """Create a job."""
+
+    id, state = current_app.queries.job_create(
+        request.db,
+        payload=json.dumps(request.get_json(force=True)),
+        state=None,
+    )
+    return jsonify({"id": id, "state": state}), 200
+
+
+@app.route("/api/v0/job/poll", methods=["POST"])
+def poll_job():
+    """Using a query, attempt to poll for the next job matching criteria."""
+
+    blob = request.get_json(force=True)
+    query = compile_query(blob["query"])
+    cur = request.db.cursor()
+    cur.execute(f"""\
+    UPDATE `job`
+    SET
+        `events` = json_insert(events, '$[#]',
+                               json_array('job_state_advanced',
+                                          json_object('old', json(state),
+                                                      'new', json(:state),
+                                                      'timestamp', strftime('%s', 'now'))))
+    ,   `state` = json(:state)
+    WHERE
+        `id` IN (
+            SELECT
+                `id`
+            FROM
+                `job` AS `j`
+            WHERE
+                {query}
+            LIMIT 1
+        )
+    RETURNING
+        `id`
+    ,   `state`
+    ;""", {"state": json.dumps(blob["state"])})
+    results = cur.fetchall()
+    cur.close()
+    if results:
+        (id, state), = results
+        return jsonify({"id": id, "state": json.loads(state)}), 200
+    else:
+        abort(404)
+
+
+@app.route("/api/v0/job/<job_id>", methods=["GET"])
+def get_job(job_id):
+    """Return a job by ID."""
+
+    r = current_app.queries.job_get(request.db, id=job_id)
+    if not r:
+        abort(404)
+
+    # Unpack the response tuple
+    id, payload, events, state = r
+    return jsonify({
+        "id": id,
+        "payload": json.loads(payload),
+        "events": json.loads(events),
+        "state": json.loads(state) if state is not None else state,
+    }), 200
+
+
+@app.route("/api/v0/job/<job_id>/state", methods=["POST"])
+def update_state(job_id):
+    """CAS update a job's state, returning the updated job or indicating a conflict."""
+
+    document = request.get_json(force=True)
+    old = document["old"]
+    new = document["new"]
+    if current_app.queries.job_cas_state(
+        request.db,
+        id=job_id,
+        old_state=json.dumps(old),
+        new_state=json.dumps(new),
+    ):
+        return get_job(job_id)
+    else:
+        abort(409)
+
+
+@app.route("/api/v0/job/<job_id>/event", methods=["POST"])
+def append_event(job_id):
+    """Append a user-defined event to the job's log."""
+
+    current_app.queries.job_append_event(
+        request.db,
+        id=job_id,
+        event=json.dumps(request.get_json(force=True))
+    )
+
+    return get_job(job_id)
+
+
+@app.route("/api/v0/job/<job_id>", methods=["DELETE"])
+def delete_job(job_id):
+    """Delete a given job."""
+
+    current_app.queries.job_delete(request.db, id=job_id)
+
+    return jsonify({}), 200
+
+def main():
+    """Run the mock server."""
+
+    opts, args = parser.parse_known_args()
+
+    app.config["db"] = os.path.expanduser(os.path.expandvars(opts.db))
+    app.config["host"] = opts.host
+    app.config["port"] = opts.port
+
+    app.run(
+        threaded=True,
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/projects/jobq/src/python/jobq/rest/api.py b/projects/jobq/src/python/jobq/rest/api.py
new file mode 100644
index 0000000..b6dc7e5
--- /dev/null
+++ b/projects/jobq/src/python/jobq/rest/api.py
@@ -0,0 +1,88 @@
+"""A quick and dirty Python driver for the jobq API."""
+
+import typing as t
+
+import requests
+
+
+class DehydratedJob(t.NamedTuple):
+    """The 'important' bits of a given job."""
+
+    id: int
+    url: str
+    state: object
+
+
+class HydratedJob(t.NamedTuple):
+    """The full state of a given job."""
+
+    id: int
+    url: str
+    state: object
+    payload: object
+    events: object
+
+
+Job = t.Union[DehydratedJob, HydratedJob]
+
+
+class JobqClient(object):
+    def __init__(self, url, session=None):
+        self._url = url
+        self._session = session or requests.Session()
+
+    def _job(self, json):
+        return DehydratedJob(id=json["id"],
+                             url=f"{self._url}/api/v0/job/{json.get('id')}",
+                             state=json["state"])
+
+    def jobs(self, query=None, limit=10) -> t.Iterable[DehydratedJob]:
+        """Enumerate jobs on the queue."""
+
+        for job_frag in self._session.post(self._url + "/api/v0/job",
+                                           json={"query": query or [],
+                                                 "limit": limit})\
+                                     .json()\
+                                     .get("jobs"):
+            yield self._job(job_frag)
+
+    def poll(self, query, state) -> DehydratedJob:
+        """Poll the job queue for the first job matching the given query, atomically advancing it to the given state and returning the advanced Job."""
+
+        return self._job(self._session.post(self._url + "/api/v0/job/poll",
+                                            json={"query": query,
+                                                  "state": state}).json())
+
+    def create(self, payload: object) -> DehydratedJob:
+        """Create a new job in the system."""
+
+        job_frag = self._session.post(self._url + "/api/v0/job/create",
+                                      json=payload)\
+                                .json()
+        return self._job(job_frag)
+
+    def fetch(self, job: Job) -> HydratedJob:
+        """Fetch the current state of a job."""
+
+        return HydratedJob(url=job.url, **self._session.get(job.url).json())
+
+    def advance(self, job: Job, state: object) -> Job:
+        """Attempt to advance a job to a subsequent state."""
+
+        return HydratedJob(url=job.url,
+                           **self._session.post(job.url + "/state",
+                                                json={"old": job.state,
+                                                      "new": state}).json())
+
+    def event(self, job: Job, event: object) -> HydratedJob:
+        """Attempt to record an event against a job."""
+
+        return HydratedJob(url=job.url,
+                           **self._session.post(job.url + "/event",
+                                                json=event).json())
+
+    def delete(self, job: Job) -> None:
+        """Delete a remote job."""
+
+        return self._session.delete(job.url)\
+                            .raise_for_status()