From 94fd9081ec30b9194c8f07a17d6b487c1b7a1ae7 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Fri, 30 Apr 2021 08:34:48 -0600 Subject: [PATCH] Tapping --- projects/flowmetal/README.md | 284 +++++++++++++++++++++-------------- 1 file changed, 172 insertions(+), 112 deletions(-) diff --git a/projects/flowmetal/README.md b/projects/flowmetal/README.md index 45f4b03..e32d392 100644 --- a/projects/flowmetal/README.md +++ b/projects/flowmetal/README.md @@ -46,7 +46,7 @@ Other systems can attach to Flowmetal's datastore and send events to and receive For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector. This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks. -A Flowmetal setup would look something like this - +A Flowmetal setup could look something like this - ``` +----------------------------+ @@ -74,124 +74,184 @@ A Flowmetal setup would look something like this - +--------------------------+ ``` -## An example +## Example - Await -A Flowmetal program could look something like this - +A common pattern working in distributed environments is to want to request another system perform a job and wait for its results. +There are lots of parallels here to making a function or RPC call, except that it's a distributed system with complex failure modes. -``` python -#!/usr/bin/env flowmetal +In a perfect world we'd want to just write something like this - -from flow.http import callback -from flow.http import request -from flow.time import forever, sleep +```python +#!/usr/bin/env python3.10 +from service.client import Client -# A common pattern is to make a HTTP request to some service which will do some -# processing and attempt to deliver a callback. -def simple_remote_job(make_request): - # Make a HTTP callback. - # HTTP callbacks have a URL to which a result may be delivered at most once, - # and define an "event" which can be waited for. - cb = callback.new() - # Use user-defined logic to construct a job request. - # When the job completes, it should make a request to the callback URL. - request = make_request(cb.url) - # We can now start the job - resp = request.execute(request) - # And now we can await a response which will populate the callback's event. - return await cb.event - - -# But there are a couple things which can go wrong here. The initial request -# could fail, the remote process could fail and the callback delivery could fail -# to name a few. We can provide general support for handling this by using the -# same control inversion pattern we used for building the request. -def reliable_remote_job(make_request, - job_from_response, - get_job, - callback_timeout=None, - job_completed=None, - poll_sleep=None, - poll_timeout=None): - # The structure here is much the same, except we need to handle some extra cases. - # First, we're gonna do the same callback dance but potentially with a timeout. - cb = callback.new() - request = make_request(cb.url) - resp = request.execute(request) - resp.raise_for_status() - job = job_from_response(resp) - - # If the user gave us a circuit breaker, use that to bound our waiting. - with callback_timeout or forever(): - try: - await cb.event - return get_job(job) - except Timeout: - pass - - # The user hasn't given us enough info to do busy waiting, so we timed out. - if not (job_from_response and job_completed and get_job): - raise Timeout - - # If we failed to wait for the callback, let's try polling the job. - # We'll let the user put a total bound on this too. - with poll_timeout or forever(): - # We use user-defined logic to wait for the job to complete. - # Note that we don't conflate get_job and job_completed or assume they - # compose so that users can consume status endpoints without fetches. - while not job_completed(job): - # The user can define how we back off too. - # A stateful function could count successive calls and change behavior. - # For instance implementing constant, fibonacci or exponential backoff. - sleep(poll_sleep() if poll_sleep else 1) - - # When the job is "complete", we let the user control fetching its status - return get_job(job) - - -# Let's do a quick example of consuming something like this. -# Say we have a system - let's call it wilson - that lets us request jobs -# for doing bare metal management. Drains, reboots, undrains and the like. -def create_job_request(host, stages, callbacks): - """Forge but don't execute a job creation request.""" - return request.new("POST", f"http://wilson.local/api/v3/host/{host}", - json={"stages": stages, "callbacks": callbacks or []}) - - -def job_from_response(create_resp): - """Handle the job creation response, returning the ID of the created job.""" - return create_resp.json().get("job_id") - - -def get_job(job_id): - """Fetch a job.""" - return request.new("GET" f"http://wilson.local/api/v3/job/{job_id}").json() - - -def job_completed(job_id): - """Decide if a job has competed.""" - return ( - request.new("GET" f"http://wilson.local/api/v3/job/{job_id}/status") - .json() - .get("status", "PENDING") - ) in ["SUCCESS", "FAILURE"] - - -# These tools in hand, we can quickly express a variety of reliable jobs. -def reboot(host): - """Reboot a host, attempting callback waiting but falling back to retry.""" - return reliable_remote_job( - lambda url: create_job_request(host, ["drain", "reboot", "undrain"], [url]), - job_from_response, - get_job, - job_completed=job_completed, - ) +CLIENT = Client("http://service.local", api_key="...") +job = client.create_job(...) +result = await job +# Do something with the result ``` -The magic here is twofold. -First the user or program author is in complete control of the retrying behavior. -This code could be factored differently to treat it explicitly as a Future, constructed implicitly by `create_job_request` and consumed by user code. -Second we're able to take the incredibly operationally complex reliability and HTTP callback interaction almost as an afterthought. +There's some room for variance here around API design taste, but this snippet is probably familiar to many Python readers. +Let's think about its failure modes. + +First, that `await` is doing a lot of heavy lifting. +Presumably it's wrapping up a polling loop of some sort. +That may be acceptable in some circumstances, but it really leaves to the client library implementer the question of what an acceptable retry policy is. + +Second, this snippet assumes that `create_job` will succeed. +There won't be an authorization error, or a network transit error, or a remote server error or anything like that. + +Third, there's no other record of whatever `job` is. +If the Python interpreter running this program dies, or the user gets bored and `C-c`'s it or the computer encounters a problem, the job will be lost. +Maybe that's OK, maybe it isn't. +But it's a risk. + +Now, let's think about taking on some of the complexity needed to solve these problems ourselves. + +### Retrying challenges + +We can manually write the retry loop polling a remote API. + +``` python +#!/usr/bin/env python3.10 + +from datetime import datetime, timedelta + +from service.client import Client + + +CLIENT = Client("http://service.local", api_key="...") +AWAIT_TIMEOUT = timedelta(minutes=30) +POLL_TIME = timedelta(seconds=10) + + +def sleep(duration=POLL_TIME): + """A slightly more useful sleep. Has our default and does coercion.""" + from time import sleep + if isinstance(duration, timedelta): + duration = duration.total_seconds() + sleep(duration) + + +# Create a job, assuming idempotence +while True: + try: + job = client.create_job(...) + start_time = datetime.now() + break + except: + sleep() + +# Waiting for the job +while True: + # Time-based timeout + if datetime.now() - start_time > AWAIT_TIMEOUT: + raise TimeoutError + + # Checking the job status, no backoff linear polling + try: + if not job.complete(): + continue + except: + sleep() + continue + + # Trying to read the job result, re-using the retry loop & total timeout machinery + try: + result = job.get() + break + except: + sleep() + continue + +# Do something with the result +``` + +We could pull [retrying](https://pypi.org/project/retrying/) off the shelf and get some real mileage here. +`retrying` is a super handy little library that provides the `@retry` decorator, which implements a variety of common retrying concerns such as retrying N times with linear or exponential back-off, and such. +It's really just the `while/try/except` state machine we just wrote a couple times as a decorator. + +``` python +#!/usr/bin/env python3.10 + +from datetime import datetime, timedelta + +from retrying import retry + +from service.client import Client + + +CLIENT = Client("http://service.local", api_key="...") +AWAIT_TIMEOUT = timedelta(minutes=30) +POLL_TIME = timedelta(seconds=10) + + +class StillWaitingException(Exception): + """Something we can throw to signal we're still waiting on an external event.""" + + +@retry(wait_fixed=POLL_TIME.total_milliseconds()) +def r_create_job(client): + """R[eliable] create job. Retries over exceptions forever with a delay. No jitter.""" + return client.create_job() + + +@retry(stop_max_delay=AWAIT_TIMEOUT.total_milliseconds(), + wait_fixed=POLL_TIME.total_milliseconds()) +def r_get_job(job): + """R[eliable] get job. Retries over exceptions up to a total time with a delay. No jitter.""" + if not job.complete(): + raise StillWaitingException + + return job.get() + + +job = r_create_job(client) +result = r_get_job(job) +# Do something with the result +``` + +That's pretty good! +We've preserved most of our direct control over the mechanical retrying behavior, we can tweak it or choose a different provider. +And we've managed to get the syntactic density of the original `await` example back ... almost. + +This is where Python's lack of an anonymous function block syntax and other lexical structures becomes a sharp limiter. +In another language like Javascript or LUA, you could probably get this down to something like - + +``` lua +-- retry is a function of retrying options to a function of a callable to retry +-- which returns a zero-argument callable which will execute the callable with +-- the retrying behavior as specified. + +client = Client("http://service.local", api_key="...") + +job = retry()( + funtion () + return client.start_plan(...) + end)() + +result = retry()( + function() + if job.complete() then + return job.get() + end + end)() +``` + +The insight here is that the "callback" function we're defining in the Python example as `r_get_job` and soforth has no intrinsic need to be named. +In fact choosing the arbitrary names `r_get_job` and `r_create_job` puts more load on the programmer and the reader. +Python's lack of block anonymous procedures precludes us from cramming the `if complete then get` operation or anything more complex into a `lambda` without some serious syntax crimes. + + +### Durability challenges + +FIXME - manually implementing snapshotting and recovery is hard + + +### Leverage with language support + +FIXME - What does a DSL that helps with all this look like? ## License