[NO TESTS] WIP

This commit is contained in:
Reid 'arrdem' McKenzie 2023-03-20 18:50:05 -06:00
parent 467b238be1
commit d7ce3973ac
7 changed files with 604 additions and 0 deletions

View file

@ -0,0 +1,251 @@
# Notes
https://github.com/Pyrlang/Pyrlang
https://en.wikipedia.org/wiki/Single_system_image
## Example - Await
A common pattern working in distributed environments is to want to request another system perform a job and wait for its results.
There are lots of parallels here to making a function or RPC call, except that it's a distributed system with complex failure modes.
In a perfect world we'd want to just write something like this -
```python
#!/usr/bin/env python3.10
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
job = client.create_job(...)
result = await job
# Do something with the result
```
There's some room for variance here around API design taste, but this snippet is probably familiar to many Python readers.
Let's think about its failure modes.
First, that `await` is doing a lot of heavy lifting.
Presumably it's wrapping up a polling loop of some sort.
That may be acceptable in some circumstances, but it really leaves to the client library implementer the question of what an acceptable retry policy is.
Second, this snippet assumes that `create_job` will succeed.
There won't be an authorization error, or a network transit error, or a remote server error or anything like that.
Third, there's no other record of whatever `job` is.
If the Python interpreter running this program dies, or the user gets bored and `C-c`'s it or the computer encounters a problem, the job will be lost.
Maybe that's OK, maybe it isn't.
But it's a risk.
Now, let's think about taking on some of the complexity needed to solve these problems ourselves.
### Retrying challenges
We can manually write the retry loop polling a remote API.
``` python
#!/usr/bin/env python3.10
from datetime import datetime, timedelta
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
AWAIT_TIMEOUT = timedelta(minutes=30)
POLL_TIME = timedelta(seconds=10)
def sleep(duration=POLL_TIME):
"""A slightly more useful sleep. Has our default and does coercion."""
from time import sleep
if isinstance(duration, timedelta):
duration = duration.total_seconds()
sleep(duration)
# Create a job, assuming idempotence
while True:
try:
job = client.create_job(...)
start_time = datetime.now()
break
except:
sleep()
# Waiting for the job
while True:
# Time-based timeout
if datetime.now() - start_time > AWAIT_TIMEOUT:
raise TimeoutError
# Checking the job status, no backoff linear polling
try:
if not job.complete():
continue
except:
sleep()
continue
# Trying to read the job result, re-using the retry loop & total timeout machinery
try:
result = job.get()
break
except:
sleep()
continue
# Do something with the result
```
We could pull [retrying](https://pypi.org/project/retrying/) off the shelf and get some real mileage here.
`retrying` is a super handy little library that provides the `@retry` decorator, which implements a variety of common retrying concerns such as retrying N times with linear or exponential back-off, and such.
It's really just the `while/try/except` state machine we just wrote a couple times as a decorator.
``` python
#!/usr/bin/env python3.10
from datetime import datetime, timedelta
from retrying import retry
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
AWAIT_TIMEOUT = timedelta(minutes=30)
POLL_TIME = timedelta(seconds=10)
class StillWaitingException(Exception):
"""Something we can throw to signal we're still waiting on an external event."""
@retry(wait_fixed=POLL_TIME.total_milliseconds())
def r_create_job(client):
"""R[eliable] create job. Retries over exceptions forever with a delay. No jitter."""
return client.create_job()
@retry(stop_max_delay=AWAIT_TIMEOUT.total_milliseconds(),
wait_fixed=POLL_TIME.total_milliseconds())
def r_get_job(job):
"""R[eliable] get job. Retries over exceptions up to a total time with a delay. No jitter."""
if not job.complete():
raise StillWaitingException
return job.get()
job = r_create_job(client)
result = r_get_job(job)
# Do something with the result
```
That's pretty good!
We've preserved most of our direct control over the mechanical retrying behavior, we can tweak it or choose a different provider.
And we've managed to get the syntactic density of the original `await` example back ... almost.
This is where Python's lack of an anonymous function block syntax and other lexical structures becomes a sharp limiter.
In another language like Javascript or LUA, you could probably get this down to something like -
``` lua
-- retry is a function of retrying options to a function of a callable to retry
-- which returns a zero-argument callable which will execute the callable with
-- the retrying behavior as specified.
client = Client("http://service.local", api_key="...")
retry_config = {} -- Fake, obviously
with_retry = retry(retry_config)
job = with_retry(
funtion ()
return client.start_plan(...)
end)()
result = with_retry(
function()
if job.complete() then
return job.get()
end
end)()
```
The insight here is that the "callback" function we're defining in the Python example as `r_get_job` and so forth has no intrinsic need to be named.
In fact choosing the arbitrary names `r_get_job` and `r_create_job` puts more load on the programmer and the reader.
Python's lack of block anonymous procedures precludes us from cramming the `if complete then get` operation or anything more complex into a `lambda` without some serious syntax crimes.
Using [PEP-0342](https://www.python.org/dev/peps/pep-0342/#new-generator-method-send-value), it's possible to implement arbitrary coroutines in Python by `.send()`ing values to generators which may treat `yield` statements as rvalues for receiving remotely sent inputs.
This makes it possible to explicitly yield control to a remote interpreter, which will return or resume the couroutine with a result value.
Microsoft's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=python) use exactly this behavior to implement durable functions.
The "functions" provided by the API return sentinels which can be yielded to an external interpreter, which triggers processing and returns control when there are results.
This is [interpreter effect conversion pattern (Extensible Effects)](http://okmij.org/ftp/Haskell/extensible/exteff.pdf) as seen in Haskell and other tools; applied.
``` python
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
```
Now it would seem that you could "just" automate doing rewriting that to something like this -
``` python
@df.Durable
def main(ctx):
x = context.call_activity("F1", None)
y = context.call_activity("F2", x)
z = context.call_activity("F3", y)
return context.call_activity("F4", z)
```
There's some prior art for doing this (https://eigenfoo.xyz/manipulating-python-asts/, https://greentreesnakes.readthedocs.io/en/latest/manipulating.html#modifying-the-tree) but it's a lot of legwork for not much.
There are also some pretty gaping correctness holes in taking the decorator based rewriting approach;
how do you deal with rewriting imported code, or code that's in classes/behind `@property` and other such tricks?
Just not worth it.
Now, what we _can_ do is try to hijack the entire Python interpreter to implement the properties/tracing/history recording we want there.
The default cpython lacks hooks for doing this, but we can write a python-in-python interpreter and "lift" the user's program into an interpreter we control, which ultimately gets most of its behavior "for free" from the underlying cpython interpreter.
There's [an example](https://github.com/pfalcon/pyastinterp) of doing this as part of the pycopy project; although there it's more of a Scheme-style proof of metacircular self-hosting.
There's a modified copy of the astinterp in `scratch/` which is capable of running a considerable subset of py2/3.9 to the point of being able to source-import many libraries including `requests` and run PyPi sourced library code along with user code under hoisted interpretation.
It doesn't support coroutines/generators yet, and there's some machinery required to make it "safe" (meaningfully single-stepable; "fix"/support eval, enable user-defined import/`__import__` through the lifted python VM) but as a proof of concept of a lifted VM I'm genuinely shocked how well this works.
Next questions here revolve around how to "snapshot" the state of the interpreter meaningfully, and how to build a replayable interpreter log.
There are some specific challenges around how Python code interacts with native C code that could limit the viability of this approach, but at the absolute least this fully sandboxed Python interpreter could be used to implement whatever underlying magic could be desired and restricted to some language subset as desired.
The goal is to make something like this work -
``` python
from df import Activity
f1 = Activity("F1")
f2 = Activity("F2")
f3 = Activity("F3")
f4 = Activity("F4")
def main():
return f4(f3(f2(f1(None))))
```
Which may offer a possible solution to the interpreter checkpointing problem - only checkpoint "supported" operations.
Here the `Activity().__call__` operation would have special support, as with `datetime.datetime.now()` and controlling `time.sleep()`, threading and possibly `random.Random` seeding which cannot trivially be made repeatable.
### Durability challenges
FIXME - manually implementing snapshotting and recovery is hard
### Leverage with language support
FIXME - What does a DSL that helps with all this look like?

View file

@ -0,0 +1,53 @@
# Architecture
Flowmetal is an interpreted language backed by a durable event store.
The execution history of a program persists to the durable store as execution precedes.
If an interpretation step fails to persist, it can't have external effects.
This is the fundamental insight behind Microsoft AMBROSIA.
The event store also provides Flowmetal's only interface for communicating with external systems.
Other systems can attach to Flowmetal's data store and send events to and receive them from Flowmetal.
For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector.
This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks.
A Flowmetal deplyoment looks like this -
```
+----------------------------+
+---------------------------+ |
+--------------------------+ |--+
| External HTTP service(s) |--+
+--------------------------+
^ ^
| |
v v
+-----------------------+ +------------------------+
| HTTP server connector | | HTTP request connector |
+-----------------------+ +------------------------+
^ ^
| |
v v
+--------------------+ +----------------------+
| Shared event store | | Shared program store |
+--------------------+ +----------------------+
^ ^
| |
v v
+--------------------------+
| Flowmetal interpreter(s) |
+--------------------------+
```
Users interact with Flowmetal by creating (or editing) **Programs**.
An instance of a Program is called a **Task**.
Every Task has a unique **Inbox** and **Outbox**.
Comparable systems call the unit of execution a Process; we prefer Task because Process invites conflation with a Unix process or thread and our Tasks are entirely portable.
Tasks interact with the outside world by producing **Requests** into their Outbox.
For example an instance of a Task could request that some other Task be executed.
Delivering messages to some other Task, or making API calls against external services would be other good examples of Requests.
Tasks receive the results of their Requests and other external events as **Messages** in their **Inbox**.
A request that some other Task be executed would be responded to with a Message identifying the other task.
The requesting Task could choose to wait on the requested Task, or could disregard it.
Likewise the results of external requests return as Messages.

View file

@ -0,0 +1,154 @@
# What problem are you trying to solve?
In building, operating and maintaining distributed systems (many computers in concert) engineers face a tooling gap.
Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.) and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation.
This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand.
In interacting with networks, communicating between computers is difficult with traditional tools and communication failure becomes an ever-present concern.
Traditional automation tools such as shells struggle with this task because they make it difficult to implement features such as error handling.
Furthermore, in a distributed environment it cannot be assumed that a single machine can remain available to execute automation.
Automation authors are thus confronted not just with the need to use "real" programming languages but the need to implement a database backed state machine which can "checkpoint" the job.
Taking a step back, this is an enormous failure of the languages we have available to describe workflow tasks.
That users need to write state machines that define state machines that actually perform the desired task shows that the available tools operate at the wrong level.
Airflow for instance succeeds at providing a "workflow" control flow graph abstraction which frees users of the concerns of implementing their own resumable state machines.
Consider this example from the Airflow documentation -
```python
from __future__ import annotations
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
with DAG(
"example_branch_labels",
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = EmptyOperator(task_id="ingest")
analyse = EmptyOperator(task_id="analyze")
check = EmptyOperator(task_id="check_integrity")
describe = EmptyOperator(task_id="describe_integrity")
error = EmptyOperator(task_id="email_error")
save = EmptyOperator(task_id="save")
report = EmptyOperator(task_id="report")
ingest >> analyse >> check
check >> Label("No errors") >> save >> report
check >> Label("Errors found") >> describe >> error >> report
```
Compared to handwriting out all the nodes in the control flow graph and the requisite checkpointing to the database on state transitions, this is a considerable improvement.
But if you stop and look at this control flow graph, it's expressing a simple branching operation for which we have much more conventional notation.
Consider the same workflow using "normal" Python syntax rather than the embedded workflow syntax -
```python
def noop():
pass
def check(result):
return True
ingest = noop
analyse = noop
describe = noop
error = noop
save = noop
report = noop
def main():
ingest()
result = analyse()
if check(result):
save(result)
report(result)
else:
describe()
error()
report()
```
The program a developer authors is already a control flow graph.
We have a name for "operators" or state nodes - they're just functions.
We have a name and notation for transitions in the state chart - they're just sequential statements.
Anything we need developers to write above that baseline represents friction specific to a workflow task which we should seek to minimize.
Temporal does better than Airflow and reflects this understanding.
Using insights from Azure Durable Functions, their SDK leverages the details of Python's `async` and `await` operators to hijack program control flow and implement workflow features under the covers.
Consider this example from the Temporal documentation -
```python
@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
try:
while True:
print("Heartbeating cancel activity")
await asyncio.sleep(0.5)
activity.heartbeat("some details")
except asyncio.CancelledError:
print("Activity cancelled")
raise
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, input: ComposeArgsInput) -> None:
activity_handle = workflow.start_activity(
cancel_activity,
ComposeArgsInput(input.arg1, input.arg2),
start_to_close_timeout=timedelta(minutes=5),
heartbeat_timeout=timedelta(seconds=30),
)
await asyncio.sleep(3)
activity_handle.cancel()
```
This is really good compared to an equivalent Airflow graph!
All the details are "normal" Python, and the SDK fits "natively" into how Python execution occurs.
But it's still laden with syntax such as the `async` function coloring and decorators which serve only to support the workflow SDK.
In comparison were this workflow a "simple" Python script it would only need to be written
```python
# https://pypi.org/project/timeoutcontext/
from timeoutcontext import task_with_timeout, TimeoutError
def cancellable_activity():
try:
while True:
print("Heartbeating cancellable activity")
sleep(0.5)
except TimeoutError:
print("Activity cancelled")
raise
def main():
task = task_with_timeout(lambda: cancellable_activity(),
timeout=timedelta(minutes=5))
sleep(3)
task.cancel()
```
As with Airflow, the Temporal SDK effectively requires that the programmer learn not just a set of libraries but the Python `async` features because the implementation of the workflow engine is leaked to the users.
The problem isn't just the excessive syntax, it's that as with Airflow user workflows are no longer "normal" programs.
There is in effect an entire Temporal interpreter stacked inbetween the Python runtime with which users are familiar and the user's program.
It is in effect a new language with none of the notational advantages of being one.
The flipside is that there is an enormous advantage in tooling to be had by leveraging an existing language - or something that looks enough like one - rather than inventing a new notation.
This is the cardinal sin of workflow tools like kubeflow and various CI "workflow" formats - they adopt unique YAML or XML based notations which have no tooling support.
For instance by being "normal" (ish) Python, the Temporal SDK benefits from access to editor autocompletion, the MyPy typechecker and all manner of other tools.

View file

@ -0,0 +1,47 @@
# An Asynchronous, Distributed Task Engine
This document presents a design without reference implementation for a distributed programming system;
sometimes called a workflow engine.
It is intended to provide architectural level clarity allowing for the development of alternative designs or implementations as may suit.
## Problem Statement
In building, operating and maintaining distributed systems (many computers in concert) engineers face a tooling gap.
Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.)
and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation.
This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand.
In interacting with networks, communicating between computers is difficult with traditional tools and communication failure becomes an ever-present concern.
Traditional automation tools such as shells are inadequate for this environment because achieving network communication is excessively difficult.
In a distributed environment it cannot be assumed that a single machine can remain available to execute automation;
This requires an approach to automation which allows for the incremental execution of single tasks at a time with provisions for relocation and recovery should failure occur.
It also cannot be assumed that a single machine is sufficiently available to receive and process incoming events such as callbacks.
A distributed system is needed to wrangle distributed systems.
## Design Considerations
- Timeouts are everywhere
- Sub-Turing/boundable
-
## Architectural Overview
### Events
Things that will happen, or time out.
### Actions
Things the workflow will do, or time out.
### Bindings
Data the workflow either was given or computed.
### Conditionals
Decisions the workflow may make.
### Functions
A convenient way to talk about fragments of control flow graph.
### Tracing & Reporting

View file

@ -0,0 +1,29 @@
# -*- mode: python -*-
from flowmetal import workflow
def ingest():
return {}
def analyze(data):
return data.keys()
def check(keys) -> bool:
return len(keys) > 0
def report(keys):
print(keys)
@workflow
def main():
data = ingest()
data = analyze(data)
if check(data):
report(data)
else:
raise ValueError(report(data))

View file

@ -0,0 +1,26 @@
# -*- mode: python -*-
from datetime import timedelta
from time import sleep
from flowmetal import workflow, timeout, CancelledError, TimeoutError, Task
def cancellable_activity():
try:
while True:
print("Still alive")
sleep(0.5)
except CancelledError:
print("Task killed")
@workflow
def main():
# Somewhat like a thread
t = Task(target=cancellable_activity, args=(), timeout=timedelta(minutes=5))
t.start()
try:
result = t.result(timeout=timedelta(seconds=3))
print(result)
except TimeoutError:
t.cancel()

View file

@ -0,0 +1,44 @@
aiohttp
aiohttp_basicauth
async_lru
autoflake
beautifulsoup4
black
cachetools
click
colored
ExifRead
flake8
flask
hypothesis
icmplib
isort
jinja2
lark
livereload
lxml
markdown
meraki
octorest
octorest
openapi-spec-validator
prompt-toolkit
proquint
psycopg2
pycryptodome
pyrsistent
pytest-cov
pytest-postgresql
pyyaml
recommonmark
redis
requests
requests
retry
smbus2
sphinx
sphinxcontrib-openapi
sphinxcontrib-programoutput
unify
yamllint
yaspin