This commit is contained in:
Reid 'arrdem' McKenzie 2021-04-30 08:34:48 -06:00
parent 6d09df218c
commit 94fd9081ec

View file

@ -46,7 +46,7 @@ Other systems can attach to Flowmetal's datastore and send events to and receive
For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector.
This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks.
A Flowmetal setup would look something like this -
A Flowmetal setup could look something like this -
```
+----------------------------+
@ -74,124 +74,184 @@ A Flowmetal setup would look something like this -
+--------------------------+
```
## An example
## Example - Await
A Flowmetal program could look something like this -
A common pattern working in distributed environments is to want to request another system perform a job and wait for its results.
There are lots of parallels here to making a function or RPC call, except that it's a distributed system with complex failure modes.
In a perfect world we'd want to just write something like this -
```python
#!/usr/bin/env flowmetal
#!/usr/bin/env python3.10
from flow.http import callback
from flow.http import request
from flow.time import forever, sleep
from service.client import Client
# A common pattern is to make a HTTP request to some service which will do some
# processing and attempt to deliver a callback.
def simple_remote_job(make_request):
# Make a HTTP callback.
# HTTP callbacks have a URL to which a result may be delivered at most once,
# and define an "event" which can be waited for.
cb = callback.new()
# Use user-defined logic to construct a job request.
# When the job completes, it should make a request to the callback URL.
request = make_request(cb.url)
# We can now start the job
resp = request.execute(request)
# And now we can await a response which will populate the callback's event.
return await cb.event
# But there are a couple things which can go wrong here. The initial request
# could fail, the remote process could fail and the callback delivery could fail
# to name a few. We can provide general support for handling this by using the
# same control inversion pattern we used for building the request.
def reliable_remote_job(make_request,
job_from_response,
get_job,
callback_timeout=None,
job_completed=None,
poll_sleep=None,
poll_timeout=None):
# The structure here is much the same, except we need to handle some extra cases.
# First, we're gonna do the same callback dance but potentially with a timeout.
cb = callback.new()
request = make_request(cb.url)
resp = request.execute(request)
resp.raise_for_status()
job = job_from_response(resp)
# If the user gave us a circuit breaker, use that to bound our waiting.
with callback_timeout or forever():
try:
await cb.event
return get_job(job)
except Timeout:
pass
# The user hasn't given us enough info to do busy waiting, so we timed out.
if not (job_from_response and job_completed and get_job):
raise Timeout
# If we failed to wait for the callback, let's try polling the job.
# We'll let the user put a total bound on this too.
with poll_timeout or forever():
# We use user-defined logic to wait for the job to complete.
# Note that we don't conflate get_job and job_completed or assume they
# compose so that users can consume status endpoints without fetches.
while not job_completed(job):
# The user can define how we back off too.
# A stateful function could count successive calls and change behavior.
# For instance implementing constant, fibonacci or exponential backoff.
sleep(poll_sleep() if poll_sleep else 1)
# When the job is "complete", we let the user control fetching its status
return get_job(job)
# Let's do a quick example of consuming something like this.
# Say we have a system - let's call it wilson - that lets us request jobs
# for doing bare metal management. Drains, reboots, undrains and the like.
def create_job_request(host, stages, callbacks):
"""Forge but don't execute a job creation request."""
return request.new("POST", f"http://wilson.local/api/v3/host/{host}",
json={"stages": stages, "callbacks": callbacks or []})
def job_from_response(create_resp):
"""Handle the job creation response, returning the ID of the created job."""
return create_resp.json().get("job_id")
def get_job(job_id):
"""Fetch a job."""
return request.new("GET" f"http://wilson.local/api/v3/job/{job_id}").json()
def job_completed(job_id):
"""Decide if a job has competed."""
return (
request.new("GET" f"http://wilson.local/api/v3/job/{job_id}/status")
.json()
.get("status", "PENDING")
) in ["SUCCESS", "FAILURE"]
# These tools in hand, we can quickly express a variety of reliable jobs.
def reboot(host):
"""Reboot a host, attempting callback waiting but falling back to retry."""
return reliable_remote_job(
lambda url: create_job_request(host, ["drain", "reboot", "undrain"], [url]),
job_from_response,
get_job,
job_completed=job_completed,
)
CLIENT = Client("http://service.local", api_key="...")
job = client.create_job(...)
result = await job
# Do something with the result
```
The magic here is twofold.
First the user or program author is in complete control of the retrying behavior.
This code could be factored differently to treat it explicitly as a Future, constructed implicitly by `create_job_request` and consumed by user code.
Second we're able to take the incredibly operationally complex reliability and HTTP callback interaction almost as an afterthought.
There's some room for variance here around API design taste, but this snippet is probably familiar to many Python readers.
Let's think about its failure modes.
First, that `await` is doing a lot of heavy lifting.
Presumably it's wrapping up a polling loop of some sort.
That may be acceptable in some circumstances, but it really leaves to the client library implementer the question of what an acceptable retry policy is.
Second, this snippet assumes that `create_job` will succeed.
There won't be an authorization error, or a network transit error, or a remote server error or anything like that.
Third, there's no other record of whatever `job` is.
If the Python interpreter running this program dies, or the user gets bored and `C-c`'s it or the computer encounters a problem, the job will be lost.
Maybe that's OK, maybe it isn't.
But it's a risk.
Now, let's think about taking on some of the complexity needed to solve these problems ourselves.
### Retrying challenges
We can manually write the retry loop polling a remote API.
``` python
#!/usr/bin/env python3.10
from datetime import datetime, timedelta
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
AWAIT_TIMEOUT = timedelta(minutes=30)
POLL_TIME = timedelta(seconds=10)
def sleep(duration=POLL_TIME):
"""A slightly more useful sleep. Has our default and does coercion."""
from time import sleep
if isinstance(duration, timedelta):
duration = duration.total_seconds()
sleep(duration)
# Create a job, assuming idempotence
while True:
try:
job = client.create_job(...)
start_time = datetime.now()
break
except:
sleep()
# Waiting for the job
while True:
# Time-based timeout
if datetime.now() - start_time > AWAIT_TIMEOUT:
raise TimeoutError
# Checking the job status, no backoff linear polling
try:
if not job.complete():
continue
except:
sleep()
continue
# Trying to read the job result, re-using the retry loop & total timeout machinery
try:
result = job.get()
break
except:
sleep()
continue
# Do something with the result
```
We could pull [retrying](https://pypi.org/project/retrying/) off the shelf and get some real mileage here.
`retrying` is a super handy little library that provides the `@retry` decorator, which implements a variety of common retrying concerns such as retrying N times with linear or exponential back-off, and such.
It's really just the `while/try/except` state machine we just wrote a couple times as a decorator.
``` python
#!/usr/bin/env python3.10
from datetime import datetime, timedelta
from retrying import retry
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
AWAIT_TIMEOUT = timedelta(minutes=30)
POLL_TIME = timedelta(seconds=10)
class StillWaitingException(Exception):
"""Something we can throw to signal we're still waiting on an external event."""
@retry(wait_fixed=POLL_TIME.total_milliseconds())
def r_create_job(client):
"""R[eliable] create job. Retries over exceptions forever with a delay. No jitter."""
return client.create_job()
@retry(stop_max_delay=AWAIT_TIMEOUT.total_milliseconds(),
wait_fixed=POLL_TIME.total_milliseconds())
def r_get_job(job):
"""R[eliable] get job. Retries over exceptions up to a total time with a delay. No jitter."""
if not job.complete():
raise StillWaitingException
return job.get()
job = r_create_job(client)
result = r_get_job(job)
# Do something with the result
```
That's pretty good!
We've preserved most of our direct control over the mechanical retrying behavior, we can tweak it or choose a different provider.
And we've managed to get the syntactic density of the original `await` example back ... almost.
This is where Python's lack of an anonymous function block syntax and other lexical structures becomes a sharp limiter.
In another language like Javascript or LUA, you could probably get this down to something like -
``` lua
-- retry is a function of retrying options to a function of a callable to retry
-- which returns a zero-argument callable which will execute the callable with
-- the retrying behavior as specified.
client = Client("http://service.local", api_key="...")
job = retry()(
funtion ()
return client.start_plan(...)
end)()
result = retry()(
function()
if job.complete() then
return job.get()
end
end)()
```
The insight here is that the "callback" function we're defining in the Python example as `r_get_job` and soforth has no intrinsic need to be named.
In fact choosing the arbitrary names `r_get_job` and `r_create_job` puts more load on the programmer and the reader.
Python's lack of block anonymous procedures precludes us from cramming the `if complete then get` operation or anything more complex into a `lambda` without some serious syntax crimes.
### Durability challenges
FIXME - manually implementing snapshotting and recovery is hard
### Leverage with language support
FIXME - What does a DSL that helps with all this look like?
## License