From 87539621e54439c3b2b7441d6f1cc91fadece356 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Tue, 7 Mar 2023 18:55:25 -0700 Subject: [PATCH] Refreshing --- README.md | 67 +- doc/NOTES.md | 251 +++++ doc/architecture.md | 53 + doc/call_cc_airflow.md | 154 +++ doc/what_problem.md | 47 + examples/abc.flow | 29 + examples/timeout.flow | 26 + scratch/astdump.py | 98 ++ scratch/astinterp.py | 977 ++++++++++++++++++ scratch/test.py | 34 + setup.py | 37 - src/flowmetal/com.twitter.wilson.flow | 83 -- src/flowmetal/flowmetal.time.flow | 33 - src/python/flowmetal/__init__.py | 1 - src/python/flowmetal/__main__.py | 26 + src/python/flowmetal/db/base.py | 28 + src/python/flowmetal/db/redis.py | 3 + src/python/flowmetal/frontend.py | 9 + src/python/flowmetal/interpreter.py | 9 + src/python/flowmetal/models.py | 3 + src/python/flowmetal/parser.py | 511 --------- src/python/flowmetal/reaper.py | 9 + src/python/flowmetal/repl.py | 78 -- src/python/flowmetal/scheduler.py | 9 + src/python/flowmetal/syntax_analyzer.py | 356 ------- test/python/flowmetal/test_parser.py | 161 --- test/python/flowmetal/test_syntax_analyzer.py | 50 - 27 files changed, 1774 insertions(+), 1368 deletions(-) create mode 100644 doc/NOTES.md create mode 100644 doc/architecture.md create mode 100644 doc/call_cc_airflow.md create mode 100644 doc/what_problem.md create mode 100644 examples/abc.flow create mode 100644 examples/timeout.flow create mode 100644 scratch/astdump.py create mode 100644 scratch/astinterp.py create mode 100644 scratch/test.py delete mode 100644 setup.py delete mode 100644 src/flowmetal/com.twitter.wilson.flow delete mode 100644 src/flowmetal/flowmetal.time.flow delete mode 100644 src/python/flowmetal/__init__.py create mode 100644 src/python/flowmetal/__main__.py create mode 100644 src/python/flowmetal/db/base.py create mode 100644 src/python/flowmetal/db/redis.py create mode 100644 src/python/flowmetal/frontend.py create mode 100644 src/python/flowmetal/interpreter.py create mode 100644 src/python/flowmetal/models.py delete mode 100644 src/python/flowmetal/parser.py create mode 100644 src/python/flowmetal/reaper.py delete mode 100644 src/python/flowmetal/repl.py create mode 100644 src/python/flowmetal/scheduler.py delete mode 100644 src/python/flowmetal/syntax_analyzer.py delete mode 100644 test/python/flowmetal/test_parser.py delete mode 100644 test/python/flowmetal/test_syntax_analyzer.py diff --git a/README.md b/README.md index 5419a47..8c6be28 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # Flowmetal -> A shining mercurial metal laden with sensors and almost infinitely reconfigurable. -> +> A shining mercurial metal, laden with sensors and almost infinitely reconfigurable. > The stuff of which robots and servitors are made. Flowmetal is a substrate for automation. -It attempts to provide a programming environment wherein programs are durable, evented and asynchronous aimed at what would traditionally be described as scripting or coordination. +It provides a programming environment wherein programs are durable, evented and asynchronous by default. +It is aimed at scripting or coordination tasks, such as workflows and scheduled jobs. Let's unpack these terms. @@ -21,65 +21,16 @@ This also allows for external systems such as REST callback APIs, databases and It also allows bidirectional communication between Flowmetal programs and other more traditional programming environments. Anything that can communicate with Flowmetal can provide function implementations, or call Flowmetal programs! +This centering of evented communication makes Flowmetal ideal for **coordination** tasks, from simple task sequencing to map/reduce and other parallelism patterns. + **Asynchronous** - thanks to Flowmetal's evented execution model, waiting for slow external events either synchronously or asynchronously is second nature! -Flowmetal is especially good at waiting for very, very slow external operations. -Stuff like webhooks and batch processes. -**Scripting** - the tradeoff Flowmetal makes for the evented model is that it's slow. -While Flowmetal foreign functions could be fast, Flowmetal's interpreter isn't designed for speed. -It's designed for eventing and ensuring durability. -This makes Flowmetal suitable for interacting with and coordinating other systems, but it's not gonna win any benchmark games. +**Scripting** - Durablity and distribution of execution come at coordination costs which make Flowmetal well suited for coordination tasks, but not for heavy processing. -## Wait what? - -Okay. -In simpler words, Flowmetal is an interpreted lisp which can use a datastore of your choice for durability. -Other systems can attach to Flowmetal's datastore and send events to and receive them from Flowmetal. -For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector. - -A possible Flowmetal setup looks something like this - - -``` - +----------------------------+ - +---------------------------+ | - +--------------------------+ |--+ - | External HTTP service(s) |--+ - +--------------------------+ - ^ ^ - | | - v v - +-----------------------+ +------------------------+ - | HTTP server connector | | HTTP request connector | - +-----------------------+ +------------------------+ - ^ ^ - | | - v v - +--------------------+ - | Shared event store | - +--------------------+ - ^ - | - v - +--------------------------+ - | Flowmetal interpreter(s) | - +--------------------------+ -``` - -In this setup, the Flowmetal interpreters are able to interact with an external HTTP service; sending and receiving webhooks with Flowmetal programs waiting for those external events to arrive. - -For instance this program would use the external connector stubs to build up interaction(s) with an external system. - -```lisp - - - -``` - - -Comparisons to Apache Airflow are at least in this setup pretty apt, although Flowmetal's durable execution model makes it much more suitable for providing reliable workflows and its DSL is more approachable. +- For a problem statement, see [Call/CC Airflow](docs/call_cc_airflow.md). +- For an architecture overview, see [Architecture](docs/architecture.md). +- For example doodles, see [examples](examples). ## License -Mirrored from https://git.arrdem.com/arrdem/flowmetal - Published under the MIT license. See [LICENSE.md](LICENSE.md) diff --git a/doc/NOTES.md b/doc/NOTES.md new file mode 100644 index 0000000..3fec0c7 --- /dev/null +++ b/doc/NOTES.md @@ -0,0 +1,251 @@ +# Notes + +https://github.com/Pyrlang/Pyrlang +https://en.wikipedia.org/wiki/Single_system_image + +## Example - Await + +A common pattern working in distributed environments is to want to request another system perform a job and wait for its results. +There are lots of parallels here to making a function or RPC call, except that it's a distributed system with complex failure modes. + +In a perfect world we'd want to just write something like this - + +```python +#!/usr/bin/env python3.10 + +from service.client import Client + +CLIENT = Client("http://service.local", api_key="...") +job = client.create_job(...) +result = await job +# Do something with the result +``` + +There's some room for variance here around API design taste, but this snippet is probably familiar to many Python readers. +Let's think about its failure modes. + +First, that `await` is doing a lot of heavy lifting. +Presumably it's wrapping up a polling loop of some sort. +That may be acceptable in some circumstances, but it really leaves to the client library implementer the question of what an acceptable retry policy is. + +Second, this snippet assumes that `create_job` will succeed. +There won't be an authorization error, or a network transit error, or a remote server error or anything like that. + +Third, there's no other record of whatever `job` is. +If the Python interpreter running this program dies, or the user gets bored and `C-c`'s it or the computer encounters a problem, the job will be lost. +Maybe that's OK, maybe it isn't. +But it's a risk. + +Now, let's think about taking on some of the complexity needed to solve these problems ourselves. + +### Retrying challenges + +We can manually write the retry loop polling a remote API. + +``` python +#!/usr/bin/env python3.10 + +from datetime import datetime, timedelta + +from service.client import Client + + +CLIENT = Client("http://service.local", api_key="...") +AWAIT_TIMEOUT = timedelta(minutes=30) +POLL_TIME = timedelta(seconds=10) + + +def sleep(duration=POLL_TIME): + """A slightly more useful sleep. Has our default and does coercion.""" + from time import sleep + if isinstance(duration, timedelta): + duration = duration.total_seconds() + sleep(duration) + + +# Create a job, assuming idempotence +while True: + try: + job = client.create_job(...) + start_time = datetime.now() + break + except: + sleep() + +# Waiting for the job +while True: + # Time-based timeout + if datetime.now() - start_time > AWAIT_TIMEOUT: + raise TimeoutError + + # Checking the job status, no backoff linear polling + try: + if not job.complete(): + continue + except: + sleep() + continue + + # Trying to read the job result, re-using the retry loop & total timeout machinery + try: + result = job.get() + break + except: + sleep() + continue + +# Do something with the result +``` + +We could pull [retrying](https://pypi.org/project/retrying/) off the shelf and get some real mileage here. +`retrying` is a super handy little library that provides the `@retry` decorator, which implements a variety of common retrying concerns such as retrying N times with linear or exponential back-off, and such. +It's really just the `while/try/except` state machine we just wrote a couple times as a decorator. + +``` python +#!/usr/bin/env python3.10 + +from datetime import datetime, timedelta + +from retrying import retry + +from service.client import Client + + +CLIENT = Client("http://service.local", api_key="...") +AWAIT_TIMEOUT = timedelta(minutes=30) +POLL_TIME = timedelta(seconds=10) + + +class StillWaitingException(Exception): + """Something we can throw to signal we're still waiting on an external event.""" + + +@retry(wait_fixed=POLL_TIME.total_milliseconds()) +def r_create_job(client): + """R[eliable] create job. Retries over exceptions forever with a delay. No jitter.""" + return client.create_job() + + +@retry(stop_max_delay=AWAIT_TIMEOUT.total_milliseconds(), + wait_fixed=POLL_TIME.total_milliseconds()) +def r_get_job(job): + """R[eliable] get job. Retries over exceptions up to a total time with a delay. No jitter.""" + if not job.complete(): + raise StillWaitingException + + return job.get() + + +job = r_create_job(client) +result = r_get_job(job) +# Do something with the result +``` + +That's pretty good! +We've preserved most of our direct control over the mechanical retrying behavior, we can tweak it or choose a different provider. +And we've managed to get the syntactic density of the original `await` example back ... almost. + +This is where Python's lack of an anonymous function block syntax and other lexical structures becomes a sharp limiter. +In another language like Javascript or LUA, you could probably get this down to something like - + +``` lua +-- retry is a function of retrying options to a function of a callable to retry +-- which returns a zero-argument callable which will execute the callable with +-- the retrying behavior as specified. + +client = Client("http://service.local", api_key="...") +retry_config = {} -- Fake, obviously +with_retry = retry(retry_config) + +job = with_retry( + funtion () + return client.start_plan(...) + end)() + +result = with_retry( + function() + if job.complete() then + return job.get() + end + end)() +``` + +The insight here is that the "callback" function we're defining in the Python example as `r_get_job` and so forth has no intrinsic need to be named. +In fact choosing the arbitrary names `r_get_job` and `r_create_job` puts more load on the programmer and the reader. +Python's lack of block anonymous procedures precludes us from cramming the `if complete then get` operation or anything more complex into a `lambda` without some serious syntax crimes. + +Using [PEP-0342](https://www.python.org/dev/peps/pep-0342/#new-generator-method-send-value), it's possible to implement arbitrary coroutines in Python by `.send()`ing values to generators which may treat `yield` statements as rvalues for receiving remotely sent inputs. +This makes it possible to explicitly yield control to a remote interpreter, which will return or resume the couroutine with a result value. + +Microsoft's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=python) use exactly this behavior to implement durable functions. +The "functions" provided by the API return sentinels which can be yielded to an external interpreter, which triggers processing and returns control when there are results. +This is [interpreter effect conversion pattern (Extensible Effects)](http://okmij.org/ftp/Haskell/extensible/exteff.pdf) as seen in Haskell and other tools; applied. + + +``` python +import azure.functions as func +import azure.durable_functions as df + +def orchestrator_function(context: df.DurableOrchestrationContext): + x = yield context.call_activity("F1", None) + y = yield context.call_activity("F2", x) + z = yield context.call_activity("F3", y) + result = yield context.call_activity("F4", z) + return result + +main = df.Orchestrator.create(orchestrator_function) +``` + +Now it would seem that you could "just" automate doing rewriting that to something like this - + +``` python +@df.Durable +def main(ctx): + x = context.call_activity("F1", None) + y = context.call_activity("F2", x) + z = context.call_activity("F3", y) + return context.call_activity("F4", z) +``` + +There's some prior art for doing this (https://eigenfoo.xyz/manipulating-python-asts/, https://greentreesnakes.readthedocs.io/en/latest/manipulating.html#modifying-the-tree) but it's a lot of legwork for not much. +There are also some pretty gaping correctness holes in taking the decorator based rewriting approach; +how do you deal with rewriting imported code, or code that's in classes/behind `@property` and other such tricks? + +Just not worth it. + +Now, what we _can_ do is try to hijack the entire Python interpreter to implement the properties/tracing/history recording we want there. +The default cpython lacks hooks for doing this, but we can write a python-in-python interpreter and "lift" the user's program into an interpreter we control, which ultimately gets most of its behavior "for free" from the underlying cpython interpreter. +There's [an example](https://github.com/pfalcon/pyastinterp) of doing this as part of the pycopy project; although there it's more of a Scheme-style proof of metacircular self-hosting. + +There's a modified copy of the astinterp in `scratch/` which is capable of running a considerable subset of py2/3.9 to the point of being able to source-import many libraries including `requests` and run PyPi sourced library code along with user code under hoisted interpretation. + +It doesn't support coroutines/generators yet, and there's some machinery required to make it "safe" (meaningfully single-stepable; "fix"/support eval, enable user-defined import/`__import__` through the lifted python VM) but as a proof of concept of a lifted VM I'm genuinely shocked how well this works. + +Next questions here revolve around how to "snapshot" the state of the interpreter meaningfully, and how to build a replayable interpreter log. +There are some specific challenges around how Python code interacts with native C code that could limit the viability of this approach, but at the absolute least this fully sandboxed Python interpreter could be used to implement whatever underlying magic could be desired and restricted to some language subset as desired. + +The goal is to make something like this work - + +``` python +from df import Activity + +f1 = Activity("F1") +f2 = Activity("F2") +f3 = Activity("F3") +f4 = Activity("F4") + +def main(): + return f4(f3(f2(f1(None)))) +``` + +Which may offer a possible solution to the interpreter checkpointing problem - only checkpoint "supported" operations. +Here the `Activity().__call__` operation would have special support, as with `datetime.datetime.now()` and controlling `time.sleep()`, threading and possibly `random.Random` seeding which cannot trivially be made repeatable. + +### Durability challenges + +FIXME - manually implementing snapshotting and recovery is hard + + +### Leverage with language support + +FIXME - What does a DSL that helps with all this look like? diff --git a/doc/architecture.md b/doc/architecture.md new file mode 100644 index 0000000..1ace5f1 --- /dev/null +++ b/doc/architecture.md @@ -0,0 +1,53 @@ +# Architecture + +Flowmetal is an interpreted language backed by a durable event store. +The execution history of a program persists to the durable store as execution precedes. +If an interpretation step fails to persist, it can't have external effects. +This is the fundamental insight behind Microsoft AMBROSIA. +The event store also provides Flowmetal's only interface for communicating with external systems. +Other systems can attach to Flowmetal's data store and send events to and receive them from Flowmetal. +For instance Flowmetal contains a reference implementation of a HTTP callback connector and of a HTTP request connector. +This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks. + +A Flowmetal deplyoment looks like this - + +``` + +----------------------------+ + +---------------------------+ | + +--------------------------+ |--+ + | External HTTP service(s) |--+ + +--------------------------+ + ^ ^ + | | + v v + +-----------------------+ +------------------------+ + | HTTP server connector | | HTTP request connector | + +-----------------------+ +------------------------+ + ^ ^ + | | + v v + +--------------------+ +----------------------+ + | Shared event store | | Shared program store | + +--------------------+ +----------------------+ + ^ ^ + | | + v v + +--------------------------+ + | Flowmetal interpreter(s) | + +--------------------------+ +``` + +Users interact with Flowmetal by creating (or editing) **Programs**. + +An instance of a Program is called a **Task**. +Every Task has a unique **Inbox** and **Outbox**. +Comparable systems call the unit of execution a Process; we prefer Task because Process invites conflation with a Unix process or thread and our Tasks are entirely portable. + +Tasks interact with the outside world by producing **Requests** into their Outbox. +For example an instance of a Task could request that some other Task be executed. +Delivering messages to some other Task, or making API calls against external services would be other good examples of Requests. + +Tasks receive the results of their Requests and other external events as **Messages** in their **Inbox**. +A request that some other Task be executed would be responded to with a Message identifying the other task. +The requesting Task could choose to wait on the requested Task, or could disregard it. +Likewise the results of external requests return as Messages. diff --git a/doc/call_cc_airflow.md b/doc/call_cc_airflow.md new file mode 100644 index 0000000..c668644 --- /dev/null +++ b/doc/call_cc_airflow.md @@ -0,0 +1,154 @@ +# What problem are you trying to solve? + +In building, operating and maintaining distributed systems (many computers in concert) engineers face a tooling gap. + +Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.) and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation. +This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand. + +In interacting with networks, communicating between computers is difficult with traditional tools and communication failure becomes an ever-present concern. +Traditional automation tools such as shells struggle with this task because they make it difficult to implement features such as error handling. + +Furthermore, in a distributed environment it cannot be assumed that a single machine can remain available to execute automation. +Automation authors are thus confronted not just with the need to use "real" programming languages but the need to implement a database backed state machine which can "checkpoint" the job. + +Taking a step back, this is an enormous failure of the languages we have available to describe workflow tasks. +That users need to write state machines that define state machines that actually perform the desired task shows that the available tools operate at the wrong level. + +Airflow for instance succeeds at providing a "workflow" control flow graph abstraction which frees users of the concerns of implementing their own resumable state machines. + +Consider this example from the Airflow documentation - + +```python +from __future__ import annotations + +import pendulum + +from airflow import DAG +from airflow.operators.empty import EmptyOperator +from airflow.utils.edgemodifier import Label + +with DAG( + "example_branch_labels", + schedule="@daily", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, +) as dag: + ingest = EmptyOperator(task_id="ingest") + analyse = EmptyOperator(task_id="analyze") + check = EmptyOperator(task_id="check_integrity") + describe = EmptyOperator(task_id="describe_integrity") + error = EmptyOperator(task_id="email_error") + save = EmptyOperator(task_id="save") + report = EmptyOperator(task_id="report") + + ingest >> analyse >> check + check >> Label("No errors") >> save >> report + check >> Label("Errors found") >> describe >> error >> report +``` + +Compared to handwriting out all the nodes in the control flow graph and the requisite checkpointing to the database on state transitions, this is a considerable improvement. +But if you stop and look at this control flow graph, it's expressing a simple branching operation for which we have much more conventional notation. + +Consider the same workflow using "normal" Python syntax rather than the embedded workflow syntax - + +```python +def noop(): + pass + +def check(result): + return True + +ingest = noop +analyse = noop +describe = noop +error = noop +save = noop +report = noop + +def main(): + ingest() + result = analyse() + if check(result): + save(result) + report(result) + else: + describe() + error() + report() +``` + +The program a developer authors is already a control flow graph. +We have a name for "operators" or state nodes - they're just functions. +We have a name and notation for transitions in the state chart - they're just sequential statements. + +Anything we need developers to write above that baseline represents friction specific to a workflow task which we should seek to minimize. + +Temporal does better than Airflow and reflects this understanding. +Using insights from Azure Durable Functions, their SDK leverages the details of Python's `async` and `await` operators to hijack program control flow and implement workflow features under the covers. + +Consider this example from the Temporal documentation - + +```python +@activity.defn +async def cancellable_activity(input: ComposeArgsInput) -> NoReturn: + try: + while True: + print("Heartbeating cancel activity") + await asyncio.sleep(0.5) + activity.heartbeat("some details") + except asyncio.CancelledError: + print("Activity cancelled") + raise + + +@workflow.defn +class GreetingWorkflow: + @workflow.run + async def run(self, input: ComposeArgsInput) -> None: + activity_handle = workflow.start_activity( + cancel_activity, + ComposeArgsInput(input.arg1, input.arg2), + start_to_close_timeout=timedelta(minutes=5), + heartbeat_timeout=timedelta(seconds=30), + ) + + await asyncio.sleep(3) + activity_handle.cancel() +``` + +This is really good compared to an equivalent Airflow graph! +All the details are "normal" Python, and the SDK fits "natively" into how Python execution occurs. +But it's still laden with syntax such as the `async` function coloring and decorators which serve only to support the workflow SDK. + +In comparison were this workflow a "simple" Python script it would only need to be written + +```python +# https://pypi.org/project/timeoutcontext/ +from timeoutcontext import task_with_timeout, TimeoutError + + +def cancellable_activity(): + try: + while True: + print("Heartbeating cancellable activity") + sleep(0.5) + except TimeoutError: + print("Activity cancelled") + raise + + +def main(): + task = task_with_timeout(lambda: cancellable_activity(), + timeout=timedelta(minutes=5)) + sleep(3) + task.cancel() +``` + +As with Airflow, the Temporal SDK effectively requires that the programmer learn not just a set of libraries but the Python `async` features because the implementation of the workflow engine is leaked to the users. +The problem isn't just the excessive syntax, it's that as with Airflow user workflows are no longer "normal" programs. +There is in effect an entire Temporal interpreter stacked inbetween the Python runtime with which users are familiar and the user's program. +It is in effect a new language with none of the notational advantages of being one. + +The flipside is that there is an enormous advantage in tooling to be had by leveraging an existing language - or something that looks enough like one - rather than inventing a new notation. +This is the cardinal sin of workflow tools like kubeflow and various CI "workflow" formats - they adopt unique YAML or XML based notations which have no tooling support. +For instance by being "normal" (ish) Python, the Temporal SDK benefits from access to editor autocompletion, the MyPy typechecker and all manner of other tools. diff --git a/doc/what_problem.md b/doc/what_problem.md new file mode 100644 index 0000000..8b5bc3f --- /dev/null +++ b/doc/what_problem.md @@ -0,0 +1,47 @@ +# An Asynchronous, Distributed Task Engine + +This document presents a design without reference implementation for a distributed programming system; +sometimes called a workflow engine. +It is intended to provide architectural level clarity allowing for the development of alternative designs or implementations as may suit. + +## Problem Statement + +In building, operating and maintaining distributed systems (many computers in concert) engineers face a tooling gap. + +Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.) +and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation. +This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand. + +In interacting with networks, communicating between computers is difficult with traditional tools and communication failure becomes an ever-present concern. +Traditional automation tools such as shells are inadequate for this environment because achieving network communication is excessively difficult. + +In a distributed environment it cannot be assumed that a single machine can remain available to execute automation; +This requires an approach to automation which allows for the incremental execution of single tasks at a time with provisions for relocation and recovery should failure occur. + +It also cannot be assumed that a single machine is sufficiently available to receive and process incoming events such as callbacks. +A distributed system is needed to wrangle distributed systems. + +## Design Considerations + +- Timeouts are everywhere +- Sub-Turing/boundable +- + +## Architectural Overview + +### Events +Things that will happen, or time out. + +### Actions +Things the workflow will do, or time out. + +### Bindings +Data the workflow either was given or computed. + +### Conditionals +Decisions the workflow may make. + +### Functions +A convenient way to talk about fragments of control flow graph. + +### Tracing & Reporting diff --git a/examples/abc.flow b/examples/abc.flow new file mode 100644 index 0000000..b166056 --- /dev/null +++ b/examples/abc.flow @@ -0,0 +1,29 @@ +# -*- mode: python -*- + +from flowmetal import workflow + + +def ingest(): + return {} + + +def analyze(data): + return data.keys() + + +def check(keys) -> bool: + return len(keys) > 0 + + +def report(keys): + print(keys) + + +@workflow +def main(): + data = ingest() + data = analyze(data) + if check(data): + report(data) + else: + raise ValueError(report(data)) diff --git a/examples/timeout.flow b/examples/timeout.flow new file mode 100644 index 0000000..87844f2 --- /dev/null +++ b/examples/timeout.flow @@ -0,0 +1,26 @@ +# -*- mode: python -*- + +from datetime import timedelta +from time import sleep +from flowmetal import workflow, timeout, CancelledError, TimeoutError, Task + + +def cancellable_activity(): + try: + while True: + print("Still alive") + sleep(0.5) + except CancelledError: + print("Task killed") + + +@workflow +def main(): + # Somewhat like a thread + t = Task(target=cancellable_activity, args=(), timeout=timedelta(minutes=5)) + t.start() + try: + result = t.result(timeout=timedelta(seconds=3)) + print(result) + except TimeoutError: + t.cancel() diff --git a/scratch/astdump.py b/scratch/astdump.py new file mode 100644 index 0000000..ed71dcc --- /dev/null +++ b/scratch/astdump.py @@ -0,0 +1,98 @@ +""" +A (toy) tool for emitting Python ASTs as YAML formatted data. +""" + +import ast +import optparse +import sys + +import yaml + + +def propnames(node): + """return names of attributes specific for the current node""" + + props = {x for x in dir(node) if not x.startswith("_")} + + if isinstance(node, ast.Module): + props -= {"body"} + + if isinstance(node, (ast.Expr, ast.Attribute)): + props -= {"value"} + + if isinstance(node, ast.Constant): + props -= {"n", "s"} + + if isinstance(node, ast.ClassDef): + props -= {"body"} + + return props + + +# Note that ast.NodeTransformer exists for mutations. +# This is just for reads. +class TreeDumper(ast.NodeVisitor): + def __init__(self): + super().__init__() + self._stack = [] + + def dump(self, node): + self.visit(node) + + def visit(self, node): + # nodetype = type(node) + nodename = node.__class__.__name__ + indent = " " * len(self._stack) * 2 + print(indent + nodename) + for n in propnames(node): + print(indent + "%s: %s" % (n, node.__dict__[n])) + + self._stack.append(node) + self.generic_visit(node) + self._stack.pop() + + +class YAMLTreeDumper(ast.NodeVisitor): + def __init__(self): + super().__init__() + self._stack = [] + + def node2yml(self, node): + try: + # nodetype = type(node) + nodename = node.__class__.__name__ + return { + "op": nodename, + "props": {n: node.__dict__[n] for n in propnames(node)}, + "children": [], + } + except Exception: + print(repr(node), propnames(node), dir(node)) + + def visit(self, node): + yml_node = self.node2yml(node) + self._stack.append(yml_node) + old_stack = self._stack + self._stack = yml_node["children"] + self.generic_visit(node) + self._stack = old_stack + return yml_node + + +if __name__ == "__main__": + parser = optparse.OptionParser(usage="%prog [options] ") + opts, args = parser.parse_args() + + if len(args) == 0: + parser.print_help() + sys.exit(-1) + filename = args[0] + + with open(filename) as f: + root = ast.parse(f.read(), filename) + + print( + yaml.dump( + YAMLTreeDumper().visit(root), default_flow_style=False, sort_keys=False + ) + ) diff --git a/scratch/astinterp.py b/scratch/astinterp.py new file mode 100644 index 0000000..ede73b1 --- /dev/null +++ b/scratch/astinterp.py @@ -0,0 +1,977 @@ +# flake8: noqa: all + +# Python AST interpreter written in Python +# +# This module is part of the Pycopy https://github.com/pfalcon/pycopy +# project. +# +# Copyright (c) 2019 Paul Sokolovsky +# +# The MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# Modified by Reid D. 'ardem' Mckenzie in 2021 to be a bit more fully-featured +# and usable for running 'real' code as part of an experiment in implementing a +# durable Python interpreter atop the original pycopy substrate. + +import ast +import logging +import os +import sys + + +if sys.version_info < (3, 0, 0): + builtins = __builtins__ +else: + import builtins + + +log = logging.getLogger(__name__) + + +class StrictNodeVisitor(ast.NodeVisitor): + def generic_visit(self, node): + n = node.__class__.__name__ + raise NotImplementedError("Visitor for node {} not implemented".format(n)) + + +class ANamespace: + def __init__(self, node): + self.d = {} + self.parent = None + # Cross-link namespace to AST node. Note that we can't do the + # opposite, because for one node, there can be different namespaces. + self.node = node + + def __getitem__(self, k): + return self.d[k] + + def get(self, k, default=None): + return self.d.get(k, default) + + def __setitem__(self, k, v): + self.d[k] = v + + def __delitem__(self, k): + del self.d[k] + + def __contains__(self, k): + return k in self.d + + def __str__(self): + return "<{} {}>".format(self.__class__.__name__, self.d) + + +class ModuleNS(ANamespace): + pass + + +class FunctionNS(ANamespace): + pass + + +class ClassNS(ANamespace): + pass + + +# Pycopy by default doesn't support direct slice construction, use helper +# object to construct it. +class SliceGetter: + def __getitem__(self, idx): + return idx + + +slice_getter = SliceGetter() + + +def arg_name(arg): + if sys.version_info < (3, 0, 0): + return arg.id + else: + return arg.arg + + +def kwarg_defaults(args): + if sys.version_info < (3, 0, 0): + return args.defaults + else: + return args.kw_defaults + + +class TargetNonlocalFlow(Exception): + """Base exception class to simulate non-local control flow transfers in + a target application.""" + + +class TargetBreak(TargetNonlocalFlow): + pass + + +class TargetContinue(TargetNonlocalFlow): + pass + + +class TargetReturn(TargetNonlocalFlow): + pass + + +class VarScopeSentinel: + def __init__(self, name): + self.name = name + + +NO_VAR = VarScopeSentinel("no_var") +GLOBAL = VarScopeSentinel("global") +NONLOCAL = VarScopeSentinel("nonlocal") + + +class InterpFuncWrap: + "Callable wrapper for AST functions (FunctionDef nodes)." + + def __init__(self, node, interp): + self.node = node + self.interp = interp + self.lexical_scope = interp.ns + + def __call__(self, *args, **kwargs): + return self.interp.call_func(self.node, self, *args, **kwargs) + + +# Python don't fully treat objects, even those defining __call__() special method, as a true callable. For example, such +# objects aren't automatically converted to bound methods if looked up as another object's attributes. As we want our +# "interpreted functions" to behave as close as possible to real functions, we just wrap function object with a real +# function. An alternative might have been to perform needed checks and explicitly bind a method using +# types.MethodType() in visit_Attribute (but then maybe there would be still other cases of "callable object" vs +# "function" discrepancies). +def InterpFunc(fun): + def func(*args, **kwargs): + return fun.__call__(*args, **kwargs) + + return func + + +class InterpWith: + def __init__(self, ctx): + self.ctx = ctx + + def __enter__(self): + return self.ctx.__enter__() + + def __exit__(self, tp, exc, tb): + # Don't leak meta-level exceptions into target + if isinstance(exc, TargetNonlocalFlow): + tp = exc = tb = None + return self.ctx.__exit__(tp, exc, tb) + + +class InterpModule: + def __init__(self, ns): + self.ns = ns + + def __getattr__(self, name): + try: + return self.ns[name] + except KeyError: + raise AttributeError + + def __dir__(self): + return list(self.ns.d.keys()) + + +class ModuleInterpreter(StrictNodeVisitor): + """An interpreter specific to a single module.""" + + def __init__(self, system, fname, node): + self.system = system + self.fname = fname + self.ns = self.module_ns = ModuleNS(node) + + # Call stack (in terms of function AST nodes). + self.call_stack = [] + + # To implement "store" operation, we need to arguments: location and value to store. The operation itself is + # handled by a node visitor (e.g. visit_Name), and location is represented by AST node, but there's no support + # to pass additional arguments to a visitor (likely, because it would be a burden to explicit pass such + # additional arguments thru the chain of visitors). So instead, we store this value as field. As interpretation + # happens sequentially, there's no risk that it will be overwritten "concurrently". + self.store_val = None + + # Current active exception, for bare "raise", which doesn't work across function boundaries (and that's how we + # have it - exception would be caught in visit_Try, while re-rasing would happen in visit_Raise). + self.cur_exc = [] + + def push_ns(self, new_ns): + new_ns.parent = self.ns + self.ns = new_ns + + def pop_ns(self): + self.ns = self.ns.parent + + def stmt_list_visit(self, lst): + res = None + for s in lst: + res = self.visit(s) + return res + + def wrap_decorators(self, obj, node): + for deco_n in reversed(list(node.decorator_list)): + deco = self.visit(deco_n) + obj = deco(obj) + return obj + + def visit(self, node): + val = super(StrictNodeVisitor, self).visit(node) + return val + + def visit_Module(self, node): + self.stmt_list_visit(node.body) + + def visit_Expression(self, node): + return self.visit(node.body) + + def visit_ClassDef(self, node): + self.push_ns(ClassNS(node)) + try: + self.stmt_list_visit(node.body) + except Exception: + self.pop_ns() + raise + ns = self.ns + self.pop_ns() + cls = type(node.name, tuple([self.visit(b) for b in node.bases]), ns.d) + cls = self.wrap_decorators(cls, node) + self.ns[node.name] = cls + # Store reference to class object in the namespace object + ns.cls = cls + + def visit_Lambda(self, node): + node.name = "" + return self.prepare_func(node) + + def visit_FunctionDef(self, node): + # Defaults are evaluated at function definition time, so we + # need to do that now. + func = self.prepare_func(node) + func = self.wrap_decorators(func, node) + self.ns[node.name] = func + + def prepare_func(self, node): + """Prepare function AST node for future interpretation: pre-calculate + and cache useful information, etc.""" + + func = InterpFuncWrap(node, self) + args = node.args or node.posonlyargs + num_required = len(args.args) - len(args.defaults) + all_args = set() + d = {} + for i, a in enumerate(args.args): + all_args.add(arg_name(a)) + if i >= num_required: + d[arg_name(a)] = self.visit(args.defaults[i - num_required]) + + for a, v in zip(getattr(args, "kwonlyargs", ()), kwarg_defaults(args)): + all_args.add(arg_name(a)) + if v is not None: + d[arg_name(a)] = self.visit(v) + # We can store cached argument names of a function in its node - + # it's static. + node.args.all_args = all_args + # We can't store the values of default arguments - they're dynamic, + # may depend on the lexical scope. + func.defaults_dict = d + + return InterpFunc(func) + + def prepare_func_args(self, node, interp_func, *args, **kwargs): + def arg_num_mismatch(): + raise TypeError( + "{}() takes {} positional arguments but {} were given".format( + node.name, len(argspec.args), len(args) + ) + ) + + argspec = node.args + # If there's vararg, either offload surplus of args to it, or init + # it to empty tuple (all in one statement). If no vararg, error on + # too many args. + # + # Note that we have to do the .posonlyargs dance + if argspec.vararg: + self.ns[argspec.vararg.arg] = args[len(argspec.args) :] + else: + if len(args) > len(argspec.args or getattr(argspec, "posonlyargs", ())): + arg_num_mismatch() + + if argspec.args: + for i in range(min(len(args), len(argspec.args))): + self.ns[arg_name(argspec.args[i])] = args[i] + elif getattr(argspec, "posonlyargs", ()): + if len(args) != len(argspec.posonlyargs): + arg_num_mismatch() + + for a, value in zip(argspec.posonlyargs, args): + self.ns[arg_name(a)] = value + + # Process incoming keyword arguments, putting them in namespace if + # actual arg exists by that name, or offload to function's kwarg + # if any. All make needed checks and error out. + func_kwarg = {} + for k, v in kwargs.items(): + if k in argspec.all_args: + if k in self.ns: + raise TypeError( + "{}() got multiple values for argument '{}'".format( + node.name, k + ) + ) + self.ns[k] = v + elif argspec.kwarg: + func_kwarg[k] = v + else: + raise TypeError( + "{}() got an unexpected keyword argument '{}'".format(node.name, k) + ) + if argspec.kwarg: + self.ns[arg_name(argspec.kwarg)] = func_kwarg + + # Finally, overlay default values for arguments not yet initialized. + # We need to do this last for "multiple values for the same arg" + # check to work. + for k, v in interp_func.defaults_dict.items(): + if k not in self.ns: + self.ns[k] = v + + # And now go thru and check for any missing arguments. + for a in argspec.args: + if arg_name(a) not in self.ns: + raise TypeError( + "{}() missing required positional argument: '{}'".format( + node.name, arg_name(a) + ) + ) + for a in getattr(argspec, "kwonlyargs", ()): + if a.arg not in self.ns: + raise TypeError( + "{}() missing required keyword-only argument: '{}'".format( + node.name, arg_name(a) + ) + ) + + def call_func(self, node, interp_func, *args, **kwargs): + self.call_stack.append(node) + # We need to switch from dynamic execution scope to lexical scope + # in which function was defined (then switch back on return). + dyna_scope = self.ns + self.ns = interp_func.lexical_scope + self.push_ns(FunctionNS(node)) + try: + self.prepare_func_args(node, interp_func, *args, **kwargs) + if isinstance(node.body, list): + res = self.stmt_list_visit(node.body) + else: + res = self.visit(node.body) + except TargetReturn as e: + res = e.args[0] + finally: + self.pop_ns() + self.ns = dyna_scope + self.call_stack.pop() + return res + + def visit_Return(self, node): + if not isinstance(self.ns, FunctionNS): + raise SyntaxError("'return' outside function") + raise TargetReturn(node.value and self.visit(node.value)) + + def visit_With(self, node): + assert len(node.items) == 1 + ctx = self.visit(node.items[0].context_expr) + with InterpWith(ctx) as val: + if node.items[0].optional_vars is not None: + self.handle_assign(node.items[0].optional_vars, val) + self.stmt_list_visit(node.body) + + def visit_Try(self, node): + try: + self.stmt_list_visit(node.body) + except TargetNonlocalFlow: + raise + except Exception as e: + self.cur_exc.append(e) + try: + for h in getattr(node, "handlers", ()): + if h.type is None or isinstance(e, self.visit(h.type)): + if h.name: + self.ns[h.name] = e + self.stmt_list_visit(h.body) + if h.name: + del self.ns[h.name] + break + else: + raise + finally: + self.cur_exc.pop() + else: + self.stmt_list_visit(node.orelse) + finally: + if getattr(node, "finalbody", None): + self.stmt_list_visit(node.finalbody) + + def visit_TryExcept(self, node): + # Py2k only; py3k merged all this into one node type. + return self.visit_Try(node) + + def visit_TryFinally(self, node): + # Py2k only; py3k merged all this into one node type. + return self.visit_Try(node) + + def visit_For(self, node): + iter = self.visit(node.iter) + for item in iter: + self.handle_assign(node.target, item) + try: + self.stmt_list_visit(node.body) + except TargetBreak: + break + except TargetContinue: + continue + else: + self.stmt_list_visit(node.orelse) + + def visit_While(self, node): + while self.visit(node.test): + try: + self.stmt_list_visit(node.body) + except TargetBreak: + break + except TargetContinue: + continue + else: + self.stmt_list_visit(node.orelse) + + def visit_Break(self, node): + raise TargetBreak + + def visit_Continue(self, node): + raise TargetContinue + + def visit_If(self, node): + test = self.visit(node.test) + if test: + self.stmt_list_visit(node.body) + else: + self.stmt_list_visit(node.orelse) + + def visit_Import(self, node): + for n in node.names: + self.ns[n.asname or n.name] = self.system.handle_import(n.name) + + def visit_ImportFrom(self, node): + mod = self.system.handle_import( + node.module, None, None, [n.name for n in node.names], node.level + ) + for n in node.names: + if n.name == "*": + # This is the special case of the wildcard import. Copy + # everything over. + for n in getattr(mod, "__all__", dir(mod)): + self.ns[n] = getattr(mod, n) + else: + self.ns[n.asname or n.name] = getattr(mod, n.name) + + def visit_Raise(self, node): + if node.exc is None: + if not self.cur_exc: + raise RuntimeError("No active exception to reraise") + raise self.cur_exc[-1] + elif node.cause is None: + raise self.visit(node.exc) + # else: + # raise self.visit(node.exc) from self.visit(node.cause) + + def visit_AugAssign(self, node): + assert isinstance(node.target.ctx, ast.Store) + # Not functional style, oops. Node in AST has store context, but we + # need to read its value first. To not construct a copy of the entire + # node with load context, we temporarily patch it in-place. + save_ctx = node.target.ctx + node.target.ctx = ast.Load() + var_val = self.visit(node.target) + node.target.ctx = save_ctx + + rval = self.visit(node.value) + + # As augmented assignment is statement, not operator, we can't put them + # all into map. We could instead directly lookup special inplace methods + # (__iadd__ and friends) and use them, with a fallback to normal binary + # operations, but from the point of view of this interpreter, presence + # of such methods is an implementation detail of the object system, it's + # not concerned with it. + op = type(node.op) + if op is ast.Add: + var_val += rval + elif op is ast.Sub: + var_val -= rval + elif op is ast.Mult: + var_val *= rval + elif op is ast.Div: + var_val /= rval + elif op is ast.FloorDiv: + var_val //= rval + elif op is ast.Mod: + var_val %= rval + elif op is ast.Pow: + var_val **= rval + elif op is ast.LShift: + var_val <<= rval + elif op is ast.RShift: + var_val >>= rval + elif op is ast.BitAnd: + var_val &= rval + elif op is ast.BitOr: + var_val |= rval + elif op is ast.BitXor: + var_val ^= rval + else: + raise NotImplementedError + + self.store_val = var_val + self.visit(node.target) + + def visit_Assign(self, node): + val = self.visit(node.value) + for n in node.targets: + self.handle_assign(n, val) + + def handle_assign(self, target, val): + if isinstance(target, ast.Tuple): + it = iter(val) + try: + for elt_idx, t in enumerate(target.elts): + if getattr(ast, "Starred", None) and isinstance(t, ast.Starred): + t = t.value + all_elts = list(it) + break_i = len(all_elts) - (len(target.elts) - elt_idx - 1) + self.store_val = all_elts[:break_i] + it = iter(all_elts[break_i:]) + else: + self.store_val = next(it) + self.visit(t) + except StopIteration: + raise ValueError( + "not enough values to unpack (expected {})".format(len(target.elts)) + ) + + try: + next(it) + raise ValueError( + "too many values to unpack (expected {})".format(len(target.elts)) + ) + except StopIteration: + # Expected + pass + else: + self.store_val = val + self.visit(target) + + def visit_Delete(self, node): + for n in node.targets: + self.visit(n) + + def visit_Pass(self, node): + pass + + def visit_Assert(self, node): + if node.msg is None: + assert self.visit(node.test) + else: + assert self.visit(node.test), self.visit(node.msg) + + def visit_Expr(self, node): + # Produced value is ignored + self.visit(node.value) + + def enumerate_comps(self, iters): + """Enumerate thru all possible values of comprehension clauses, + including multiple "for" clauses, each optionally associated + with multiple "if" clauses. Current result of the enumeration + is stored in the namespace.""" + + def eval_ifs(iter): + """Evaluate all "if" clauses.""" + for cond in iter.ifs: + if not self.visit(cond): + return False + return True + + if not iters: + yield + return + for el in self.visit(iters[0].iter): + self.store_val = el + self.visit(iters[0].target) + for t in self.enumerate_comps(iters[1:]): + if eval_ifs(iters[0]): + yield + + def visit_ListComp(self, node): + self.push_ns(FunctionNS(node)) + try: + return [self.visit(node.elt) for _ in self.enumerate_comps(node.generators)] + finally: + self.pop_ns() + + def visit_SetComp(self, node): + self.push_ns(FunctionNS(node)) + try: + return {self.visit(node.elt) for _ in self.enumerate_comps(node.generators)} + finally: + self.pop_ns() + + def visit_DictComp(self, node): + self.push_ns(FunctionNS(node)) + try: + return { + self.visit(node.key): self.visit(node.value) + for _ in self.enumerate_comps(node.generators) + } + finally: + self.pop_ns() + + def visit_IfExp(self, node): + if self.visit(node.test): + return self.visit(node.body) + else: + return self.visit(node.orelse) + + def visit_Call(self, node): + func = self.visit(node.func) + + args = [] + for a in node.args: + if getattr(ast, "Starred", None) and isinstance(a, ast.Starred): + args.extend(self.visit(a.value)) + else: + args.append(self.visit(a)) + + kwargs = {} + for kw in node.keywords: + val = self.visit(kw.value) + if kw.arg is None: + kwargs.update(val) + else: + kwargs[kw.arg] = val + + if func is builtins.super and not args: + if not self.ns.parent or not isinstance(self.ns.parent, ClassNS): + raise RuntimeError("super(): no arguments") + # As we're creating methods dynamically outside of class, super() without argument won't work, as that + # requires __class__ cell. Creating that would be cumbersome (Pycopy definitely lacks enough introspection + # for that), so we substitute 2 implied args (which argumentless super() would take from cell and 1st arg to + # func). In our case, we take them from prepared bookkeeping info. + args = (self.ns.parent.cls, self.ns["self"]) + + return func(*args, **kwargs) + + def visit_Compare(self, node): + cmpop_map = { + ast.Eq: lambda x, y: x == y, + ast.NotEq: lambda x, y: x != y, + ast.Lt: lambda x, y: x < y, + ast.LtE: lambda x, y: x <= y, + ast.Gt: lambda x, y: x > y, + ast.GtE: lambda x, y: x >= y, + ast.Is: lambda x, y: x is y, + ast.IsNot: lambda x, y: x is not y, + ast.In: lambda x, y: x in y, + ast.NotIn: lambda x, y: x not in y, + } + lv = self.visit(node.left) + for op, r in zip(node.ops, node.comparators): + rv = self.visit(r) + if not cmpop_map[type(op)](lv, rv): + return False + lv = rv + return True + + def visit_BoolOp(self, node): + if isinstance(node.op, ast.And): + res = True + for v in node.values: + res = res and self.visit(v) + elif isinstance(node.op, ast.Or): + res = False + for v in node.values: + res = res or self.visit(v) + else: + raise NotImplementedError + return res + + def visit_BinOp(self, node): + binop_map = { + ast.Add: lambda x, y: x + y, + ast.Sub: lambda x, y: x - y, + ast.Mult: lambda x, y: x * y, + ast.Div: lambda x, y: x / y, + ast.FloorDiv: lambda x, y: x // y, + ast.Mod: lambda x, y: x % y, + ast.Pow: lambda x, y: x ** y, + ast.LShift: lambda x, y: x << y, + ast.RShift: lambda x, y: x >> y, + ast.BitAnd: lambda x, y: x & y, + ast.BitOr: lambda x, y: x | y, + ast.BitXor: lambda x, y: x ^ y, + } + l = self.visit(node.left) + r = self.visit(node.right) + return binop_map[type(node.op)](l, r) + + def visit_UnaryOp(self, node): + unop_map = { + ast.UAdd: lambda x: +x, + ast.USub: lambda x: -x, + ast.Invert: lambda x: ~x, + ast.Not: lambda x: not x, + } + val = self.visit(node.operand) + return unop_map[type(node.op)](val) + + def visit_Subscript(self, node): + obj = self.visit(node.value) + idx = self.visit(node.slice) + if isinstance(node.ctx, ast.Load): + return obj[idx] + elif isinstance(node.ctx, ast.Store): + obj[idx] = self.store_val + elif isinstance(node.ctx, ast.Del): + del obj[idx] + else: + raise NotImplementedError + + def visit_Index(self, node): + return self.visit(node.value) + + def visit_Slice(self, node): + # Any of these can be None + lower = node.lower and self.visit(node.lower) + upper = node.upper and self.visit(node.upper) + step = node.step and self.visit(node.step) + slice = slice_getter[lower:upper:step] + return slice + + def visit_Attribute(self, node): + obj = self.visit(node.value) + if isinstance(node.ctx, ast.Load): + return getattr(obj, node.attr) + elif isinstance(node.ctx, ast.Store): + setattr(obj, node.attr, self.store_val) + elif isinstance(node.ctx, ast.Del): + delattr(obj, node.attr) + else: + raise NotImplementedError + + def visit_Global(self, node): + for n in node.names: + if n in self.ns and self.ns[n] is not GLOBAL: + raise SyntaxError( + "SyntaxError: name '{}' is assigned to before global declaration".format( + n + ) + ) + # Don't store GLOBAL in the top-level namespace + if self.ns.parent: + self.ns[n] = GLOBAL + + def visit_Nonlocal(self, node): + if isinstance(self.ns, ModuleNS): + raise SyntaxError("nonlocal declaration not allowed at module level") + for n in node.names: + self.ns[n] = NONLOCAL + + def resolve_nonlocal(self, id, ns): + while ns: + res = ns.get(id, NO_VAR) + if res is GLOBAL: + return self.module_ns + if res is not NO_VAR and res is not NONLOCAL: + if isinstance(ns, ModuleNS): + break + return ns + ns = ns.parent + raise SyntaxError("no binding for nonlocal '{}' found".format(id)) + + def visit_Name(self, node): + if isinstance(node.ctx, ast.Load): + res = NO_VAR + ns = self.ns + # We always lookup in the current namespace (on the first iteration), but afterwards we always skip class + # namespaces. Or put it another way, class code can look up in its own namespace, but that's the only case + # when the class namespace is consulted. + skip_classes = False + while ns: + if not (skip_classes and isinstance(ns, ClassNS)): + res = ns.get(node.id, NO_VAR) + if res is not NO_VAR: + break + ns = ns.parent + skip_classes = True + + if res is NONLOCAL: + ns = self.resolve_nonlocal(node.id, ns.parent) + return ns[node.id] + + if res is GLOBAL: + res = self.module_ns.get(node.id, NO_VAR) + + if res is not NO_VAR: + return res + + try: + return getattr(builtins, node.id) + except AttributeError: + raise NameError("name '{}' is not defined".format(node.id)) + + elif isinstance(node.ctx, ast.Store): + res = self.ns.get(node.id, NO_VAR) + if res is GLOBAL: + self.module_ns[node.id] = self.store_val + + elif res is NONLOCAL: + ns = self.resolve_nonlocal(node.id, self.ns.parent) + ns[node.id] = self.store_val + + else: + self.ns[node.id] = self.store_val + + elif isinstance(node.ctx, ast.Del): + res = self.ns.get(node.id, NO_VAR) + if res is NO_VAR: + raise NameError("name '{}' is not defined".format(node.id)) + + elif res is GLOBAL: + del self.module_ns[node.id] + + elif res is NONLOCAL: + ns = self.resolve_nonlocal(node.id, self.ns.parent) + del ns[node.id] + + else: + del self.ns[node.id] + + else: + raise NotImplementedError + + def visit_Dict(self, node): + return {self.visit(p[0]): self.visit(p[1]) for p in zip(node.keys, node.values)} + + def visit_Set(self, node): + return {self.visit(e) for e in node.elts} + + def visit_List(self, node): + return [self.visit(e) for e in node.elts] + + def visit_Tuple(self, node): + return tuple([self.visit(e) for e in node.elts]) + + def visit_NameConstant(self, node): + return node.value + + def visit_Ellipsis(self, node): + # In Py3k only + from ast import Ellipsis + + return Ellipsis + + def visit_Print(self, node): + # In Py2k only + raise NotImplementedError("Absolutely not. Use __future__.") + + def visit_Str(self, node): + return node.s + + def visit_Bytes(self, node): + return node.s + + def visit_Num(self, node): + return node.n + + +class InterpreterSystem(object): + """A bag of shared state.""" + + def __init__(self, path=None): + self.modules = {} + self.path = path or sys.path + + def handle_import(self, name, globals=None, locals=None, fromlist=(), level=0): + log.debug(" Attempting to import '{}'".format(name)) + if name not in self.modules: + if name in sys.modules: + log.debug(" Short-circuited from bootstrap sys.modules") + self.modules[name] = sys.modules[name] + + else: + name = name.replace(".", os.path.sep) + for e in self.path: + for ext in [ + # ".flow", + ".py", + ]: + if os.path.isdir(e): + f = os.path.join(e, name + ext) + log.debug(" Checking {}".format(f)) + if os.path.exists(f): + mod = self.load(f) + self.modules[name] = mod.ns + break + + elif os.path.isfile(e): + # FIXME (arrdem 2021-05-31) + raise RuntimeError( + "Import from .zip/.whl/.egg archives aren't supported yet" + ) + + else: + self.modules[name] = __import__( + name, globals, locals, fromlist, level + ) + + return self.modules[name] + + def load(self, fname): + with open(fname) as f: + tree = ast.parse(f.read()) + interp = ModuleInterpreter(self, fname, tree) + interp.visit(tree) + return interp + + def execute(self, fname): + with open(fname) as f: + tree = ast.parse(f.read()) + interp = ModuleInterpreter(self, fname, tree) + interp.ns["__name__"] = "__main__" + self.modules["__main__"] = InterpModule(interp.ns) + interp.visit(tree) + return interp + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + InterpreterSystem().execute(sys.argv[1]) diff --git a/scratch/test.py b/scratch/test.py new file mode 100644 index 0000000..567fe3d --- /dev/null +++ b/scratch/test.py @@ -0,0 +1,34 @@ +#!astinterp + +from __future__ import print_function + + +class Foo(object): + def bar(self): + return 1 + + @property + def baz(self): + print("'Computing' baz...") + return 2 + + +a = Foo() +print(a.bar()) +print(a.baz) + +import random + + +for _ in range(10): + print(random.randint(0, 1024)) + + +def bar(a, b, **bs): + pass + + +import requests + + +print(len(requests.get("https://pypi.org/pypi/requests/json").text)) diff --git a/setup.py b/setup.py deleted file mode 100644 index 9b54892..0000000 --- a/setup.py +++ /dev/null @@ -1,37 +0,0 @@ -from setuptools import setup - -setup( - name="arrdem.flowmetal", - # Package metadata - version='0.0.0', - license="MIT", - description="A weird execution engine", - long_description=open("README.md").read(), - long_description_content_type="text/markdown", - author="Reid 'arrdem' McKenzie", - author_email="me@arrdem.com", - url="https://git.arrdem.com/arrdem/flowmetal", - classifiers=[ - "License :: OSI Approved :: MIT License", - "Development Status :: 3 - Alpha", - "Intended Audience :: Developers", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", - ], - - # Package setup - package_dir={"": "src/python"}, - packages=[ - "flowmetal", - ], - entry_points = { - 'console_scripts': [ - 'iflow=flowmetal.repl:main' - ], - }, - install_requires=[ - 'prompt-toolkit~=3.0.0', - ], - extras_require={ - } -) diff --git a/src/flowmetal/com.twitter.wilson.flow b/src/flowmetal/com.twitter.wilson.flow deleted file mode 100644 index 9d61a52..0000000 --- a/src/flowmetal/com.twitter.wilson.flow +++ /dev/null @@ -1,83 +0,0 @@ -(defpackage com.twitter.wilson - (require - ;; The log lets you record status information into a program's trace - [flowmetal.log - :refer [log!]] - ;; The time system lets you put bounds on the implicit awaiting Flowmetal does - [flowmetal.time - :refer [with-timeout!, timeout?, make-duration, duration?, +seconds+, +hours+, sleep!]] - ;; JSON. Simple enough - [flowmetal.json - :refer [loads, dumps, json?]] - ;; Extensions! Provided by other systems. - ;; - ;; This one allows for an external service to receive HTTP callbacks on Flowmetal's behalf. - [http.callback - :refer [make-callback!, get-callback!, callback?]] - ;; This one allows for an external service to make HTTP requests on Flowmetal's behalf. - [http.request - :refer [post!, error?, dns-error?, connection-error?, response-error?]]) - - (defenum stage - +reboot+ - +bios-update+ - +reinstall+) - - ;; FIXME:- how to do table optimization? - (defn fib [x] - (match x - [0 1] - [1 1] - [_ (+ (fib (- x 1) (- x 2)))])) - - (defn retry-http [f - :- (fn? [] a?) - backoff-fn - :- (fn? [int?] duration?) - :default (fn [x :- int?] - :- duration? - (make-duration (fib x) +seconds+)) - backoff-count - :- int? - :default 0] - :- a - "The implementation of HTTP with retrying." - (let [response (f)] - (if (not (error? response)) - response - ;; FIXME:- how does auth denied get represented? - (if (or (dns-error? response) - (connection-error? response) - (response-error? response)) - (do (sleep (backoff-fn backoff-count)) - (retry-http* f backoff-fn (+ backoff-count 1))))))) - - (defn job [hostname - :- str? - stages - :- (list? stage?) - job-timeout - :- duration? - :default (duration 3 :hours)] - :- (union? [timeout? json?]) - "Run a wilson job, wait for the callback and process the result. - - By default the job is only waited for three hours. - - " - (let [callback :- callback? (make-callback!) - job (retry-http - (fn [] - (post "http://wilson.local.twitter.com" - :data - (dumps - {:host hostname - :stages [stages] - :callbacks [{:type :http, :url callback}]}))))] - - (let [result (with-timeout! (duration 3 :hours) - (fn [] - (get-callback callback)))] - (if-not (timeout? result) - (loads result) - result))))) diff --git a/src/flowmetal/flowmetal.time.flow b/src/flowmetal/flowmetal.time.flow deleted file mode 100644 index 4176cfb..0000000 --- a/src/flowmetal/flowmetal.time.flow +++ /dev/null @@ -1,33 +0,0 @@ -(defpackage flowmetal.time - (defenum time-unit - "Calendar-independent durations." - +milliseconds+ - +seconds+ - +hours+ - +days+) - - (defrecord duration [num :- int?, scale :- time-unit?] - "A type for representing scalar durations.") - - (defn as-milliseconds [d :- duration?] - :- duration? - "Normalize a duration to a number of milliseconds." - (match d - [(duration x +days+) (duration (* x 24) +hours+)] - [(duration x +hours+) (duration (* x 60) +minutes+)] - [(duration x +minutes+) (duration (* x 60) +seconds+)] - [(duration x +seconds+) (duration (* x 1000) +milliseconds+)] - [(duration x +milliseconds+) d])) - - ;; A type of one value used to represent an error - (defenum timeout - +timeout+) - - (defendpoint with-timeout! - [d :- duration? - f :- (fn? [] :- a)] - :- a - - ) - -) diff --git a/src/python/flowmetal/__init__.py b/src/python/flowmetal/__init__.py deleted file mode 100644 index e5a0d9b..0000000 --- a/src/python/flowmetal/__init__.py +++ /dev/null @@ -1 +0,0 @@ -#!/usr/bin/env python3 diff --git a/src/python/flowmetal/__main__.py b/src/python/flowmetal/__main__.py new file mode 100644 index 0000000..7fd4e13 --- /dev/null +++ b/src/python/flowmetal/__main__.py @@ -0,0 +1,26 @@ +""" +The Flowmetal server entry point. +""" + +import click +from flowmetal import ( + frontend, + interpreter, + reaper, + scheduler, +) + + +@click.group() +def cli(): + pass + + +cli.add_command(frontend.cli, name="frontend") +cli.add_command(interpreter.cli, name="interpreter") +cli.add_command(scheduler.cli, name="scheduler") +cli.add_command(reaper.cli, name="reaper") + + +if __name__ == "__main__": + cli() diff --git a/src/python/flowmetal/db/base.py b/src/python/flowmetal/db/base.py new file mode 100644 index 0000000..13e8023 --- /dev/null +++ b/src/python/flowmetal/db/base.py @@ -0,0 +1,28 @@ +""" +An abstract or base Flowmetal DB. +""" + +from abc import ( + abstractclassmethod, + abstractmethod, +) + + +class Db(ABC): + """An abstract Flowmetal DB.""" + + @abstractclassmethod + def connect(cls, config): + """Build and return a connected DB.""" + + @abstractmethod + def disconnect(self): + """Disconnect from the underlying DB.""" + + def close(self): + """An alias for disconnect allowing for it to quack as a closable.""" + self.disconnect() + + @abstractmethod + def reconnect(self): + """Attempt to reconnect; either after an error or disconnecting.""" diff --git a/src/python/flowmetal/db/redis.py b/src/python/flowmetal/db/redis.py new file mode 100644 index 0000000..7294778 --- /dev/null +++ b/src/python/flowmetal/db/redis.py @@ -0,0 +1,3 @@ +""" +An implementation of the Flowmetal DB backed by Redis. +""" diff --git a/src/python/flowmetal/frontend.py b/src/python/flowmetal/frontend.py new file mode 100644 index 0000000..82475bb --- /dev/null +++ b/src/python/flowmetal/frontend.py @@ -0,0 +1,9 @@ +""" +""" + +import click + + +@click.group() +def cli(): + pass diff --git a/src/python/flowmetal/interpreter.py b/src/python/flowmetal/interpreter.py new file mode 100644 index 0000000..82475bb --- /dev/null +++ b/src/python/flowmetal/interpreter.py @@ -0,0 +1,9 @@ +""" +""" + +import click + + +@click.group() +def cli(): + pass diff --git a/src/python/flowmetal/models.py b/src/python/flowmetal/models.py new file mode 100644 index 0000000..ebf7bee --- /dev/null +++ b/src/python/flowmetal/models.py @@ -0,0 +1,3 @@ +""" +Somewhat generic models of Flowmetal programs. +""" diff --git a/src/python/flowmetal/parser.py b/src/python/flowmetal/parser.py deleted file mode 100644 index bd0ff16..0000000 --- a/src/python/flowmetal/parser.py +++ /dev/null @@ -1,511 +0,0 @@ -""" -A parser for s-expressions. -""" - -from abc import ABC, abstractmethod -from enum import Enum -from io import StringIO, BufferedReader -from typing import IO, NamedTuple, Any -from fractions import Fraction -import re - - -## Types -class Position(NamedTuple): - """An encoding for the location of a read token within a source.""" - source: str - line: int - col: int - offset: int - - @staticmethod - def next_pos(pos: "Position"): - return Position(pos.source, pos.line, pos.col + 1, pos.offset + 1) - - @staticmethod - def next_line(pos: "Position"): - return Position(pos.source, pos.line + 1, 1, pos.offset + 1) - - -class TokenBase(object): - """The shared interface to tokens.""" - - @property - @abstractmethod - def pos(self): - """The position of the token within its source.""" - - @property - @abstractmethod - def raw(self): - """The raw token as scanned.""" - - -class ConstTokenBase(TokenBase, NamedTuple): - """The shared interface for constant tokens""" - data: Any - raw: str - pos: Position - - # Hash according to data - def __hash__(self): - return hash(self.data) - - # And make sure it's orderable - def __eq__(self, other): - return self.data == other - - def __lt__(self, other): - return self.data < other - - def __gt__(self, other): - return self.data > other - - -class BooleanToken(ConstTokenBase): - """A read boolean.""" - - -class IntegerToken(ConstTokenBase): - """A read integer, including position.""" - - -class FractionToken(ConstTokenBase): - """A read fraction, including position.""" - - -class FloatToken(ConstTokenBase): - """A read floating point number, including position.""" - - -class SymbolToken(ConstTokenBase): - """A read symbol, including position.""" - - -class KeywordToken(ConstTokenBase): - """A read keyword.""" - - -class StringToken(ConstTokenBase): - """A read string, including position.""" - - -class ListType(Enum): - """The supported types of lists.""" - ROUND = ("(", ")") - SQUARE = ("[", "]") - - -class ListToken(NamedTuple, TokenBase): - """A read list, including its start position and the paren type.""" - data: list - raw: str - pos: Position - paren: ListType = ListType.ROUND - - -class SetToken(NamedTuple, TokenBase): - """A read set, including its start position.""" - data: list - raw: str - pos: Position - - -class MappingToken(NamedTuple, TokenBase): - """A read mapping, including its start position.""" - data: list - raw: str - pos: Position - - -class WhitespaceToken(NamedTuple, TokenBase): - """A bunch of whitespace with no semantic value.""" - data: str - raw: str - pos: Position - - -class CommentToken(WhitespaceToken): - """A read comment with no semantic value.""" - - -## Parser implementation -class PosTrackingBufferedReader(object): - """A slight riff on BufferedReader which only allows for reads and peeks of a - char, and tracks positions. - - Perfect for implementing LL(1) parsers. - """ - - def __init__(self, f: IO, source_name=None): - self._next_pos = self._pos = Position(source_name, 1, 1, 0) - self._char = None - self._f = f - - def pos(self): - return self._pos - - def peek(self): - if self._char is None: - self._char = self._f.read(1) - return self._char - - def read(self): - # Accounting for lookahead(1) - ch = self._char or self._f.read(1) - self._char = self._f.read(1) - - # Accounting for the positions - self._pos = self._next_pos - if ch == "\r" and self.peek() == "\n": - super.read(1) # Throw out a character - self._next_pos = Position.next_line(self._next_pos) - elif ch == "\n": - self._next_pos = Position.next_line(self._next_pos) - else: - self._next_pos = Position.next_pos(self._next_pos) - - return ch - - -class ReadThroughBuffer(PosTrackingBufferedReader): - """A duck that quacks like a PosTrackingBufferedReader.""" - - def __init__(self, ptcr: PosTrackingBufferedReader): - self._reader = ptcr - self._buffer = StringIO() - - def pos(self): - return self._reader.pos() - - def peek(self): - return self._reader.peek() - - def read(self): - ch = self._reader.read() - self._buffer.write(ch) - return ch - - def __str__(self): - return self._buffer.getvalue() - - def __enter__(self, *args): - return self - - def __exit__(self, *args): - pass - - -class SexpParser(ABC): - @classmethod - @abstractmethod - def parse(cls, f: PosTrackingBufferedReader) -> TokenBase: - """Parse an s-expression, returning a parsed token tree.""" - - def read(cls, f: PosTrackingBufferedReader): - """Parse to a token tree and read to values returning the resulting values.""" - - return cls.parse(f).read() - - -class Parser(SexpParser): - """A basic parser which knows about lists, symbols and numbers. - - Intended as a base class / extension point for other parsers. - """ - - @classmethod - def parse(cls, f: PosTrackingBufferedReader): - if not f.peek(): - raise SyntaxError(f"Got end of file ({f.pos()}) while parsing") - elif cls.ispunct(f.peek()): - if f.peek() == "(": - return cls.parse_list(f) - elif f.peek() == "[": - return cls.parse_sqlist(f) - elif f.peek() == '"': - return cls.parse_str(f) - elif f.peek() == ";": - return cls.parse_comment(f) - else: - raise SyntaxError(f"Got unexpected punctuation {f.read()!r} at {f.pos()} while parsing") - elif cls.isspace(f.peek()): - return cls.parse_whitespace(f) - else: - return cls.parse_symbol(f) - - @classmethod - def isspace(cls, ch: str): - """An extension point allowing for a more expansive concept of whitespace.""" - return ch.isspace() or ch == ',' - - @classmethod - def ispunct(cls, ch: str): - return ch in ( - '"' - ';' # Semicolon - '()' # Parens - '⟮⟯' # 'flat' parens - '[]' # Square brackets - '⟦⟧' # 'white' square brackets - '{}' # Curly brackets - '⟨⟩' # Angle brackets - '《》' # Double angle brackets - '⟪⟫' # Another kind of double angle brackets - ) - - @classmethod - def parse_delimeted(cls, f: PosTrackingBufferedReader, openc, closec, ctor): - with ReadThroughBuffer(f) as rtb: - pos = None - for c in openc: - pos = pos or rtb.pos() - assert rtb.read() == c # Discard the leading delimeter - pos = rtb.pos() - acc = [] - while f.peek() != closec: - if not f.peek(): - raise SyntaxError(f"Got end of file while parsing {openc!r}...{closec!r} starting at {pos}") - try: - acc.append(cls.parse(rtb)) - except SyntaxError as e: - raise SyntaxError(f"While parsing {openc!r}...{closec!r} starting at {pos},\n{e}") - - assert rtb.read() == closec # Discard the trailing delimeter - return ctor(acc, str(rtb), pos) - - # FIXME (arrdem 2020-07-18): - # Break this apart and make the supported lists composable features somehow? - @classmethod - def parse_list(cls, f: PosTrackingBufferedReader): - return cls.parse_delimeted(f, "(", ")", lambda *args: ListToken(*args, ListType.ROUND)) - - @classmethod - def parse_sqlist(cls, f: PosTrackingBufferedReader): - return cls.parse_delimeted(f, "[", "]", lambda *args: ListToken(*args, ListType.SQUARE)) - - # FIXME (arrdem 2020-07-18): - # Break this apart into middleware or composable features somehow? - @classmethod - def handle_symbol(cls, buff, pos): - def _sign(m, idx): - if m.group(idx) == '-': - return -1 - else: - return 1 - - # Parsing integers with bases - if m := re.fullmatch(r"([+-]?)(\d+)r([a-z0-9_]+)", buff): - return IntegerToken( - _sign(m, 1) * int(m.group(3).replace("_", ""), - int(m.group(2))), - buff, - pos, - ) - - # Parsing hex numbers - if m := re.fullmatch(r"([+-]?)0[xX]([A-Fa-f0-9_]*)", buff): - val = m.group(2).replace("_", "") - return IntegerToken(_sign(m, 1) * int(val, 16), buff, pos) - - # Parsing octal numbers - if m := re.fullmatch(r"([+-]?)0([\d_]*)", buff): - val = m.group(2).replace("_", "") - return IntegerToken(_sign(m, 1) * int(val, 8), buff, pos) - - # Parsing integers - if m := re.fullmatch(r"([+-]?)\d[\d_]*", buff): - return IntegerToken(int(buff.replace("_", "")), buff, pos) - - # Parsing fractions - if m := re.fullmatch(r"([+-]?)(\d[\d_]*)/(\d[\d_]*)", buff): - return FractionToken( - Fraction( - int(m.group(2).replace("_", "")), - int(m.group(3).replace("_", ""))), - buff, - pos, - ) - - # Parsing floats - if re.fullmatch(r"([+-]?)\d[\d_]*(\.\d[\d_]*)?(e[+-]?\d[\d_]*)?", buff): - return FloatToken(float(buff), buff, pos) - - # Booleans - if buff == "true": - return BooleanToken(True, buff, pos) - - if buff == "false": - return BooleanToken(False, buff, pos) - - # Keywords - if buff.startswith(":"): - return KeywordToken(buff, buff, pos) - - # Default behavior - return SymbolToken(buff, buff, pos) - - @classmethod - def parse_symbol(cls, f: PosTrackingBufferedReader): - with ReadThroughBuffer(f) as rtb: - pos = None - while rtb.peek() and not cls.isspace(rtb.peek()) and not cls.ispunct(rtb.peek()): - pos = pos or rtb.pos() - rtb.read() - buff = str(rtb) - return cls.handle_symbol(buff, pos) - - @classmethod - def parse_whitespace(cls, f: PosTrackingBufferedReader): - with ReadThroughBuffer(f) as rtb: - pos = None - while rtb.peek() and cls.isspace(rtb.peek()): - pos = pos or rtb.pos() - ch = rtb.read() - if ch == "\n": - break - buff = str(rtb) - return WhitespaceToken(buff, buff, pos) - - @classmethod - def parse_comment(cls, f: PosTrackingBufferedReader): - with ReadThroughBuffer(f) as rtb: - pos = None - while rtb.read() not in ["\n", ""]: - pos = pos or rtb.pos() - continue - buff = str(rtb) - return CommentToken(buff, buff, pos) - - - @classmethod - def handle_escape(cls, ch: str): - if ch == 'n': - return "\n" - elif ch == 'r': - return "\r" - elif ch == 'l': - return "\014" # form feed - elif ch == 't': - return "\t" - elif ch == '"': - return '"' - - @classmethod - def parse_str(cls, f: PosTrackingBufferedReader): - with ReadThroughBuffer(f) as rtb: - assert rtb.read() == '"' - pos = rtb.pos() - content = [] - - while True: - if not rtb.peek(): - raise - - # Handle end of string - elif rtb.peek() == '"': - rtb.read() - break - - # Handle escape sequences - elif rtb.peek() == '\\': - rtb.read() # Discard the escape leader - # Octal escape - if rtb.peek() == '0': - rtb.read() - buff = [] - while rtb.peek() in '01234567': - buff.append(rtb.read()) - content.append(chr(int(''.join(buff), 8))) - - # Hex escape - elif rtb.peek() == 'x': - rtb.read() # Discard the escape leader - buff = [] - while rtb.peek() in '0123456789abcdefABCDEF': - buff.append(rtb.read()) - content.append(chr(int(''.join(buff), 16))) - - else: - content.append(cls.handle_escape(rtb.read())) - - else: - content.append(rtb.read()) - - buff = str(rtb) - return StringToken(content, buff, pos) - - -## Parsing interface -def parses(buff: str, - parser: SexpParser = Parser, - source_name=None): - """Parse a single s-expression from a string, returning its token tree.""" - - return parse(StringIO(buff), parser, source_name or f"") - - -def parsef(path: str, - parser: SexpParser = Parser): - """Parse a single s-expression from the file named by a string, returning its token tree.""" - - with open(path, "r") as f: - return parse(f, parser, path) - - -def parse(file: IO, - parser: SexpParser = Parser, - source_name=None): - """Parse a single sexpression from a file-like object, returning its token tree.""" - - return parser.parse( - PosTrackingBufferedReader( - file, - source_name=source_name - ) - ) - - -## Loading interface -def loads(buff: str, - parser: SexpParser = Parser, - source_name=None): - """Load a single s-expression from a string, returning its object representation.""" - - return load(StringIO(buff), parser, source_name or f"") - - -def loadf(path: str, - parser: SexpParser = Parser): - """Load a single s-expression from the file named by a string, returning its object representation.""" - - with open(path, "r") as f: - return load(f, parser, path) - - -def load(file: IO, - parser: SexpParser = Parser, - source_name=None): - """Load a single sexpression from a file-like object, returning its object representation.""" - - return parser.load( - PosTrackingBufferedReader( - file, - source_name=source_name - ) - ) - - -## Dumping interface -def dump(file: IO, obj): - """Given an object, dump its s-expression coding to the given file-like object.""" - - raise NotImplementedError() - - -def dumps(obj): - """Given an object, dump its s-expression coding to a string and return that string.""" - - with StringIO("") as f: - dump(f, obj) - return str(f) diff --git a/src/python/flowmetal/reaper.py b/src/python/flowmetal/reaper.py new file mode 100644 index 0000000..82475bb --- /dev/null +++ b/src/python/flowmetal/reaper.py @@ -0,0 +1,9 @@ +""" +""" + +import click + + +@click.group() +def cli(): + pass diff --git a/src/python/flowmetal/repl.py b/src/python/flowmetal/repl.py deleted file mode 100644 index 149b621..0000000 --- a/src/python/flowmetal/repl.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import logging -import sys - -from flowmetal.syntax_analyzer import analyzes - -from prompt_toolkit import print_formatted_text, prompt, PromptSession -from prompt_toolkit.formatted_text import FormattedText -from prompt_toolkit.history import FileHistory -from prompt_toolkit.styles import Style - - -STYLE = Style.from_dict({ - # User input (default text). - "": "", - "prompt": "ansigreen", - "time": "ansiyellow" -}) - - -class InterpreterInterrupt(Exception): - """An exception used to break the prompt or evaluation.""" - - -def pp(t, indent=""): - if isinstance(t, list): # lists - buff = ["["] - for e in t: - buff.append(f"{indent} " + pp(e, indent+" ")+",") - return "\n".join(buff + [f"{indent}]"]) - - elif hasattr(t, '_fields'): # namedtuples - buff = [f"{type(t).__name__}("] - for field, value in zip(t._fields, t): - buff.append(f"{indent} {field}=" + pp(value, indent+" ")+",") - return "\n".join(buff + [f"{indent})"]) - - elif isinstance(t, tuple): # tuples - buff = ["("] - for e in t: - buff.append(f"{indent} " + pp(e, indent+" ")+",") - return "\n".join(buff + [f"{indent})"]) - - else: - return repr(t) - -parser = argparse.ArgumentParser() - -def main(): - """REPL entry point.""" - - args = parser.parse_args(sys.argv[1:]) - logger = logging.getLogger("flowmetal") - ch = logging.StreamHandler() - ch.setLevel(logging.INFO) - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - ch.setFormatter(formatter) - logger.addHandler(ch) - - session = PromptSession(history=FileHistory(".iflow.history")) - line_no = 0 - - while True: - try: - line = session.prompt([("class:prompt", ">>> ")], style=STYLE) - except (InterpreterInterrupt, KeyboardInterrupt): - continue - except EOFError: - break - - try: - print(pp(analyzes(line, source_name=f"repl@{line_no}"))) - except Exception as e: - print(e) - finally: - line_no += 1 diff --git a/src/python/flowmetal/scheduler.py b/src/python/flowmetal/scheduler.py new file mode 100644 index 0000000..82475bb --- /dev/null +++ b/src/python/flowmetal/scheduler.py @@ -0,0 +1,9 @@ +""" +""" + +import click + + +@click.group() +def cli(): + pass diff --git a/src/python/flowmetal/syntax_analyzer.py b/src/python/flowmetal/syntax_analyzer.py deleted file mode 100644 index 6f38510..0000000 --- a/src/python/flowmetal/syntax_analyzer.py +++ /dev/null @@ -1,356 +0,0 @@ -""" -The parser just parses and tokenizes. - -The [syntax] analyzer interprets a parse sequence into a syntax tree which can be checked, type inferred and compiled. -""" - -from abc import ABC, abstractmethod -from io import StringIO -from typing import NamedTuple, List, Union, Any, IO, Tuple -from enum import Enum - -import flowmetal.parser as p - - -### Types -## We are not, in fact, sponsored by Typelevel LLC. -class TypeLevelExpr(object): - """A base class for type-level expressions.""" - pass - - -class GenericExpr(TypeLevelExpr, NamedTuple): - """'invocation' (application) of a generic type to Type[Level]Exprs.""" - pass - - -class TypeExpr(TypeLevelExpr, NamedTuple): - """A bound (or yet to be bound) type level symbol.""" - pass - - -class BuiltinType(TypeLevelExpr, Enum): - """Built in types for atoms.""" - BOOLEAN = 'Boolean' - SYMBOL = 'Symbol' - KEYWORD = 'Keyword' - STRING = 'String' - INTEGER = 'Integer' - FRACTION = 'Fraction' - FLOAT = 'Float' - - -class ConstraintExpr(TypeLevelExpr, NamedTuple): - """A value-level constraint (predicate) as a type.""" - - -## Terms -# Now down to reality -class ValueLevelExpr(object): - """A base class for value-level expressions.""" - - -class TriviallyTypedExpr(ValueLevelExpr): - """And some of those expressions have trivial types.""" - @property - def type(self) -> TypeExpr: - """The type of an expression.""" - - -class AscribeExpr(TriviallyTypedExpr, NamedTuple): - value: ValueLevelExpr - type: TypeLevelExpr - - -class ConstExpr(TriviallyTypedExpr, NamedTuple): - """Constant expressions. Keywords, strings, numbers, that sort of thing.""" - - token: p.ConstTokenBase - - @property - def data(self) -> Any: - """The value of the constant.""" - # The parser gives us this data - return self.token.data - - @abstractmethod - def type(self): - raise NotImplementedError() - - -class BooleanExpr(ConstExpr): - @property - def type(self): - return BuiltinType.BOOLEAN - - -class IntegerExpr(ConstExpr): - @property - def type(self): - return BuiltinType.INTEGER - - -class FractionExpr(ConstExpr): - @property - def type(self): - return BuiltinType.FRACTION - - -class FloatExpr(ConstExpr): - @property - def type(self): - return BuiltinType.FLOAT - - -class KeywordExpr(ConstExpr): - @property - def type(self): - return BuiltinType.KEYWORD - - -class StringExpr(ConstExpr): - @property - def type(self): - return BuiltinType.STRING - - -class ListExpr(ValueLevelExpr, NamedTuple): - elements: List[ValueLevelExpr] - - -## 'real' AST nodes -class DoExpr(ValueLevelExpr, NamedTuple): - effect_exprs: List[ValueLevelExpr] - ret_expr: ValueLevelExpr - - -class LetExpr(ValueLevelExpr, NamedTuple): - binding_exprs: List[Tuple] - ret_expr: DoExpr - - -class FnExpr(ValueLevelExpr, NamedTuple): - arguments: List - ret_type: TypeExpr - ret_expr: DoExpr - - -## Reader implementation -class AnalyzerBase(ABC): - """Analyzer interface.""" - - @classmethod - @abstractmethod - def analyze(cls, token: p.TokenBase) -> ValueLevelExpr: - """Analyze a token tree, returning an expr tree.""" - - -def _t(txt): - return p.SymbolToken(txt, txt, None) - - -class Analyzer(AnalyzerBase): - """A reference Analyzer implementation. - - Walks a parsed token tree, building up a syntax tree. - """ - TACK0 = _t('⊢') - TACK1 = _t('|-') - TACK2 = p.KeywordToken(":-", None, None) - LET = _t('let') - DO = _t('do') - FN = _t('fn') - LIST = _t('list') - QUOTE = _t('quote') - - @classmethod - def _tackp(cls, t): - return t in [cls.TACK0, cls.TACK1, cls.TACK2] - - @classmethod - def _nows(cls, tokens): - return [t for t in tokens if not isinstance(t, p.WhitespaceToken)] - - @classmethod - def _chomp(cls, tokens): - """'chomp' an expression and optional ascription off the tokens, returning an expression and the remaining tokens.""" - - if len(tokens) == 1: - return cls.analyze(tokens[0]), [] - elif cls._tackp(tokens[1]): - if len(tokens) >= 3: - return ( - AscribeExpr( - cls.analyze(tokens[0]), - cls.analyze(tokens[2])), - tokens[3:], - ) - else: - raise SyntaxError(f"Analyzing tack at {tokens[1].pos}, did not find following type ascription!") - else: - return cls.analyze(tokens[0]), tokens[1::] - - @classmethod - def _terms(cls, tokens): - terms = [] - tokens = cls._nows(tokens) - while tokens: - term, tokens = cls._chomp(tokens) - terms.append(term) - return terms - - @classmethod - def analyze(cls, token: p.TokenBase): - if isinstance(token, p.BooleanToken): - return BooleanExpr(token) - - if isinstance(token, p.KeywordToken): - return KeywordExpr(token) - - if isinstance(token, p.IntegerToken): - return IntegerExpr(token) - - if isinstance(token, p.FractionToken): - return FractionExpr(token) - - if isinstance(token, p.FloatToken): - return FloatExpr(token) - - if isinstance(token, p.StringToken): - return StringExpr(token) - - if isinstance(token, p.SymbolToken): - return token - - if isinstance(token, p.ListToken): - return cls.analyze_list(token) - - @classmethod - def _do(cls, t, body: list): - return p.ListToken([cls.DO] + body, t.raw, t.pos) - - @classmethod - def analyze_list(cls, token: p.ListToken): - """Analyze a list, for which there are several 'ground' forms.""" - - # Expunge any whitespace tokens - tokens = cls._nows(token.data) - - if len(tokens) == 0: - return ListExpr([]) - - if tokens[0] == cls.QUOTE: - raise NotImplementedError("Quote isn't quite there!") - - if tokens[0] == cls.LIST: - return ListExpr(cls._terms(tokens[1:])) - - if tokens[0] == cls.DO: - return cls.analyze_do(token) - - if tokens[0] == cls.LET: - return cls.analyze_let(token) - - if tokens[0] == cls.FN: - return cls.analyze_fn(token) - - cls.analyze_invoke(tokens) - - @classmethod - def analyze_let(cls, let_token): - tokens = cls._nows(let_token.data[1:]) - assert len(tokens) >= 2 - assert isinstance(tokens[0], p.ListToken) - bindings = [] - binding_tokens = cls._nows(tokens[0].data) - tokens = tokens[1:] - while binding_tokens: - if isinstance(binding_tokens[0], p.SymbolToken): - bindexpr = binding_tokens[0] - binding_tokens = binding_tokens[1:] - else: - raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got illegal binding expression {binding_tokens[0]}") - - if not binding_tokens: - raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got binding expression without subsequent value expression!") - - if cls._tackp(binding_tokens[0]): - if len(binding_tokens) < 2: - raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got `⊢` at {binding_tokens[0].pos} without type!") - bind_ascription = cls.analyze(binding_tokens[1]) - binding_tokens = binding_tokens[2:] - bindexpr = AscribeExpr(bindexpr, bind_ascription) - - if not binding_tokens: - raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got binding expression without subsequent value expression!") - - valexpr = binding_tokens[0] - binding_tokens = cls.analyze(binding_tokens[1:]) - - bindings.append((bindexpr, valexpr)) - - # FIXME (arrdem 2020-07-18): - # This needs to happen with bindings - tail = tokens[0] if len(tokens) == 1 else cls._do(let_token, tokens) - return LetExpr(bindings, cls.analyze(tail)) - - @classmethod - def analyze_do(cls, do_token): - tokens = cls._nows(do_token.data[1:]) - exprs = cls._terms(tokens) - if exprs[:-1]: - return DoExpr(exprs[:-1], exprs[-1]) - else: - return exprs[-1] - - @classmethod - def analyze_fn(cls, fn_token): - tokens = cls._nows(fn_token.data[1:]) - assert len(tokens) >= 2 - assert isinstance(tokens[0], p.ListToken) - - args = [] - arg_tokens = cls._nows(tokens[0].data) - while arg_tokens: - argexpr, arg_tokens = cls._chomp(arg_tokens) - args.append(argexpr) - - ascription = None - if cls._tackp(tokens[1]): - ascription = cls.analyze(tokens[2]) - tokens = tokens[2:] - else: - tokens = tokens[1:] - - # FIXME (arrdem 2020-07-18): - # This needs to happen with bindings - body = cls.analyze(cls._do(fn_token, tokens)) - return FnExpr(args, ascription, body) - - -## Analysis interface -def analyzes(buff: str, - analyzer: AnalyzerBase = Analyzer, - parser: p.SexpParser = p.Parser, - source_name = None): - """Parse a single s-expression from a string, returning its token tree.""" - - return analyze(StringIO(buff), analyzer, parser, source_name or f"") - - -def analyzef(path: str, - analyzer: AnalyzerBase = Analyzer, - parser: p.SexpParser = p.Parser): - """Parse a single s-expression from the file named by a string, returning its token tree.""" - - with open(path, "r") as f: - return analyze(f, analyzer, parser, path) - - -def analyze(file: IO, - analyzer: AnalyzerBase = Analyzer, - parser: p.SexpParser = p.Parser, - source_name = None): - """Parse a single sexpression from a file-like object, returning its token tree.""" - - return analyzer.analyze(p.parse(file, parser, source_name)) diff --git a/test/python/flowmetal/test_parser.py b/test/python/flowmetal/test_parser.py deleted file mode 100644 index b78caf4..0000000 --- a/test/python/flowmetal/test_parser.py +++ /dev/null @@ -1,161 +0,0 @@ -""" -Tests covering the Flowmetal parser. -""" - -from math import nan - -import flowmetal.parser as p - -import pytest - - -def test_parse_list(): - """Trivial parsing a list.""" - assert isinstance(p.parses("()"), p.ListToken) - assert p.parses("()").paren == p.ListType.ROUND - - -@pytest.mark.parametrize('txt, val', [ - ('1', 1), - ('2', 2), - ('103', 103), - ('504', 504), - # Sign prefixes - ('-1', -1), - ('+1', +1), - # Underscores as whitespace - ('1_000_000', 1e6), - ('+1_000', 1000), - ('-1_000', -1000), - # Variable base - ('2r1', 1), - ('2r10', 2), - ('2r100', 4), - ('2r101', 5), - ('+2r10', 2), - ('-2r10', -2), - # Octal - ('00', 0), - ('01', 1), - ('010', 8), - ('+010', 8), - ('-010', -8), - # Hex - ('0x0', 0), - ('0xF', 15), - ('0x10', 16), - ('+0x10', 16), - ('-0x10', -16), -]) -def test_parse_num(txt, val): - """Some trivial cases of parsing numbers.""" - assert isinstance(p.parses(txt), p.IntegerToken) - assert p.parses(txt).data == val - - -@pytest.mark.parametrize('frac', [ - '1/2', '1/4', '1/512', -]) -def test_parse_ratio(frac): - """Test covering the ratio notation.""" - assert isinstance(p.parses(frac), p.FractionToken) - assert p.parses(frac).data == p.Fraction(frac) - - - -@pytest.mark.parametrize('sym,', [ - 'a', - 'b', - '*earmuff-style*', - '+kebab-style+', - 'JAVA_CONSTANT_STYLE', -]) -def test_parse_sym(sym): - """Some trivial cases of parsing symbols.""" - assert isinstance(p.parses(sym), p.SymbolToken) - assert p.parses(sym).data == sym - - -@pytest.mark.parametrize('txt, tokenization', [ - ('(1 2 3)', - [(p.IntegerToken, '1'), - (p.WhitespaceToken, ' '), - (p.IntegerToken, '2'), - (p.WhitespaceToken, ' '), - (p.IntegerToken, '3')]), - ('(a 1 b 2)', - [(p.SymbolToken, 'a'), - (p.WhitespaceToken, ' '), - (p.IntegerToken, '1'), - (p.WhitespaceToken, ' '), - (p.SymbolToken, 'b'), - (p.WhitespaceToken, ' '), - (p.IntegerToken, '2')]) -]) -def test_list_contents(txt, tokenization): - """Parse examples of list contents.""" - assert isinstance(p.parses(txt), p.ListToken) - - lelems = p.parses(txt).data - for (type, text), token in zip(tokenization, lelems): - assert isinstance(token, type) - assert token.raw == text - - -@pytest.mark.parametrize('txt, value', [ - ('1.0', 1.0), - ('-1.0', -1.0), - ('1.01', 1.01), - ('1e0', 1e0), - ('1e3', 1e3), - ('1e-3', 1e-3), - ('1.01e3', 1.01e3), - ('1_000e0', 1e3), -]) -def test_float_values(txt, value): - """Some examples of floats.""" - assert isinstance(p.parses(txt), p.FloatToken) - assert p.parses(txt).data == value - - -@pytest.mark.parametrize('txt, tokenization', [ - ('+1', p.IntegerToken), - ('+1+', p.SymbolToken), - ('+1e', p.SymbolToken), - ('+1e3', p.FloatToken), - ('+1.0', p.FloatToken), - ('+1.0e3', p.FloatToken), - ('a.b', p.SymbolToken), - ('1.b', p.SymbolToken), -]) -def test_ambiguous_floats(txt, tokenization): - """Parse examples of 'difficult' floats and symbols.""" - assert isinstance(p.parses(txt), tokenization), "Token type didn't match!" - assert p.parses(txt).raw == txt, "Parse wasn't total!" - - -@pytest.mark.parametrize('txt,', [ - r'""', - r'"foo"', - r'"foo bar baz qux"', - r'"foo\nbar\tbaz\lqux"', - r'''"foo - bar - baz - qux"''', - r'"\000 \x00"', - r'"\"\""', -]) -def test_string(txt): - """Some examples of strings, and of escape sequences.""" - assert isinstance(p.parses(txt), p.StringToken) - - -@pytest.mark.parametrize('txt,', [ - ':foo', - ':foo/bar', - ':foo.bar/baz?', -]) -def test_keyword(txt): - """Some examples of keywords.""" - assert isinstance(p.parses(txt), p.KeywordToken) diff --git a/test/python/flowmetal/test_syntax_analyzer.py b/test/python/flowmetal/test_syntax_analyzer.py deleted file mode 100644 index 7afde5d..0000000 --- a/test/python/flowmetal/test_syntax_analyzer.py +++ /dev/null @@ -1,50 +0,0 @@ -""" -Tests covering the Flowmetal analyzer. -""" - -import flowmetal.parser as p -import flowmetal.syntax_analyzer as a - -import pytest - - -@pytest.mark.parametrize('txt, exprtype', [ - # Booleans - ('true', a.ConstExpr), - ('false', a.BooleanExpr), - # Integers - ('1', a.ConstExpr), - ('1', a.IntegerExpr), - # Fractions - ('1/2', a.ConstExpr), - ('1/2', a.FractionExpr), - # Floats - ('1.0', a.ConstExpr), - ('1.0', a.FloatExpr), - # Keywords - (':foo', a.ConstExpr), - (':foo', a.KeywordExpr), - # Strings - ('"foo"', a.ConstExpr), - ('"foo"', a.StringExpr), -]) -def test_analyze_constants(txt, exprtype): - """Make sure the analyzer can chew on constants.""" - assert isinstance(a.analyzes(txt), exprtype) - - -@pytest.mark.parametrize('txt', [ - '()', - '(list)', - '(list 1)', - '(do 1)', - '(do foo bar 1)', - '(let [a 1, b 2] 1)', - '(fn [] 1)', - '(fn [] ⊢ integer? x)', - '(fn [] x |- integer?)', - '(fn [] x :- integer?)', -]) -def test_analyze(txt): - """Make sure that do exprs work.""" - assert a.analyzes(txt)