Compare commits
10 commits
a49c0d2d88
...
d7ce3973ac
Author | SHA1 | Date | |
---|---|---|---|
d7ce3973ac | |||
467b238be1 | |||
6c9478e400 | |||
26f871ae60 | |||
8a1c03dcc3 | |||
ae6f7f4ad0 | |||
16215dec12 | |||
f964f64800 | |||
8e53c8e509 | |||
5708d44a6e |
21 changed files with 437 additions and 324 deletions
|
@ -92,6 +92,10 @@ class Application(web.Application):
|
|||
)
|
||||
asyncio.run(self.handle_run())
|
||||
|
||||
# Save off config before exit
|
||||
self.config.save()
|
||||
self.database.save()
|
||||
|
||||
def stop(self, *_):
|
||||
self["running"] = False
|
||||
|
||||
|
|
|
@ -197,17 +197,6 @@ class RelayConfig(DotDict):
|
|||
return True
|
||||
|
||||
def save(self):
|
||||
config = {
|
||||
"db": self["db"],
|
||||
"listen": self.listen,
|
||||
"port": self.port,
|
||||
"note": self.note,
|
||||
"push_limit": self.push_limit,
|
||||
"ap": {key: self[key] for key in self.apkeys},
|
||||
"cache": {key: self[key] for key in self.cachekeys},
|
||||
}
|
||||
|
||||
with open(self._path, "w") as fd:
|
||||
yaml.dump(config, fd, sort_keys=False)
|
||||
|
||||
return config
|
||||
fd.write("---\n")
|
||||
yaml.dump(self.jsonify(), fd, sort_keys=True)
|
||||
|
|
|
@ -45,54 +45,22 @@ class RelayDatabase(dict):
|
|||
self["private-key"] = self.PRIVKEY.exportKey("PEM").decode("utf-8")
|
||||
|
||||
def load(self):
|
||||
new_db = True
|
||||
|
||||
try:
|
||||
with self.config.db.open() as fd:
|
||||
data = json.load(fd)
|
||||
|
||||
self["version"] = data.get("version", None)
|
||||
self["version"] = data.get("version", 1)
|
||||
self["private-key"] = data.get("private-key")
|
||||
|
||||
if self["version"] == None:
|
||||
self["version"] = 1
|
||||
|
||||
if "actorKeys" in data:
|
||||
self["private-key"] = data["actorKeys"]["privateKey"]
|
||||
|
||||
for item in data.get("relay-list", []):
|
||||
domain = urlparse(item).hostname
|
||||
self["relay-list"][domain] = {"inbox": item, "followid": None}
|
||||
|
||||
else:
|
||||
self["relay-list"] = data.get("relay-list", {})
|
||||
|
||||
for domain in self["relay-list"].keys():
|
||||
if self.config.is_banned(domain) or (
|
||||
self.config.whitelist_enabled
|
||||
and not self.config.is_whitelisted(domain)
|
||||
):
|
||||
self.del_inbox(domain)
|
||||
|
||||
new_db = False
|
||||
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
if self.config.db.stat().st_size > 0:
|
||||
raise e from None
|
||||
self["follow-requests"] = data.get("follow-requests")
|
||||
|
||||
if not self.privkey:
|
||||
logging.info("No actor keys present, generating 4096-bit RSA keypair.")
|
||||
self.generate_key()
|
||||
self.save()
|
||||
|
||||
else:
|
||||
self.PRIVKEY = RSA.importKey(self.privkey)
|
||||
|
||||
self.save()
|
||||
return not new_db
|
||||
|
||||
def save(self):
|
||||
with self.config.db.open("w") as fd:
|
||||
json.dump(self, fd, indent=4)
|
||||
|
|
|
@ -328,6 +328,15 @@ class DotDict(dict):
|
|||
def to_json(self, indent=None):
|
||||
return json.dumps(self, indent=indent)
|
||||
|
||||
def jsonify(self):
|
||||
def _xform(v):
|
||||
if hasattr(v, 'jsonify'):
|
||||
return v.jsonify()
|
||||
else:
|
||||
return v
|
||||
|
||||
return {k: _xform(v) for k, v in self.items()}
|
||||
|
||||
def update(self, _data, **kwargs):
|
||||
if isinstance(_data, dict):
|
||||
for key, value in _data.items():
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
import json
|
||||
from json.decoder import JSONDecodeError
|
||||
import logging
|
||||
|
||||
from aiohttp.web import HTTPUnauthorized, Request
|
||||
|
@ -25,7 +27,7 @@ def register_route(method, path):
|
|||
|
||||
@register_route("GET", "/")
|
||||
async def home(request):
|
||||
following = "<ul>" + ("\n".join(f"<li>{it}</li>" for it in request.app.database.hostnames)) + "</ul>"
|
||||
following = "<ul>" + ("\n".join(f"<li>{it}</li>" for it in request.app.database["relay-list"])) + "</ul>"
|
||||
following_count = len(request.app.database.hostnames)
|
||||
requested = "<ul>" + ("\n".join(f"<li>{it}</li>" for it in request.app.database["follow-requests"])) + "</ul>"
|
||||
requested_count = len(request.app.database["follow-requests"])
|
||||
|
@ -221,16 +223,22 @@ async def set_config(request: Request):
|
|||
return Response.new_error(403, "access denied", "json")
|
||||
|
||||
# FIXME: config doesn't have a way to go from JSON or update, using dict stuff
|
||||
new_config = await request.json()
|
||||
text = await request.text()
|
||||
try:
|
||||
new_config = json.loads(text)
|
||||
except JSONDecodeError as e:
|
||||
logging.exception(f"Unable to load config {text!r}")
|
||||
return Response.new_error(400, "bad request", "json")
|
||||
|
||||
request.app.config.update(new_config)
|
||||
|
||||
# If there are pending follows which are NOW whitelisted, allow them
|
||||
if request.app.config.whitelist_enabled:
|
||||
# If there are pending follows which are NOW whitelisted, allow them
|
||||
for domain in request.app.config.whitelist:
|
||||
if (pending_follow := request.app.database.get_request(domain, False)):
|
||||
logging.info(f"Acknowledging queued follow request from {domain}...")
|
||||
await misc.request(
|
||||
actor.shared_inbox,
|
||||
pending_follow["inbox"],
|
||||
misc.Message.new_response(
|
||||
host=request.app.config.host,
|
||||
actor=pending_follow["actor"],
|
||||
|
@ -240,7 +248,7 @@ async def set_config(request: Request):
|
|||
)
|
||||
|
||||
await misc.request(
|
||||
actor.shared_inbox,
|
||||
pending_follow["inbox"],
|
||||
misc.Message.new_follow(
|
||||
host=request.app.config.host,
|
||||
actor=pending_follow["actor"]
|
||||
|
@ -249,6 +257,8 @@ async def set_config(request: Request):
|
|||
|
||||
request.app.database.del_request(domain)
|
||||
|
||||
# FIXME: If there are EXISTING follows which are NO LONGER allowed/are blacklisted, drop them
|
||||
|
||||
request.app.database.save()
|
||||
|
||||
request.app.config.save()
|
||||
|
@ -267,6 +277,17 @@ def get_config(request: Request):
|
|||
return Response.new(request.app.config, status=200, ctype="json")
|
||||
|
||||
|
||||
@register_route("GET", "/admin/db")
|
||||
def get_db(request: Request):
|
||||
if not (auth := request.headers.get("Authorization")):
|
||||
return Response.new_error(403, "access denied", "json")
|
||||
|
||||
if not auth == f"Bearer {request.app.config.admin_token}":
|
||||
return Response.new_error(403, "access denied", "json")
|
||||
|
||||
return Response.new(request.app.database, status=200, ctype="json")
|
||||
|
||||
|
||||
@register_route("GET", "/admin/pending")
|
||||
def get_pending(request):
|
||||
if not (auth := request.headers.get("Authorization")):
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
# Flowmetal
|
||||
|
||||
> A shining mercurial metal laden with sensors and almost infinitely reconfigurable.
|
||||
>
|
||||
> A shining mercurial metal, laden with sensors and almost infinitely reconfigurable.
|
||||
> The stuff of which robots and servitors are made.
|
||||
|
||||
Flowmetal is a substrate for automation.
|
||||
It attempts to provide a programming environment wherein programs are durable, evented and asynchronous aimed at what would traditionally be described as scripting or coordination.
|
||||
It provides a programming environment wherein programs are durable, evented and asynchronous by default.
|
||||
It is aimed at scripting or coordination tasks, such as workflows and scheduled jobs.
|
||||
|
||||
Let's unpack these terms.
|
||||
|
||||
|
@ -21,62 +21,16 @@ This also allows for external systems such as REST callback APIs, databases and
|
|||
It also allows bidirectional communication between Flowmetal programs and other more traditional programming environments.
|
||||
Anything that can communicate with Flowmetal can provide function implementations, or call Flowmetal programs!
|
||||
|
||||
This centering of evented communication makes Flowmetal ideal for **coordination** tasks, from simple task sequencing to map/reduce and other parallelism patterns.
|
||||
|
||||
**Asynchronous** - thanks to Flowmetal's evented execution model, waiting for slow external events either synchronously or asynchronously is second nature!
|
||||
Flowmetal is especially good at waiting for very, very slow external operations.
|
||||
Stuff like webhooks and batch processes.
|
||||
|
||||
**Scripting** - the tradeoff Flowmetal makes for the evented model is that it's slow.
|
||||
While Flowmetal foreign functions could be fast, Flowmetal's interpreter isn't designed for speed.
|
||||
It's designed for eventing and ensuring durability.
|
||||
This makes Flowmetal suitable for interacting with and coordinating other systems, but it's not gonna win any benchmark games.
|
||||
**Scripting** - Durability and distribution of execution come at coordination costs which make Flowmetal well suited for coordination tasks, but not for heavy processing.
|
||||
|
||||
## An overview
|
||||
|
||||
In the systems world we have SH, Bourne SH, BASH, ZSH and friends which provide a common interface for connecting processes together.
|
||||
However in the distributed system world we don't have a good parallel for connecting microservices; especially where complex failure handling is required.
|
||||
|
||||
I previously [blogged a bit](https://www.arrdem.com/2019/04/01/the_silver_tower/) about some ideas for what this could look like.
|
||||
I'm convinced that a programming environment based around [virtual resiliency](https://www.microsoft.com/en-us/research/publication/a-m-b-r-o-s-i-a-providing-performant-virtual-resiliency-for-distributed-applications/) is a worthwhile goal (having independently invented it) and worth trying to bring to a mainstream general purpose platform like Python.
|
||||
|
||||
Flowmetal is an interpreted language backed by a durable event store.
|
||||
The execution history of a program persists to the durable store as execution proceeds.
|
||||
If an interpretation step fails to persist, it can't have external effects.
|
||||
This is the fundamental insight behind Microsoft AMBROSIA.
|
||||
The event store also provides Flowmetal's only interface for communicating with external systems.
|
||||
Other systems can attach to Flowmetal's data store and send events to and receive them from Flowmetal.
|
||||
For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector.
|
||||
This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks.
|
||||
|
||||
A Flowmetal setup could look something like this -
|
||||
|
||||
```
|
||||
+----------------------------+
|
||||
+---------------------------+ |
|
||||
+--------------------------+ |--+
|
||||
| External HTTP service(s) |--+
|
||||
+--------------------------+
|
||||
^ ^
|
||||
| |
|
||||
v v
|
||||
+-----------------------+ +------------------------+
|
||||
| HTTP server connector | | HTTP request connector |
|
||||
+-----------------------+ +------------------------+
|
||||
^ ^
|
||||
| |
|
||||
v v
|
||||
+--------------------+
|
||||
| Shared event store |
|
||||
+--------------------+
|
||||
^
|
||||
|
|
||||
v
|
||||
+--------------------------+
|
||||
| Flowmetal interpreter(s) |
|
||||
+--------------------------+
|
||||
```
|
||||
- For a problem statement, see [Call/CC Airflow](docs/call_cc_airflow.md).
|
||||
- For an architecture overview, see [Architecture](docs/architecture.md).
|
||||
- For example doodles, see [examples](examples).
|
||||
|
||||
## License
|
||||
|
||||
Mirrored from https://git.arrdem.com/arrdem/flowmetal
|
||||
|
||||
Published under the MIT license. See [LICENSE.md](LICENSE.md)
|
||||
|
|
|
@ -1,46 +0,0 @@
|
|||
#+TITLE: Flowmetal TODOs
|
||||
|
||||
* parser
|
||||
** TODO Rework the tokens in terms of spans instead of just start points :tokens:parser:
|
||||
Having start and end information allows for textual display of ranges and other
|
||||
potentially interesting error formatting. Requires some refactoring.
|
||||
|
||||
** TODO Implement load() in the parser :parser:
|
||||
At present the parser can parse well enough, but it returns a token tree
|
||||
intended for use in refactoring and autoformatting tools not a direct 'ast' list
|
||||
tree which is how load() is supposed to behave.
|
||||
|
||||
Figure out how to "mixin" implicit unwrapping of token boxes to values when
|
||||
loading instead of reading.
|
||||
|
||||
** DONE Implement parser support for :- type ascriptions :parser:
|
||||
Maybe this is a special case of keywords, maybe it isn't. Support ⊢ as an alternative. Maybe |- ?
|
||||
|
||||
** TODO Think about the difference between reading "data" and reading expression/code syntax :parser:
|
||||
EDN suggests these two things are largely the same ... but they clearly aren't.
|
||||
|
||||
** TODO Do I want to implement #_ reader discard support? :parser:
|
||||
Reader discard is a convenient alternative to commenting a bunch of stuff out,
|
||||
but does require a fair bit of complexity in the parser to support properly.
|
||||
|
||||
** TODO Do I want to implement #?() reader conditional support? :parser:
|
||||
Reader conditionals are cool for feature expressions and multiple platforms, but
|
||||
are of unclear value given that I only have one target for the foreseeable and
|
||||
Flowmetal is already supposed to be a platform agnostic sort of thing.
|
||||
|
||||
** DONE Finish out float support
|
||||
** DONE Implement strings
|
||||
** TODO Think about what multiple grammars / dialects look like
|
||||
* TODO Look at Python SQL ORMs :server:storage:
|
||||
- Must support PostgreSQL
|
||||
- Must support SQLite
|
||||
|
||||
The goal is to be able to run the "leader" servers off of postgres and have local
|
||||
state stores for workers stored in sqlite using large amounts of the same schema.
|
||||
Being able to get marshalling and unmarshalling to JSON 'for free' would be
|
||||
lovely.
|
||||
|
||||
* TODO Look at Flask OpenAPI spec stuff :server:
|
||||
- Ideally want to go spec first
|
||||
- Must be able to provide validation
|
||||
- Would be nice to be able to use the spec to drive implementing the app (mounting functions to routes)
|
|
@ -1,39 +0,0 @@
|
|||
# -*- mode: org -*-
|
||||
|
||||
|
||||
Archived entries from file /home/arrdem/doc/hobby/programming/lang/python/flowmetal/TODO.org
|
||||
|
||||
|
||||
* DONE Implement parse() separately in the parser
|
||||
:PROPERTIES:
|
||||
:ARCHIVE_TIME: 2020-06-14 Sun 11:34
|
||||
:ARCHIVE_FILE: ~/doc/hobby/programming/lang/python/flowmetal/TODO.org
|
||||
:ARCHIVE_CATEGORY: TODO
|
||||
:ARCHIVE_TODO: DONE
|
||||
:END:
|
||||
Relates to implementing load()
|
||||
|
||||
When we have a workable load which generates data, we'll want a read() which
|
||||
generates a syntax tree so that we don't discard that API entirely.
|
||||
|
||||
|
||||
* DONE Parser test suite
|
||||
:PROPERTIES:
|
||||
:ARCHIVE_TIME: 2020-06-14 Sun 11:34
|
||||
:ARCHIVE_FILE: ~/doc/hobby/programming/lang/python/flowmetal/TODO.org
|
||||
:ARCHIVE_CATEGORY: TODO
|
||||
:ARCHIVE_TODO: DONE
|
||||
:END:
|
||||
- Cover the various scanners
|
||||
- Cover the position tracking machinery
|
||||
|
||||
|
||||
* DONE Get pytest set up
|
||||
:PROPERTIES:
|
||||
:ARCHIVE_TIME: 2020-06-14 Sun 11:34
|
||||
:ARCHIVE_FILE: ~/doc/hobby/programming/lang/python/flowmetal/TODO.org
|
||||
:ARCHIVE_CATEGORY: TODO
|
||||
:ARCHIVE_TODO: DONE
|
||||
:END:
|
||||
As it says on the tin
|
||||
|
53
projects/flowmetal/doc/architecture.md
Normal file
53
projects/flowmetal/doc/architecture.md
Normal file
|
@ -0,0 +1,53 @@
|
|||
# Architecture
|
||||
|
||||
Flowmetal is an interpreted language backed by a durable event store.
|
||||
The execution history of a program persists to the durable store as execution proceeds.
|
||||
If an interpretation step fails to persist, it can't have external effects.
|
||||
This is the fundamental insight behind Microsoft AMBROSIA.
|
||||
The event store also provides Flowmetal's only interface for communicating with external systems.
|
||||
Other systems can attach to Flowmetal's data store and send events to and receive them from Flowmetal.
|
||||
For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector.
|
||||
This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks.
|
||||
|
||||
A Flowmetal deployment looks like this -
|
||||
|
||||
```
|
||||
+----------------------------+
|
||||
+---------------------------+ |
|
||||
+--------------------------+ |--+
|
||||
| External HTTP service(s) |--+
|
||||
+--------------------------+
|
||||
^ ^
|
||||
| |
|
||||
v v
|
||||
+-----------------------+ +------------------------+
|
||||
| HTTP server connector | | HTTP request connector |
|
||||
+-----------------------+ +------------------------+
|
||||
^ ^
|
||||
| |
|
||||
v v
|
||||
+--------------------+ +----------------------+
|
||||
| Shared event store | | Shared program store |
|
||||
+--------------------+ +----------------------+
|
||||
^ ^
|
||||
| |
|
||||
v v
|
||||
+--------------------------+
|
||||
| Flowmetal interpreter(s) |
|
||||
+--------------------------+
|
||||
```
|
||||
|
||||
Users interact with Flowmetal by creating (or editing) **Programs**.
|
||||
|
||||
An instance of a Program is called a **Task**.
|
||||
Every Task has a unique **Inbox** and **Outbox**.
|
||||
Comparable systems call the unit of execution a Process; we prefer Task because Process invites conflation with a Unix process or thread and our Tasks are entirely portable.
|
||||
|
||||
Tasks interact with the outside world by producing **Requests** into their Outbox.
|
||||
For example an instance of a Task could request that some other Task be executed.
|
||||
Delivering messages to some other Task, or making API calls against external services would be other good examples of Requests.
|
||||
|
||||
Tasks receive the results of their Requests and other external events as **Messages** in their **Inbox**.
|
||||
A request that some other Task be executed would be responded to with a Message identifying the other task.
|
||||
The requesting Task could choose to wait on the requested Task, or could disregard it.
|
||||
Likewise the results of external requests return as Messages.
|
154
projects/flowmetal/doc/call_cc_airflow.md
Normal file
154
projects/flowmetal/doc/call_cc_airflow.md
Normal file
|
@ -0,0 +1,154 @@
|
|||
# What problem are you trying to solve?
|
||||
|
||||
In building, operating and maintaining distributed systems (many computers in concert) engineers face a tooling gap.
|
||||
|
||||
Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.) and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation.
|
||||
This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand.
|
||||
|
||||
In interacting with networks, communicating between computers is difficult with traditional tools and communication failure becomes an ever-present concern.
|
||||
Traditional automation tools such as shells struggle with this task because they make it difficult to implement features such as error handling.
|
||||
|
||||
Furthermore, in a distributed environment it cannot be assumed that a single machine can remain available to execute automation.
|
||||
Automation authors are thus confronted not just with the need to use "real" programming languages but the need to implement a database backed state machine which can "checkpoint" the job.
|
||||
|
||||
Taking a step back, this is an enormous failure of the languages we have available to describe workflow tasks.
|
||||
That users need to write state machines that define state machines that actually perform the desired task shows that the available tools operate at the wrong level.
|
||||
|
||||
Airflow for instance succeeds at providing a "workflow" control flow graph abstraction which frees users of the concerns of implementing their own resumable state machines.
|
||||
|
||||
Consider this example from the Airflow documentation -
|
||||
|
||||
```python
|
||||
from __future__ import annotations
|
||||
|
||||
import pendulum
|
||||
|
||||
from airflow import DAG
|
||||
from airflow.operators.empty import EmptyOperator
|
||||
from airflow.utils.edgemodifier import Label
|
||||
|
||||
with DAG(
|
||||
"example_branch_labels",
|
||||
schedule="@daily",
|
||||
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
|
||||
catchup=False,
|
||||
) as dag:
|
||||
ingest = EmptyOperator(task_id="ingest")
|
||||
analyse = EmptyOperator(task_id="analyze")
|
||||
check = EmptyOperator(task_id="check_integrity")
|
||||
describe = EmptyOperator(task_id="describe_integrity")
|
||||
error = EmptyOperator(task_id="email_error")
|
||||
save = EmptyOperator(task_id="save")
|
||||
report = EmptyOperator(task_id="report")
|
||||
|
||||
ingest >> analyse >> check
|
||||
check >> Label("No errors") >> save >> report
|
||||
check >> Label("Errors found") >> describe >> error >> report
|
||||
```
|
||||
|
||||
Compared to handwriting out all the nodes in the control flow graph and the requisite checkpointing to the database on state transitions, this is a considerable improvement.
|
||||
But if you stop and look at this control flow graph, it's expressing a simple branching operation for which we have much more conventional notation.
|
||||
|
||||
Consider the same workflow using "normal" Python syntax rather than the embedded workflow syntax -
|
||||
|
||||
```python
|
||||
def noop():
|
||||
pass
|
||||
|
||||
def check(result):
|
||||
return True
|
||||
|
||||
ingest = noop
|
||||
analyse = noop
|
||||
describe = noop
|
||||
error = noop
|
||||
save = noop
|
||||
report = noop
|
||||
|
||||
def main():
|
||||
ingest()
|
||||
result = analyse()
|
||||
if check(result):
|
||||
save(result)
|
||||
report(result)
|
||||
else:
|
||||
describe()
|
||||
error()
|
||||
report()
|
||||
```
|
||||
|
||||
The program a developer authors is already a control flow graph.
|
||||
We have a name for "operators" or state nodes - they're just functions.
|
||||
We have a name and notation for transitions in the state chart - they're just sequential statements.
|
||||
|
||||
Anything we need developers to write above that baseline represents friction specific to a workflow task which we should seek to minimize.
|
||||
|
||||
Temporal does better than Airflow and reflects this understanding.
|
||||
Using insights from Azure Durable Functions, their SDK leverages the details of Python's `async` and `await` operators to hijack program control flow and implement workflow features under the covers.
|
||||
|
||||
Consider this example from the Temporal documentation -
|
||||
|
||||
```python
|
||||
@activity.defn
|
||||
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
|
||||
try:
|
||||
while True:
|
||||
print("Heartbeating cancel activity")
|
||||
await asyncio.sleep(0.5)
|
||||
activity.heartbeat("some details")
|
||||
except asyncio.CancelledError:
|
||||
print("Activity cancelled")
|
||||
raise
|
||||
|
||||
|
||||
@workflow.defn
|
||||
class GreetingWorkflow:
|
||||
@workflow.run
|
||||
async def run(self, input: ComposeArgsInput) -> None:
|
||||
activity_handle = workflow.start_activity(
|
||||
cancel_activity,
|
||||
ComposeArgsInput(input.arg1, input.arg2),
|
||||
start_to_close_timeout=timedelta(minutes=5),
|
||||
heartbeat_timeout=timedelta(seconds=30),
|
||||
)
|
||||
|
||||
await asyncio.sleep(3)
|
||||
activity_handle.cancel()
|
||||
```
|
||||
|
||||
This is really good compared to an equivalent Airflow graph!
|
||||
All the details are "normal" Python, and the SDK fits "natively" into how Python execution occurs.
|
||||
But it's still laden with syntax such as the `async` function coloring and decorators which serve only to support the workflow SDK.
|
||||
|
||||
In comparison were this workflow a "simple" Python script it would only need to be written
|
||||
|
||||
```python
|
||||
# https://pypi.org/project/timeoutcontext/
|
||||
from timeoutcontext import task_with_timeout, TimeoutError
|
||||
|
||||
|
||||
def cancellable_activity():
|
||||
try:
|
||||
while True:
|
||||
print("Heartbeating cancellable activity")
|
||||
sleep(0.5)
|
||||
except TimeoutError:
|
||||
print("Activity cancelled")
|
||||
raise
|
||||
|
||||
|
||||
def main():
|
||||
task = task_with_timeout(lambda: cancellable_activity(),
|
||||
timeout=timedelta(minutes=5))
|
||||
sleep(3)
|
||||
task.cancel()
|
||||
```
|
||||
|
||||
As with Airflow, the Temporal SDK effectively requires that the programmer learn not just a set of libraries but the Python `async` features because the implementation of the workflow engine is leaked to the users.
|
||||
The problem isn't just the excessive syntax, it's that as with Airflow user workflows are no longer "normal" programs.
|
||||
There is in effect an entire Temporal interpreter stacked between the Python runtime with which users are familiar and the user's program.
|
||||
It is in effect a new language with none of the notational advantages of being one.
|
||||
|
||||
The flipside is that there is an enormous advantage in tooling to be had by leveraging an existing language - or something that looks enough like one - rather than inventing a new notation.
|
||||
This is the cardinal sin of workflow tools like kubeflow and various CI "workflow" formats - they adopt unique YAML or XML based notations which have no tooling support.
|
||||
For instance by being "normal" (ish) Python, the Temporal SDK benefits from access to editor autocompletion, the MyPy typechecker and all manner of other tools.
|
|
@ -1,30 +0,0 @@
|
|||
# A manifesto
|
||||
|
||||
In the last decade, immutability has been affirmed in the programming mainstream as an effective tool for making programs and state more manageable, and one which has been repeatedly implemented at acceptable performance costs.
|
||||
Especially in messaging based rather than state sharing environments, immutability and "data" oriented programming is becoming more and more common.
|
||||
|
||||
It also seems that much of the industry is moving towards message based reactive or network based connective systems.
|
||||
Microservices seem to have won, and functions-as-a-service seem to be a rising trend reflecting a desire to offload or avoid deployment management rather than wrangle stateful services.
|
||||
|
||||
In these environments, programs begin to consist entirely of messaging with other programs over shared channels such as traditional HTTP or other RPC tools or message buses such as Kafka, gRPC, ThriftMux and so forth.
|
||||
|
||||
Key challenges with these connective services are:
|
||||
- How they handle failure
|
||||
- How they achieve reliability
|
||||
- The ergonomic difficulties of building and deploying connective programs
|
||||
- The operational difficulties of managing N-many 'reliable' services
|
||||
|
||||
Tools like Argo, Airflow and the like begin to talk about such networked or evented programs as DAGs; providing schedulers for sequencing actions and executors for performing actions.
|
||||
|
||||
Airflow provides a programmable Python scheduler environment, but fails to provide an execution isolation boundary (such as a container or other subprocess/`fork()` boundary) allowing users to bring their own dependencies.
|
||||
Instead Airflow users must build custom Airflow packagings which bundle dependencies into the Airflow instance.
|
||||
This means that Airflow deployments can only be centralized with difficulty due to shared dependencies and disparate dependency lifecycles and limits the return on investment of the platform by increasing operational burden.
|
||||
|
||||
Argo ducks this mistake, providing a robust scheduler and leveraging k8s for its executor.
|
||||
This allows Argo to be managed independently of any of the workloads it manages - a huge step forwards over Airflow - but this comes at considerable ergonomic costs for trivial tasks and provides a more limited scheduler.
|
||||
|
||||
Previously I developed a system which provided a much stronger DSL than Airflow's, but made the same key mistake of not decoupling execution from the scheduler/coordinator.
|
||||
Calf is a sketch of a programming language and system with a nearly fully featured DSL, and decoupling between scheduling (control flow of programs) and execution of "terminal" actions.
|
||||
|
||||
In short, think a Py-Lisp where instead of doing FFI directly to the parent Python instance you do FFI by enqueuing a (potentially retryable!) request onto a shared cluster message bus, from which subscriber worker processes elsewhere provide request/response handling.
|
||||
One could reasonably accuse this project of being an attempt to unify Erlang and a hosted Python to build a "BASH for distsys" tool while providing a multi-tenant execution platform that can be centrally managed.
|
29
projects/flowmetal/examples/abc.flow
Normal file
29
projects/flowmetal/examples/abc.flow
Normal file
|
@ -0,0 +1,29 @@
|
|||
# -*- mode: python -*-
|
||||
|
||||
from flowmetal import workflow
|
||||
|
||||
|
||||
def ingest():
|
||||
return {}
|
||||
|
||||
|
||||
def analyze(data):
|
||||
return data.keys()
|
||||
|
||||
|
||||
def check(keys) -> bool:
|
||||
return len(keys) > 0
|
||||
|
||||
|
||||
def report(keys):
|
||||
print(keys)
|
||||
|
||||
|
||||
@workflow
|
||||
def main():
|
||||
data = ingest()
|
||||
data = analyze(data)
|
||||
if check(data):
|
||||
report(data)
|
||||
else:
|
||||
raise ValueError(report(data))
|
26
projects/flowmetal/examples/timeout.flow
Normal file
26
projects/flowmetal/examples/timeout.flow
Normal file
|
@ -0,0 +1,26 @@
|
|||
# -*- mode: python -*-
|
||||
|
||||
from datetime import timedelta
|
||||
from time import sleep
|
||||
from flowmetal import workflow, timeout, CancelledError, TimeoutError, Task
|
||||
|
||||
|
||||
def cancellable_activity():
|
||||
try:
|
||||
while True:
|
||||
print("Still alive")
|
||||
sleep(0.5)
|
||||
except CancelledError:
|
||||
print("Task killed")
|
||||
|
||||
|
||||
@workflow
|
||||
def main():
|
||||
# Somewhat like a thread
|
||||
t = Task(target=cancellable_activity, args=(), timeout=timedelta(minutes=5))
|
||||
t.start()
|
||||
try:
|
||||
result = t.result(timeout=timedelta(seconds=3))
|
||||
print(result)
|
||||
except TimeoutError:
|
||||
t.cancel()
|
|
@ -1,34 +0,0 @@
|
|||
from setuptools import setup
|
||||
|
||||
|
||||
setup(
|
||||
name="arrdem.flowmetal",
|
||||
# Package metadata
|
||||
version="0.0.0",
|
||||
license="MIT",
|
||||
description="A weird execution engine",
|
||||
long_description=open("README.md").read(),
|
||||
long_description_content_type="text/markdown",
|
||||
author="Reid 'arrdem' McKenzie",
|
||||
author_email="me@arrdem.com",
|
||||
url="https://git.arrdem.com/arrdem/flowmetal",
|
||||
classifiers=[
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Intended Audience :: Developers",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
],
|
||||
# Package setup
|
||||
package_dir={"": "src/python"},
|
||||
packages=[
|
||||
"flowmetal",
|
||||
],
|
||||
entry_points={
|
||||
"console_scripts": ["iflow=flowmetal.repl:main"],
|
||||
},
|
||||
install_requires=[
|
||||
"prompt-toolkit~=3.0.0",
|
||||
],
|
||||
extras_require={},
|
||||
)
|
|
@ -32,6 +32,6 @@ tasks:
|
|||
|
||||
bindings:
|
||||
ttl: 300 # 5min TTL on records
|
||||
local:
|
||||
public_v4s:
|
||||
- 67.166.27.157
|
||||
# local:
|
||||
# public_v4s:
|
||||
# - 174.51.247.120
|
||||
|
|
|
@ -19,6 +19,7 @@ buildcache {{ ttl }} IN A {{ link }}
|
|||
feed {{ ttl }} IN A {{ link }}
|
||||
ton {{ ttl }} IN A {{ link }}
|
||||
relay {{ ttl }} IN A {{ link }}
|
||||
pxe {{ ttl }} IN A {{ link }}
|
||||
{% endfor %}
|
||||
|
||||
# Host records
|
||||
|
|
|
@ -43,6 +43,7 @@ SHITLIST = [
|
|||
def req_name(requirement: str) -> str:
    """Return the package name parsed out of a requirement string.

    The requirement is lower-cased and matched against ``REQ_PATTERN`` from
    the start of the string. The result is the ``pkgname`` group, falling back
    to the ``eggname`` group when ``pkgname`` is empty or did not participate
    in the match.

    Raises:
        ValueError: if the requirement does not match ``REQ_PATTERN``.
    """
    normalized = requirement.lower()
    match = re.match(REQ_PATTERN, normalized)
    if match is None:
        # An explicit raise (rather than ``assert``) still fires under
        # ``python -O`` and names the offending requirement instead of
        # failing later with an AttributeError on ``None``.
        raise ValueError(f"Unparseable requirement: {requirement!r}")
    return match.group("pkgname") or match.group("eggname")
|
||||
|
||||
|
||||
|
|
44
tools/python/requirements.in
Normal file
44
tools/python/requirements.in
Normal file
|
@ -0,0 +1,44 @@
|
|||
aiohttp
|
||||
aiohttp_basicauth
|
||||
async_lru
|
||||
autoflake
|
||||
beautifulsoup4
|
||||
black
|
||||
cachetools
|
||||
click
|
||||
colored
|
||||
ExifRead
|
||||
flake8
|
||||
flask
|
||||
hypothesis
|
||||
icmplib
|
||||
isort
|
||||
jinja2
|
||||
lark
|
||||
livereload
|
||||
lxml
|
||||
markdown
|
||||
meraki
|
||||
octorest
|
||||
octorest
|
||||
openapi-spec-validator
|
||||
prompt-toolkit
|
||||
proquint
|
||||
psycopg2
|
||||
pycryptodome
|
||||
pyrsistent
|
||||
pytest-cov
|
||||
pytest-postgresql
|
||||
pyyaml
|
||||
recommonmark
|
||||
redis
|
||||
requests
|
||||
requests
|
||||
retry
|
||||
smbus2
|
||||
sphinx
|
||||
sphinxcontrib-openapi
|
||||
sphinxcontrib-programoutput
|
||||
unify
|
||||
yamllint
|
||||
yaspin
|
|
@ -1,146 +1,155 @@
|
|||
aiohttp==3.8.1
|
||||
aiohttp==3.8.4
|
||||
aiohttp-basicauth==1.0.0
|
||||
aiosignal==1.2.0
|
||||
alabaster==0.7.12
|
||||
async-lru==1.0.3
|
||||
async-timeout==4.0.2
|
||||
attrs==21.4.0
|
||||
autoflake==1.4
|
||||
Babel==2.9.1
|
||||
attrs==22.1.0
|
||||
autoflake==2.0.1
|
||||
Babel==2.11.0
|
||||
bases==0.2.1
|
||||
beautifulsoup4==4.10.0
|
||||
black==21.8b0
|
||||
beautifulsoup4==4.11.2
|
||||
black==23.1.0
|
||||
blake3==0.3.1
|
||||
bleach==4.1.0
|
||||
borg==2012.4.1
|
||||
cachetools==5.2.0
|
||||
cachetools==5.3.0
|
||||
cbor2==5.4.3
|
||||
certifi==2021.10.8
|
||||
certifi==2022.9.24
|
||||
chardet==4.0.0
|
||||
charset-normalizer==2.0.10
|
||||
click==7.1.2
|
||||
colored==1.4.3
|
||||
charset-normalizer==2.1.1
|
||||
click==8.1.3
|
||||
colored==1.4.4
|
||||
commonmark==0.9.1
|
||||
coverage==6.2
|
||||
Cython==0.29.30
|
||||
dataclasses==0.6
|
||||
decorator==5.1.1
|
||||
deepmerge==1.1.0
|
||||
Deprecated==1.2.13
|
||||
docutils==0.17.1
|
||||
ExifRead==2.3.2
|
||||
flake8==4.0.1
|
||||
Flask==2.0.2
|
||||
frozenlist==1.2.0
|
||||
docutils==0.19
|
||||
exceptiongroup==1.1.0
|
||||
ExifRead==3.0.0
|
||||
flake8==6.0.0
|
||||
Flask==2.2.3
|
||||
frozenlist==1.3.3
|
||||
graphviz==0.19.1
|
||||
hypothesis==6.35.0
|
||||
icmplib==3.0.2
|
||||
idna==3.3
|
||||
imagesize==1.3.0
|
||||
hypothesis==6.68.1
|
||||
icmplib==3.0.3
|
||||
idna==3.4
|
||||
imagesize==1.4.1
|
||||
importlib-metadata==4.10.0
|
||||
iniconfig==1.1.1
|
||||
isodate==0.6.1
|
||||
isort==5.10.1
|
||||
isort==5.12.0
|
||||
itsdangerous==2.0.1
|
||||
jedi==0.18.1
|
||||
Jinja2==3.0.3
|
||||
Jinja2==3.1.2
|
||||
jsonschema==4.3.3
|
||||
lark==1.0.0
|
||||
jsonschema-spec==0.1.3
|
||||
lark==1.1.5
|
||||
lazy-object-proxy==1.9.0
|
||||
livereload==2.6.3
|
||||
lxml==4.7.1
|
||||
lxml==4.9.2
|
||||
m2r==0.2.1
|
||||
Markdown==3.3.6
|
||||
MarkupSafe==2.0.1
|
||||
mccabe==0.6.1
|
||||
meraki==1.24.0
|
||||
Markdown==3.4.1
|
||||
MarkupSafe==2.1.1
|
||||
mccabe==0.7.0
|
||||
meraki==1.27.0
|
||||
mirakuru==2.4.1
|
||||
mistune==2.0.1
|
||||
mmh3==3.0.0
|
||||
multidict==5.2.0
|
||||
multidict==6.0.2
|
||||
multiformats==0.1.4.post3
|
||||
mypy-extensions==0.4.3
|
||||
numpy==1.23.1
|
||||
numpy==1.23.5
|
||||
octorest==0.4
|
||||
openapi-schema-validator==0.2.0
|
||||
openapi-spec-validator==0.3.1
|
||||
packaging==21.3
|
||||
openapi-schema-validator==0.4.3
|
||||
openapi-spec-validator==0.5.5
|
||||
packaging==23.0
|
||||
parso==0.8.3
|
||||
pathable==0.4.3
|
||||
pathspec==0.9.0
|
||||
pep517==0.12.0
|
||||
pip==21.3.1
|
||||
pep517==0.13.0
|
||||
picobox==2.2.0
|
||||
pip==22.3.1
|
||||
pip-tools==6.4.0
|
||||
plac==1.3.5
|
||||
platformdirs==2.4.1
|
||||
pluggy==1.0.0
|
||||
port-for==0.6.1
|
||||
prompt-toolkit==3.0.24
|
||||
psutil==5.9.0
|
||||
psycopg2==2.9.3
|
||||
prompt-toolkit==3.0.36
|
||||
proquint==0.2.1
|
||||
psutil==5.9.4
|
||||
psycopg2==2.9.5
|
||||
pudb==2022.1
|
||||
pur==5.4.2
|
||||
py==1.11.0
|
||||
pycodestyle==2.8.0
|
||||
pycryptodome==3.15.0
|
||||
pycodestyle==2.10.0
|
||||
pycryptodome==3.17
|
||||
pycryptodomex==3.15.0
|
||||
pyflakes==2.4.0
|
||||
Pygments==2.11.2
|
||||
pyparsing==3.0.6
|
||||
pyrsistent==0.18.1
|
||||
pyflakes==3.0.1
|
||||
Pygments==2.13.0
|
||||
pyparsing==3.0.9
|
||||
pyrsistent==0.19.3
|
||||
pysha3==1.0.2
|
||||
pyskein==1.0
|
||||
pytest==6.2.5
|
||||
pytest-cov==3.0.0
|
||||
pytest-postgresql==4.1.0
|
||||
pytest-cov==4.0.0
|
||||
pytest-postgresql==4.1.1
|
||||
pytest-pudb==0.7.0
|
||||
pytest-timeout==2.1.0
|
||||
pytz==2021.3
|
||||
pytz==2022.6
|
||||
PyYAML==6.0
|
||||
readme-renderer==32.0
|
||||
recommonmark==0.7.1
|
||||
redis==4.1.0
|
||||
redis==4.5.1
|
||||
regex==2021.11.10
|
||||
requests==2.27.1
|
||||
requests==2.28.2
|
||||
requests-toolbelt==0.9.1
|
||||
requirements-parser==0.3.1
|
||||
retry==0.9.2
|
||||
rfc3339-validator==0.1.4
|
||||
scipy==1.8.1
|
||||
setuptools==60.5.0
|
||||
six==1.16.0
|
||||
smbus2==0.4.1
|
||||
smbus2==0.4.2
|
||||
snowballstemmer==2.2.0
|
||||
sortedcontainers==2.4.0
|
||||
soupsieve==2.3.1
|
||||
Sphinx==4.3.2
|
||||
Sphinx==6.1.3
|
||||
sphinx_mdinclude==0.5.3
|
||||
sphinxcontrib-applehelp==1.0.2
|
||||
sphinxcontrib-devhelp==1.0.2
|
||||
sphinxcontrib-htmlhelp==2.0.0
|
||||
sphinxcontrib-httpdomain==1.8.0
|
||||
sphinxcontrib-jsmath==1.0.1
|
||||
sphinxcontrib-openapi==0.7.0
|
||||
sphinxcontrib-openapi==0.8.1
|
||||
sphinxcontrib-programoutput==0.17
|
||||
sphinxcontrib-qthelp==1.0.3
|
||||
sphinxcontrib-serializinghtml==1.1.5
|
||||
termcolor==1.1.0
|
||||
termcolor==2.2.0
|
||||
toml==0.10.2
|
||||
tomli==1.2.3
|
||||
tomli==2.0.1
|
||||
toposort==1.7
|
||||
tornado==6.1
|
||||
typed-ast==1.5.1
|
||||
types-setuptools==57.4.7
|
||||
typing_extensions==4.0.1
|
||||
typing-validation==0.0.1.post7
|
||||
typing_extensions==4.4.0
|
||||
unify==0.5
|
||||
untokenize==0.1.1
|
||||
urllib3==1.26.8
|
||||
urllib3==1.26.13
|
||||
urwid==2.1.2
|
||||
urwid-readline==0.13
|
||||
wasm==1.2
|
||||
wcwidth==0.2.5
|
||||
webencodings==0.5.1
|
||||
websocket-client==1.2.3
|
||||
Werkzeug==2.0.2
|
||||
Werkzeug==2.2.3
|
||||
wheel==0.37.1
|
||||
wrapt==1.13.3
|
||||
yamllint==1.26.3
|
||||
yarl==1.7.2
|
||||
yaspin==2.1.0
|
||||
yamllint==1.29.0
|
||||
yarl==1.8.1
|
||||
yaspin==2.3.0
|
||||
zipp==3.7.0
|
||||
|
|
Loading…
Reference in a new issue