Refreshing

Reid 'arrdem' McKenzie 2023-03-07 18:55:25 -07:00
parent 2b0d863531
commit 87539621e5
27 changed files with 1774 additions and 1368 deletions


@ -1,11 +1,11 @@
# Flowmetal
-> A shining mercurial metal laden with sensors and almost infinitely reconfigurable.
->
+> A shining mercurial metal, laden with sensors and almost infinitely reconfigurable.
> The stuff of which robots and servitors are made.
Flowmetal is a substrate for automation.
-It attempts to provide a programming environment wherein programs are durable, evented and asynchronous aimed at what would traditionally be described as scripting or coordination.
+It provides a programming environment wherein programs are durable, evented and asynchronous by default.
+It is aimed at scripting or coordination tasks, such as workflows and scheduled jobs.
Let's unpack these terms.
@ -21,65 +21,16 @@ This also allows for external systems such as REST callback APIs, databases and
It also allows bidirectional communication between Flowmetal programs and other more traditional programming environments.
Anything that can communicate with Flowmetal can provide function implementations, or call Flowmetal programs!
This centering of evented communication makes Flowmetal ideal for **coordination** tasks, from simple task sequencing to map/reduce and other parallelism patterns.
**Asynchronous** - thanks to Flowmetal's evented execution model, waiting for slow external events either synchronously or asynchronously is second nature!
Flowmetal is especially good at waiting for very, very slow external operations.
Stuff like webhooks and batch processes.
-**Scripting** - the tradeoff Flowmetal makes for the evented model is that it's slow.
-While Flowmetal foreign functions could be fast, Flowmetal's interpreter isn't designed for speed.
-It's designed for eventing and ensuring durability.
-This makes Flowmetal suitable for interacting with and coordinating other systems, but it's not gonna win any benchmark games.
+**Scripting** - Durability and distribution of execution come at coordination costs which make Flowmetal well suited for coordination tasks, but not for heavy processing.
## Wait what?
Okay.
In simpler words, Flowmetal is an interpreted lisp which can use a datastore of your choice for durability.
Other systems can attach to Flowmetal's datastore and send events to and receive them from Flowmetal.
For instance Flowmetal contains a reference implementation of an HTTP callback connector and of an HTTP request connector.
A possible Flowmetal setup looks something like this -
```
+----------------------------+
+---------------------------+ |
+--------------------------+ |--+
| External HTTP service(s) |--+
+--------------------------+
^ ^
| |
v v
+-----------------------+ +------------------------+
| HTTP server connector | | HTTP request connector |
+-----------------------+ +------------------------+
^ ^
| |
v v
+--------------------+
| Shared event store |
+--------------------+
^
|
v
+--------------------------+
| Flowmetal interpreter(s) |
+--------------------------+
```
In this setup, the Flowmetal interpreters are able to interact with an external HTTP service, sending and receiving webhooks, with Flowmetal programs waiting for those external events to arrive.
For instance this program would use the external connector stubs to build up interaction(s) with an external system.
```lisp
```
Comparisons to Apache Airflow are, at least in this setup, pretty apt, although Flowmetal's durable execution model makes it much more suitable for providing reliable workflows, and its DSL is more approachable.
- For a problem statement, see [Call/CC Airflow](docs/call_cc_airflow.md).
- For an architecture overview, see [Architecture](docs/architecture.md).
- For example doodles, see [examples](examples).
## License
Mirrored from https://git.arrdem.com/arrdem/flowmetal
Published under the MIT license. See [LICENSE.md](LICENSE.md)

doc/NOTES.md Normal file

@ -0,0 +1,251 @@
# Notes
https://github.com/Pyrlang/Pyrlang
https://en.wikipedia.org/wiki/Single_system_image
## Example - Await
A common pattern when working in distributed environments is wanting to request that another system perform a job and then wait for its results.
There are lots of parallels here to making a function or RPC call, except that it's a distributed system with complex failure modes.
In a perfect world we'd want to just write something like this -
```python
#!/usr/bin/env python3.10
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
job = CLIENT.create_job(...)
result = await job
# Do something with the result
```
There's some room for variance here around API design taste, but this snippet is probably familiar to many Python readers.
Let's think about its failure modes.
First, that `await` is doing a lot of heavy lifting.
Presumably it's wrapping up a polling loop of some sort.
That may be acceptable in some circumstances, but it really leaves to the client library implementer the question of what an acceptable retry policy is.
Second, this snippet assumes that `create_job` will succeed.
There won't be an authorization error, or a network transit error, or a remote server error or anything like that.
Third, there's no other record of whatever `job` is.
If the Python interpreter running this program dies, the user gets bored and `C-c`'s it, or the computer encounters a problem, the job will be lost.
Maybe that's OK, maybe it isn't.
But it's a risk.
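To make the first of those points concrete, here's a guess at the kind of machinery such an `await` might be hiding; the `Job` class, its `poll` call and its fields are all hypothetical:
```python
import asyncio

class Job:
    """A hypothetical client-side job handle."""

    def __init__(self, client, job_id):
        self._client, self._id = client, job_id

    def __await__(self):
        # A bare polling loop: fixed interval, no retry policy, no total
        # timeout, and nothing persisted anywhere.
        while True:
            status = self._client.poll(self._id)  # a network call which can fail
            if status.done:
                return status.result
            yield from asyncio.sleep(10).__await__()
```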
Now, let's think about taking on some of the complexity needed to solve these problems ourselves.
### Retrying challenges
We can manually write the retry loop polling a remote API.
``` python
#!/usr/bin/env python3.10
from datetime import datetime, timedelta
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
AWAIT_TIMEOUT = timedelta(minutes=30)
POLL_TIME = timedelta(seconds=10)
def sleep(duration=POLL_TIME):
"""A slightly more useful sleep. Has our default and does coercion."""
from time import sleep
if isinstance(duration, timedelta):
duration = duration.total_seconds()
sleep(duration)
# Create a job, assuming idempotence
while True:
try:
        job = CLIENT.create_job(...)
start_time = datetime.now()
break
except:
sleep()
# Waiting for the job
while True:
# Time-based timeout
if datetime.now() - start_time > AWAIT_TIMEOUT:
raise TimeoutError
    # Checking the job status; fixed-interval linear polling, no backoff
    try:
        if not job.complete():
            sleep()
            continue
except:
sleep()
continue
# Trying to read the job result, re-using the retry loop & total timeout machinery
try:
result = job.get()
break
except:
sleep()
continue
# Do something with the result
```
We could pull [retrying](https://pypi.org/project/retrying/) off the shelf and get some real mileage here.
`retrying` is a super handy little library that provides the `@retry` decorator, which implements a variety of common retrying concerns such as retrying N times with linear or exponential back-off.
It's really just the `while/try/except` state machine we just wrote a couple times as a decorator.
``` python
#!/usr/bin/env python3.10
from datetime import datetime, timedelta
from retrying import retry
from service.client import Client
CLIENT = Client("http://service.local", api_key="...")
AWAIT_TIMEOUT = timedelta(minutes=30)
POLL_TIME = timedelta(seconds=10)
class StillWaitingException(Exception):
"""Something we can throw to signal we're still waiting on an external event."""
@retry(wait_fixed=int(POLL_TIME.total_seconds() * 1000))
def r_create_job(client):
"""R[eliable] create job. Retries over exceptions forever with a delay. No jitter."""
return client.create_job()
@retry(stop_max_delay=int(AWAIT_TIMEOUT.total_seconds() * 1000),
       wait_fixed=int(POLL_TIME.total_seconds() * 1000))
def r_get_job(job):
"""R[eliable] get job. Retries over exceptions up to a total time with a delay. No jitter."""
if not job.complete():
raise StillWaitingException
return job.get()
job = r_create_job(CLIENT)
result = r_get_job(job)
# Do something with the result
```
That's pretty good!
We've preserved most of our direct control over the mechanical retrying behavior; we can tweak it or choose a different provider.
And we've managed to get the syntactic density of the original `await` example back ... almost.
This is where Python's lack of an anonymous function block syntax and other lexical structures becomes a sharp limiter.
In another language like JavaScript or Lua, you could probably get this down to something like -
``` lua
-- retry is a function of retrying options to a function of a callable to retry
-- which returns a zero-argument callable which will execute the callable with
-- the retrying behavior as specified.
client = Client("http://service.local", api_key="...")
retry_config = {} -- Fake, obviously
with_retry = retry(retry_config)
job = with_retry(
    function ()
        return client.create_job(...)
end)()
result = with_retry(
function()
if job.complete() then
return job.get()
end
end)()
```
The insight here is that the "callback" function we're defining in the Python example as `r_get_job` and so forth has no intrinsic need to be named.
In fact choosing the arbitrary names `r_get_job` and `r_create_job` puts more load on the programmer and the reader.
Python's lack of block anonymous procedures precludes us from cramming the `if complete then get` operation or anything more complex into a `lambda` without some serious syntax crimes.
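For illustration, the closest Python gets without named helpers is something like the following, where `raise_` is a shim we must invent because `raise` is a statement, and `with_retry` stands in for a combinator like the Lua sketch above:
```python
def raise_(e):
    """An expression-position raise; `raise` itself can't appear in a lambda."""
    raise e

result = with_retry(
    lambda: job.get() if job.complete() else raise_(StillWaitingException())
)()
```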
Using [PEP-0342](https://www.python.org/dev/peps/pep-0342/#new-generator-method-send-value), it's possible to implement arbitrary coroutines in Python by `.send()`ing values to generators which may treat `yield` statements as rvalues for receiving remotely sent inputs.
This makes it possible to explicitly yield control to a remote interpreter, which will return or resume the coroutine with a result value.
Microsoft's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=python) use exactly this mechanism to implement durability.
The "functions" provided by the API return sentinels which can be yielded to an external interpreter, which triggers processing and returns control when there are results.
This is the [interpreter effect conversion pattern (Extensible Effects)](http://okmij.org/ftp/Haskell/extensible/exteff.pdf) seen in Haskell and other tools, applied.
``` python
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
```
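To see why the generator machinery works, here is a minimal sketch of the trampoline it implies; the tuple-shaped activity requests and the `drive` function are invented stand-ins for Durable Functions' actual history and replay mechanism:
```python
def drive(orchestrator, activities):
    """Run a generator-based orchestrator, executing each yielded activity
    request and sending the result back in via .send()."""
    gen = orchestrator()
    try:
        request = next(gen)                 # first yielded activity request
        while True:
            name, arg = request             # e.g. ("F1", None)
            result = activities[name](arg)  # perform the external work
            request = gen.send(result)      # resume the coroutine with the result
    except StopIteration as e:
        return e.value

def orchestrator():
    x = yield ("F1", None)
    y = yield ("F2", x)
    return y

print(drive(orchestrator, {"F1": lambda _: 1, "F2": lambda x: x + 1}))  # 2
```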
Now it would seem that you could "just" automate rewriting that to something like this -
``` python
@df.Durable
def main(ctx):
    x = ctx.call_activity("F1", None)
    y = ctx.call_activity("F2", x)
    z = ctx.call_activity("F3", y)
    return ctx.call_activity("F4", z)
```
There's some prior art for doing this (https://eigenfoo.xyz/manipulating-python-asts/, https://greentreesnakes.readthedocs.io/en/latest/manipulating.html#modifying-the-tree) but it's a lot of legwork for not much.
There are also some pretty gaping correctness holes in taking the decorator-based rewriting approach:
how do you deal with rewriting imported code, or code that's in classes/behind `@property` and other such tricks?
Just not worth it.
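For a sense of the legwork, a toy `ast.NodeTransformer` that rewrites `ctx.call_activity(...)` calls into `yield` expressions might start like this; note that it already exhibits the holes above, since it only sees the one module it's handed:
```python
import ast

class YieldActivityCalls(ast.NodeTransformer):
    """Wrap every <expr>.call_activity(...) call in a yield expression."""

    def visit_Call(self, node):
        self.generic_visit(node)
        if isinstance(node.func, ast.Attribute) and node.func.attr == "call_activity":
            return ast.Yield(value=node)
        return node

src = """
def main(ctx):
    x = ctx.call_activity("F1", None)
    return ctx.call_activity("F2", x)
"""
tree = ast.fix_missing_locations(YieldActivityCalls().visit(ast.parse(src)))
print(ast.unparse(tree))  # main() is now a generator yielding activity calls
```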
Now, what we _can_ do is try to hijack the entire Python interpreter to implement the properties/tracing/history recording we want there.
The default CPython lacks hooks for doing this, but we can write a Python-in-Python interpreter and "lift" the user's program into an interpreter we control, which ultimately gets most of its behavior "for free" from the underlying CPython interpreter.
There's [an example](https://github.com/pfalcon/pyastinterp) of doing this as part of the pycopy project; although there it's more of a Scheme-style proof of metacircular self-hosting.
There's a modified copy of the astinterp in `scratch/` which is capable of running a considerable subset of Python 2/3.9, to the point of being able to source-import many libraries (including `requests`) and run PyPI-sourced library code along with user code under hoisted interpretation.
It doesn't support coroutines/generators yet, and some machinery is required to make it "safe" (meaningfully single-steppable; "fix"/support `eval`, enable user-defined import/`__import__` through the lifted Python VM), but as a proof of concept of a lifted VM I'm genuinely shocked how well this works.
Next questions here revolve around how to "snapshot" the state of the interpreter meaningfully, and how to build a replayable interpreter log.
There are some specific challenges around how Python code interacts with native C code that could limit the viability of this approach, but at the absolute least this fully sandboxed Python interpreter could be used to implement whatever underlying magic is desired, restricted to some language subset as needed.
The goal is to make something like this work -
``` python
from df import Activity
f1 = Activity("F1")
f2 = Activity("F2")
f3 = Activity("F3")
f4 = Activity("F4")
def main():
return f4(f3(f2(f1(None))))
```
Which may offer a possible solution to the interpreter checkpointing problem - only checkpoint "supported" operations.
Here the `Activity().__call__` operation would have special support, as would `datetime.datetime.now()`, `time.sleep()`, threading, and possibly `random.Random` seeding, none of which can trivially be made repeatable.
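A sketch of what "special support" could mean for `Activity().__call__`: journal each call's result so that replay after a crash returns recorded values instead of re-executing effects. All names here are illustrative:
```python
class Activity:
    """A journaled call: execute at most once, replay thereafter."""

    def __init__(self, name, fn, journal):
        self.name, self.fn, self.journal = name, fn, journal
        self.seq = 0  # nth call to this activity within the run

    def __call__(self, *args):
        key = (self.name, self.seq)
        self.seq += 1
        if key not in self.journal:        # first execution: run and record
            self.journal[key] = self.fn(*args)
        return self.journal[key]           # replay: recorded result, no effect

journal = {}  # would live in the durable event store
f1 = Activity("F1", lambda _: 42, journal)
assert f1(None) == 42  # executes and records
f1.seq = 0             # simulate the program restarting from the log
assert f1(None) == 42  # replays without re-running the side effect
```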
### Durability challenges
FIXME - manually implementing snapshotting and recovery is hard
### Leverage with language support
FIXME - What does a DSL that helps with all this look like?

doc/architecture.md Normal file

@ -0,0 +1,53 @@
# Architecture
Flowmetal is an interpreted language backed by a durable event store.
The execution history of a program persists to the durable store as execution proceeds.
If an interpretation step fails to persist, it can't have external effects.
This is the fundamental insight behind Microsoft AMBROSIA.
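A minimal sketch of that invariant, assuming a durable append-only `log` object; the effect may run only after the intent to run it has been persisted:
```python
def step(log, seq, effect, *args):
    """Persist-before-effect: if the append fails, the effect never happens;
    if we crash after the append, recovery sees the record and can reconcile."""
    log.append({"seq": seq, "effect": effect.__name__, "args": args})
    log.flush()  # must be durably on disk before proceeding
    return effect(*args)
```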
The event store also provides Flowmetal's only interface for communicating with external systems.
Other systems can attach to Flowmetal's data store and send events to and receive them from Flowmetal.
For instance Flowmetal contains a reference implementation of an HTTP callback connector and of an HTTP request connector.
This allows Flowmetal programs to request that HTTP requests be sent on their behalf, consume the result, and wait for callbacks.
A Flowmetal deployment looks like this -
```
+----------------------------+
+---------------------------+ |
+--------------------------+ |--+
| External HTTP service(s) |--+
+--------------------------+
^ ^
| |
v v
+-----------------------+ +------------------------+
| HTTP server connector | | HTTP request connector |
+-----------------------+ +------------------------+
^ ^
| |
v v
+--------------------+ +----------------------+
| Shared event store | | Shared program store |
+--------------------+ +----------------------+
^ ^
| |
v v
+--------------------------+
| Flowmetal interpreter(s) |
+--------------------------+
```
Users interact with Flowmetal by creating (or editing) **Programs**.
An instance of a Program is called a **Task**.
Every Task has a unique **Inbox** and **Outbox**.
Comparable systems call the unit of execution a Process; we prefer Task because Process invites conflation with a Unix process or thread and our Tasks are entirely portable.
Tasks interact with the outside world by producing **Requests** into their Outbox.
For example, an instance of a Task could request that some other Task be executed.
Delivering messages to some other Task, or making API calls against external services would be other good examples of Requests.
Tasks receive the results of their Requests and other external events as **Messages** in their **Inbox**.
A request that some other Task be executed would be responded to with a Message identifying the other Task.
The requesting Task could choose to wait on the requested Task, or could disregard it.
Likewise the results of external requests return as Messages.
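As a sketch of this vocabulary (field names are illustrative, not a wire format):
```python
from dataclasses import dataclass, field
from typing import Any, Optional
from uuid import UUID, uuid4

@dataclass
class Request:
    """Produced by a Task into its Outbox."""
    task: UUID                  # the requesting Task
    action: str                 # e.g. "spawn-task" or "http-request"
    payload: Any
    id: UUID = field(default_factory=uuid4)

@dataclass
class Message:
    """Delivered to a Task's Inbox, possibly in response to a Request."""
    task: UUID                  # the receiving Task
    body: Any
    in_reply_to: Optional[UUID] = None
```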

doc/call_cc_airflow.md Normal file

@ -0,0 +1,154 @@
# What problem are you trying to solve?
In building, operating and maintaining distributed systems (many computers in concert), engineers face a tooling gap.
Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.) and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation.
This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand.
When networks enter the picture, communicating between computers is difficult with traditional tools, and communication failure becomes an ever-present concern.
Traditional automation tools such as shells struggle with this because they make it difficult to implement features such as error handling.
Furthermore, in a distributed environment it cannot be assumed that a single machine can remain available to execute automation.
Automation authors are thus confronted not just with the need to use "real" programming languages but with the need to implement a database-backed state machine which can "checkpoint" the job.
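As a sketch of that burden, a hand-rolled, database-backed job which checkpoints every transition; the schema and steps are invented for illustration:
```python
import sqlite3

def ingest(): ...
def analyse(): ...
def report(): ...

STEPS = [("start", ingest, "ingested"),
         ("ingested", analyse, "analysed"),
         ("analysed", report, "done")]

def resume(db: sqlite3.Connection, job_id: int):
    """Drive the job forward from whatever state the last run persisted."""
    for state, action, next_state in STEPS:
        row = db.execute("SELECT state FROM jobs WHERE id = ?", (job_id,)).fetchone()
        if row and row[0] == state:
            action()  # the actual work for this transition
            db.execute("UPDATE jobs SET state = ? WHERE id = ?", (next_state, job_id))
            db.commit()  # checkpoint before moving on
```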
Taking a step back, this is an enormous failure of the languages we have available to describe workflow tasks.
That users need to write state machines that define state machines that actually perform the desired task shows that the available tools operate at the wrong level.
Airflow, for instance, succeeds at providing a "workflow" control flow graph abstraction which frees users of the concerns of implementing their own resumable state machines.
Consider this example from the Airflow documentation -
```python
from __future__ import annotations
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
with DAG(
"example_branch_labels",
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = EmptyOperator(task_id="ingest")
analyse = EmptyOperator(task_id="analyze")
check = EmptyOperator(task_id="check_integrity")
describe = EmptyOperator(task_id="describe_integrity")
error = EmptyOperator(task_id="email_error")
save = EmptyOperator(task_id="save")
report = EmptyOperator(task_id="report")
ingest >> analyse >> check
check >> Label("No errors") >> save >> report
check >> Label("Errors found") >> describe >> error >> report
```
Compared to handwriting out all the nodes in the control flow graph and the requisite checkpointing to the database on state transitions, this is a considerable improvement.
But if you stop and look at this control flow graph, it's expressing a simple branching operation for which we have much more conventional notation.
Consider the same workflow using "normal" Python syntax rather than the embedded workflow syntax -
```python
def noop():
pass
def check(result):
return True
ingest = noop
analyse = noop
describe = noop
error = noop
save = noop
report = noop
def main():
ingest()
result = analyse()
if check(result):
save(result)
report(result)
else:
describe()
error()
report()
```
The program a developer authors is already a control flow graph.
We have a name for "operators" or state nodes - they're just functions.
We have a name and notation for transitions in the state chart - they're just sequential statements.
Anything we need developers to write above that baseline represents friction specific to a workflow task which we should seek to minimize.
Temporal does better than Airflow and reflects this understanding.
Using insights from Azure Durable Functions, their SDK leverages the details of Python's `async` and `await` operators to hijack program control flow and implement workflow features under the covers.
Consider this example from the Temporal documentation -
```python
@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
try:
while True:
print("Heartbeating cancel activity")
await asyncio.sleep(0.5)
activity.heartbeat("some details")
except asyncio.CancelledError:
print("Activity cancelled")
raise
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, input: ComposeArgsInput) -> None:
activity_handle = workflow.start_activity(
cancel_activity,
ComposeArgsInput(input.arg1, input.arg2),
start_to_close_timeout=timedelta(minutes=5),
heartbeat_timeout=timedelta(seconds=30),
)
await asyncio.sleep(3)
activity_handle.cancel()
```
This is really good compared to an equivalent Airflow graph!
All the details are "normal" Python, and the SDK fits "natively" into how Python execution occurs.
But it's still laden with syntax such as the `async` function coloring and decorators which serve only to support the workflow SDK.
In comparison, were this workflow a "simple" Python script, it would only need to be written -
```python
# https://pypi.org/project/timeoutcontext/
from datetime import timedelta
from time import sleep

from timeoutcontext import task_with_timeout, TimeoutError
def cancellable_activity():
try:
while True:
print("Heartbeating cancellable activity")
sleep(0.5)
except TimeoutError:
print("Activity cancelled")
raise
def main():
task = task_with_timeout(lambda: cancellable_activity(),
timeout=timedelta(minutes=5))
sleep(3)
task.cancel()
```
As with Airflow, the Temporal SDK effectively requires that the programmer learn not just a set of libraries but also Python's `async` features, because the implementation of the workflow engine leaks through to users.
The problem isn't just the excessive syntax, it's that as with Airflow user workflows are no longer "normal" programs.
There is in effect an entire Temporal interpreter stacked in between the Python runtime with which users are familiar and the user's program.
It is in effect a new language with none of the notational advantages of being one.
The flipside is that there is an enormous advantage in tooling to be had by leveraging an existing language - or something that looks enough like one - rather than inventing a new notation.
This is the cardinal sin of workflow tools like Kubeflow and various CI "workflow" formats - they adopt unique YAML- or XML-based notations which have no tooling support.
For instance by being "normal" (ish) Python, the Temporal SDK benefits from access to editor autocompletion, the MyPy typechecker and all manner of other tools.

doc/what_problem.md Normal file

@ -0,0 +1,47 @@
# An Asynchronous, Distributed Task Engine
This document presents a design, without reference implementation, for a distributed programming system;
sometimes called a workflow engine.
It is intended to provide architectural-level clarity, allowing for the development of alternative designs or implementations as may suit.
## Problem Statement
In building, operating and maintaining distributed systems (many computers in concert), engineers face a tooling gap.
Within the confines of a single computer, we have shells (`bash`, `csh`, `zsh`, `oil` etc.)
and a suite of small programs which mesh together well enough for the completion of small tasks with ad-hoc automation.
This is an enormous tooling win, as it allows small tasks to be automated at least for a time with a minimum of effort and with tools close to hand.
When networks enter the picture, communicating between computers is difficult with traditional tools, and communication failure becomes an ever-present concern.
Traditional automation tools such as shells are inadequate for this environment because achieving network communication with them is excessively difficult.
In a distributed environment it cannot be assumed that a single machine can remain available to execute automation;
this requires an approach to automation which allows for the incremental execution of single tasks at a time, with provisions for relocation and recovery should failure occur.
It also cannot be assumed that a single machine is sufficiently available to receive and process incoming events such as callbacks.
A distributed system is needed to wrangle distributed systems.
## Design Considerations
- Timeouts are everywhere
- Sub-Turing/boundable
-
## Architectural Overview
### Events
Things that will happen, or time out.
### Actions
Things the workflow will do, or time out.
### Bindings
Data the workflow either was given or computed.
### Conditionals
Decisions the workflow may make.
### Functions
A convenient way to talk about fragments of control flow graph.
### Tracing & Reporting

examples/abc.flow Normal file

@ -0,0 +1,29 @@
# -*- mode: python -*-
from flowmetal import workflow
def ingest():
return {}
def analyze(data):
return data.keys()
def check(keys) -> bool:
return len(keys) > 0
def report(keys):
print(keys)
@workflow
def main():
data = ingest()
data = analyze(data)
if check(data):
report(data)
else:
raise ValueError(report(data))

examples/timeout.flow Normal file

@ -0,0 +1,26 @@
# -*- mode: python -*-
from datetime import timedelta
from time import sleep
from flowmetal import workflow, timeout, CancelledError, TimeoutError, Task
def cancellable_activity():
try:
while True:
print("Still alive")
sleep(0.5)
except CancelledError:
print("Task killed")
@workflow
def main():
# Somewhat like a thread
t = Task(target=cancellable_activity, args=(), timeout=timedelta(minutes=5))
t.start()
try:
result = t.result(timeout=timedelta(seconds=3))
print(result)
except TimeoutError:
t.cancel()

scratch/astdump.py Normal file

@ -0,0 +1,98 @@
"""
A (toy) tool for emitting Python ASTs as YAML formatted data.
"""
import ast
import optparse
import sys
import yaml
def propnames(node):
"""return names of attributes specific for the current node"""
props = {x for x in dir(node) if not x.startswith("_")}
if isinstance(node, ast.Module):
props -= {"body"}
if isinstance(node, (ast.Expr, ast.Attribute)):
props -= {"value"}
if isinstance(node, ast.Constant):
props -= {"n", "s"}
if isinstance(node, ast.ClassDef):
props -= {"body"}
return props
# Note that ast.NodeTransformer exists for mutations.
# This is just for reads.
class TreeDumper(ast.NodeVisitor):
def __init__(self):
super().__init__()
self._stack = []
def dump(self, node):
self.visit(node)
def visit(self, node):
# nodetype = type(node)
nodename = node.__class__.__name__
indent = " " * len(self._stack) * 2
print(indent + nodename)
for n in propnames(node):
print(indent + "%s: %s" % (n, node.__dict__[n]))
self._stack.append(node)
self.generic_visit(node)
self._stack.pop()
class YAMLTreeDumper(ast.NodeVisitor):
def __init__(self):
super().__init__()
self._stack = []
def node2yml(self, node):
try:
# nodetype = type(node)
nodename = node.__class__.__name__
return {
"op": nodename,
"props": {n: node.__dict__[n] for n in propnames(node)},
"children": [],
}
except Exception:
print(repr(node), propnames(node), dir(node))
def visit(self, node):
yml_node = self.node2yml(node)
self._stack.append(yml_node)
old_stack = self._stack
self._stack = yml_node["children"]
self.generic_visit(node)
self._stack = old_stack
return yml_node
if __name__ == "__main__":
parser = optparse.OptionParser(usage="%prog [options] <filename.py>")
opts, args = parser.parse_args()
if len(args) == 0:
parser.print_help()
sys.exit(-1)
filename = args[0]
with open(filename) as f:
root = ast.parse(f.read(), filename)
print(
yaml.dump(
YAMLTreeDumper().visit(root), default_flow_style=False, sort_keys=False
)
)

scratch/astinterp.py Normal file

@ -0,0 +1,977 @@
# flake8: noqa: all
# Python AST interpreter written in Python
#
# This module is part of the Pycopy https://github.com/pfalcon/pycopy
# project.
#
# Copyright (c) 2019 Paul Sokolovsky
#
# The MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Modified by Reid D. 'arrdem' McKenzie in 2021 to be a bit more fully-featured
# and usable for running 'real' code as part of an experiment in implementing a
# durable Python interpreter atop the original pycopy substrate.
import ast
import logging
import os
import sys
if sys.version_info < (3, 0, 0):
builtins = __builtins__
else:
import builtins
log = logging.getLogger(__name__)
class StrictNodeVisitor(ast.NodeVisitor):
def generic_visit(self, node):
n = node.__class__.__name__
raise NotImplementedError("Visitor for node {} not implemented".format(n))
class ANamespace:
def __init__(self, node):
self.d = {}
self.parent = None
# Cross-link namespace to AST node. Note that we can't do the
# opposite, because for one node, there can be different namespaces.
self.node = node
def __getitem__(self, k):
return self.d[k]
def get(self, k, default=None):
return self.d.get(k, default)
def __setitem__(self, k, v):
self.d[k] = v
def __delitem__(self, k):
del self.d[k]
def __contains__(self, k):
return k in self.d
def __str__(self):
return "<{} {}>".format(self.__class__.__name__, self.d)
class ModuleNS(ANamespace):
pass
class FunctionNS(ANamespace):
pass
class ClassNS(ANamespace):
pass
# Pycopy by default doesn't support direct slice construction, use helper
# object to construct it.
class SliceGetter:
def __getitem__(self, idx):
return idx
slice_getter = SliceGetter()
def arg_name(arg):
if sys.version_info < (3, 0, 0):
return arg.id
else:
return arg.arg
def kwarg_defaults(args):
if sys.version_info < (3, 0, 0):
return args.defaults
else:
return args.kw_defaults
class TargetNonlocalFlow(Exception):
"""Base exception class to simulate non-local control flow transfers in
a target application."""
class TargetBreak(TargetNonlocalFlow):
pass
class TargetContinue(TargetNonlocalFlow):
pass
class TargetReturn(TargetNonlocalFlow):
pass
class VarScopeSentinel:
def __init__(self, name):
self.name = name
NO_VAR = VarScopeSentinel("no_var")
GLOBAL = VarScopeSentinel("global")
NONLOCAL = VarScopeSentinel("nonlocal")
class InterpFuncWrap:
"Callable wrapper for AST functions (FunctionDef nodes)."
def __init__(self, node, interp):
self.node = node
self.interp = interp
self.lexical_scope = interp.ns
def __call__(self, *args, **kwargs):
return self.interp.call_func(self.node, self, *args, **kwargs)
# Python doesn't fully treat objects, even those defining the __call__() special method, as true callables. For
# example, such objects aren't automatically converted to bound methods if looked up as another object's attributes. As
# we want our "interpreted functions" to behave as close as possible to real functions, we just wrap the function
# object with a real function. An alternative might have been to perform the needed checks and explicitly bind a method
# using types.MethodType() in visit_Attribute (but then maybe there would still be other cases of "callable object" vs
# "function" discrepancies).
def InterpFunc(fun):
def func(*args, **kwargs):
return fun.__call__(*args, **kwargs)
return func
class InterpWith:
def __init__(self, ctx):
self.ctx = ctx
def __enter__(self):
return self.ctx.__enter__()
def __exit__(self, tp, exc, tb):
# Don't leak meta-level exceptions into target
if isinstance(exc, TargetNonlocalFlow):
tp = exc = tb = None
return self.ctx.__exit__(tp, exc, tb)
class InterpModule:
def __init__(self, ns):
self.ns = ns
def __getattr__(self, name):
try:
return self.ns[name]
except KeyError:
raise AttributeError
def __dir__(self):
return list(self.ns.d.keys())
class ModuleInterpreter(StrictNodeVisitor):
"""An interpreter specific to a single module."""
def __init__(self, system, fname, node):
self.system = system
self.fname = fname
self.ns = self.module_ns = ModuleNS(node)
# Call stack (in terms of function AST nodes).
self.call_stack = []
        # To implement the "store" operation, we need two arguments: the location and the value to store. The operation
        # itself is handled by a node visitor (e.g. visit_Name), and the location is represented by an AST node, but
        # there's no support for passing additional arguments to a visitor (likely because it would be a burden to
        # explicitly pass such arguments through the chain of visitors). So instead, we store the value as a field. As
        # interpretation happens sequentially, there's no risk that it will be overwritten "concurrently".
self.store_val = None
        # Current active exception, for bare "raise", which doesn't work across function boundaries (and that's how we
        # have it - the exception would be caught in visit_Try, while re-raising would happen in visit_Raise).
self.cur_exc = []
def push_ns(self, new_ns):
new_ns.parent = self.ns
self.ns = new_ns
def pop_ns(self):
self.ns = self.ns.parent
def stmt_list_visit(self, lst):
res = None
for s in lst:
res = self.visit(s)
return res
def wrap_decorators(self, obj, node):
for deco_n in reversed(list(node.decorator_list)):
deco = self.visit(deco_n)
obj = deco(obj)
return obj
def visit(self, node):
val = super(StrictNodeVisitor, self).visit(node)
return val
def visit_Module(self, node):
self.stmt_list_visit(node.body)
def visit_Expression(self, node):
return self.visit(node.body)
def visit_ClassDef(self, node):
self.push_ns(ClassNS(node))
try:
self.stmt_list_visit(node.body)
except Exception:
self.pop_ns()
raise
ns = self.ns
self.pop_ns()
cls = type(node.name, tuple([self.visit(b) for b in node.bases]), ns.d)
cls = self.wrap_decorators(cls, node)
self.ns[node.name] = cls
# Store reference to class object in the namespace object
ns.cls = cls
def visit_Lambda(self, node):
node.name = "<lambda>"
return self.prepare_func(node)
def visit_FunctionDef(self, node):
# Defaults are evaluated at function definition time, so we
# need to do that now.
func = self.prepare_func(node)
func = self.wrap_decorators(func, node)
self.ns[node.name] = func
def prepare_func(self, node):
"""Prepare function AST node for future interpretation: pre-calculate
and cache useful information, etc."""
func = InterpFuncWrap(node, self)
args = node.args or node.posonlyargs
num_required = len(args.args) - len(args.defaults)
all_args = set()
d = {}
for i, a in enumerate(args.args):
all_args.add(arg_name(a))
if i >= num_required:
d[arg_name(a)] = self.visit(args.defaults[i - num_required])
for a, v in zip(getattr(args, "kwonlyargs", ()), kwarg_defaults(args)):
all_args.add(arg_name(a))
if v is not None:
d[arg_name(a)] = self.visit(v)
# We can store cached argument names of a function in its node -
# it's static.
node.args.all_args = all_args
# We can't store the values of default arguments - they're dynamic,
# may depend on the lexical scope.
func.defaults_dict = d
return InterpFunc(func)
def prepare_func_args(self, node, interp_func, *args, **kwargs):
def arg_num_mismatch():
raise TypeError(
"{}() takes {} positional arguments but {} were given".format(
node.name, len(argspec.args), len(args)
)
)
argspec = node.args
# If there's vararg, either offload surplus of args to it, or init
# it to empty tuple (all in one statement). If no vararg, error on
# too many args.
#
# Note that we have to do the .posonlyargs dance
if argspec.vararg:
self.ns[argspec.vararg.arg] = args[len(argspec.args) :]
else:
if len(args) > len(argspec.args or getattr(argspec, "posonlyargs", ())):
arg_num_mismatch()
if argspec.args:
for i in range(min(len(args), len(argspec.args))):
self.ns[arg_name(argspec.args[i])] = args[i]
elif getattr(argspec, "posonlyargs", ()):
if len(args) != len(argspec.posonlyargs):
arg_num_mismatch()
for a, value in zip(argspec.posonlyargs, args):
self.ns[arg_name(a)] = value
# Process incoming keyword arguments, putting them in namespace if
# actual arg exists by that name, or offload to function's kwarg
# if any. All make needed checks and error out.
func_kwarg = {}
for k, v in kwargs.items():
if k in argspec.all_args:
if k in self.ns:
raise TypeError(
"{}() got multiple values for argument '{}'".format(
node.name, k
)
)
self.ns[k] = v
elif argspec.kwarg:
func_kwarg[k] = v
else:
raise TypeError(
"{}() got an unexpected keyword argument '{}'".format(node.name, k)
)
if argspec.kwarg:
self.ns[arg_name(argspec.kwarg)] = func_kwarg
# Finally, overlay default values for arguments not yet initialized.
# We need to do this last for "multiple values for the same arg"
# check to work.
for k, v in interp_func.defaults_dict.items():
if k not in self.ns:
self.ns[k] = v
# And now go thru and check for any missing arguments.
for a in argspec.args:
if arg_name(a) not in self.ns:
raise TypeError(
"{}() missing required positional argument: '{}'".format(
node.name, arg_name(a)
)
)
for a in getattr(argspec, "kwonlyargs", ()):
if a.arg not in self.ns:
raise TypeError(
"{}() missing required keyword-only argument: '{}'".format(
node.name, arg_name(a)
)
)
def call_func(self, node, interp_func, *args, **kwargs):
self.call_stack.append(node)
# We need to switch from dynamic execution scope to lexical scope
# in which function was defined (then switch back on return).
dyna_scope = self.ns
self.ns = interp_func.lexical_scope
self.push_ns(FunctionNS(node))
try:
self.prepare_func_args(node, interp_func, *args, **kwargs)
if isinstance(node.body, list):
res = self.stmt_list_visit(node.body)
else:
res = self.visit(node.body)
except TargetReturn as e:
res = e.args[0]
finally:
self.pop_ns()
self.ns = dyna_scope
self.call_stack.pop()
return res
def visit_Return(self, node):
if not isinstance(self.ns, FunctionNS):
raise SyntaxError("'return' outside function")
raise TargetReturn(node.value and self.visit(node.value))
def visit_With(self, node):
assert len(node.items) == 1
ctx = self.visit(node.items[0].context_expr)
with InterpWith(ctx) as val:
if node.items[0].optional_vars is not None:
self.handle_assign(node.items[0].optional_vars, val)
self.stmt_list_visit(node.body)
def visit_Try(self, node):
try:
self.stmt_list_visit(node.body)
except TargetNonlocalFlow:
raise
except Exception as e:
self.cur_exc.append(e)
try:
for h in getattr(node, "handlers", ()):
if h.type is None or isinstance(e, self.visit(h.type)):
if h.name:
self.ns[h.name] = e
self.stmt_list_visit(h.body)
if h.name:
del self.ns[h.name]
break
else:
raise
finally:
self.cur_exc.pop()
else:
self.stmt_list_visit(node.orelse)
finally:
if getattr(node, "finalbody", None):
self.stmt_list_visit(node.finalbody)
def visit_TryExcept(self, node):
# Py2k only; py3k merged all this into one node type.
return self.visit_Try(node)
def visit_TryFinally(self, node):
# Py2k only; py3k merged all this into one node type.
return self.visit_Try(node)
def visit_For(self, node):
iter = self.visit(node.iter)
for item in iter:
self.handle_assign(node.target, item)
try:
self.stmt_list_visit(node.body)
except TargetBreak:
break
except TargetContinue:
continue
else:
self.stmt_list_visit(node.orelse)
def visit_While(self, node):
while self.visit(node.test):
try:
self.stmt_list_visit(node.body)
except TargetBreak:
break
except TargetContinue:
continue
else:
self.stmt_list_visit(node.orelse)
def visit_Break(self, node):
raise TargetBreak
def visit_Continue(self, node):
raise TargetContinue
def visit_If(self, node):
test = self.visit(node.test)
if test:
self.stmt_list_visit(node.body)
else:
self.stmt_list_visit(node.orelse)
def visit_Import(self, node):
for n in node.names:
self.ns[n.asname or n.name] = self.system.handle_import(n.name)
def visit_ImportFrom(self, node):
mod = self.system.handle_import(
node.module, None, None, [n.name for n in node.names], node.level
)
for n in node.names:
if n.name == "*":
# This is the special case of the wildcard import. Copy
# everything over.
for n in getattr(mod, "__all__", dir(mod)):
self.ns[n] = getattr(mod, n)
else:
self.ns[n.asname or n.name] = getattr(mod, n.name)
def visit_Raise(self, node):
if node.exc is None:
if not self.cur_exc:
raise RuntimeError("No active exception to reraise")
raise self.cur_exc[-1]
elif node.cause is None:
raise self.visit(node.exc)
# else:
# raise self.visit(node.exc) from self.visit(node.cause)
def visit_AugAssign(self, node):
assert isinstance(node.target.ctx, ast.Store)
# Not functional style, oops. Node in AST has store context, but we
# need to read its value first. To not construct a copy of the entire
# node with load context, we temporarily patch it in-place.
save_ctx = node.target.ctx
node.target.ctx = ast.Load()
var_val = self.visit(node.target)
node.target.ctx = save_ctx
rval = self.visit(node.value)
# As augmented assignment is statement, not operator, we can't put them
# all into map. We could instead directly lookup special inplace methods
# (__iadd__ and friends) and use them, with a fallback to normal binary
# operations, but from the point of view of this interpreter, presence
# of such methods is an implementation detail of the object system, it's
# not concerned with it.
op = type(node.op)
if op is ast.Add:
var_val += rval
elif op is ast.Sub:
var_val -= rval
elif op is ast.Mult:
var_val *= rval
elif op is ast.Div:
var_val /= rval
elif op is ast.FloorDiv:
var_val //= rval
elif op is ast.Mod:
var_val %= rval
elif op is ast.Pow:
var_val **= rval
elif op is ast.LShift:
var_val <<= rval
elif op is ast.RShift:
var_val >>= rval
elif op is ast.BitAnd:
var_val &= rval
elif op is ast.BitOr:
var_val |= rval
elif op is ast.BitXor:
var_val ^= rval
else:
raise NotImplementedError
self.store_val = var_val
self.visit(node.target)
def visit_Assign(self, node):
val = self.visit(node.value)
for n in node.targets:
self.handle_assign(n, val)
def handle_assign(self, target, val):
if isinstance(target, ast.Tuple):
it = iter(val)
try:
for elt_idx, t in enumerate(target.elts):
if getattr(ast, "Starred", None) and isinstance(t, ast.Starred):
t = t.value
all_elts = list(it)
break_i = len(all_elts) - (len(target.elts) - elt_idx - 1)
self.store_val = all_elts[:break_i]
it = iter(all_elts[break_i:])
else:
self.store_val = next(it)
self.visit(t)
except StopIteration:
raise ValueError(
"not enough values to unpack (expected {})".format(len(target.elts))
)
try:
next(it)
raise ValueError(
"too many values to unpack (expected {})".format(len(target.elts))
)
except StopIteration:
# Expected
pass
else:
self.store_val = val
self.visit(target)
def visit_Delete(self, node):
for n in node.targets:
self.visit(n)
def visit_Pass(self, node):
pass
def visit_Assert(self, node):
if node.msg is None:
assert self.visit(node.test)
else:
assert self.visit(node.test), self.visit(node.msg)
def visit_Expr(self, node):
# Produced value is ignored
self.visit(node.value)
def enumerate_comps(self, iters):
"""Enumerate thru all possible values of comprehension clauses,
including multiple "for" clauses, each optionally associated
with multiple "if" clauses. Current result of the enumeration
is stored in the namespace."""
def eval_ifs(iter):
"""Evaluate all "if" clauses."""
for cond in iter.ifs:
if not self.visit(cond):
return False
return True
if not iters:
yield
return
for el in self.visit(iters[0].iter):
self.store_val = el
self.visit(iters[0].target)
for t in self.enumerate_comps(iters[1:]):
if eval_ifs(iters[0]):
yield
def visit_ListComp(self, node):
self.push_ns(FunctionNS(node))
try:
return [self.visit(node.elt) for _ in self.enumerate_comps(node.generators)]
finally:
self.pop_ns()
def visit_SetComp(self, node):
self.push_ns(FunctionNS(node))
try:
return {self.visit(node.elt) for _ in self.enumerate_comps(node.generators)}
finally:
self.pop_ns()
def visit_DictComp(self, node):
self.push_ns(FunctionNS(node))
try:
return {
self.visit(node.key): self.visit(node.value)
for _ in self.enumerate_comps(node.generators)
}
finally:
self.pop_ns()
def visit_IfExp(self, node):
if self.visit(node.test):
return self.visit(node.body)
else:
return self.visit(node.orelse)
def visit_Call(self, node):
func = self.visit(node.func)
args = []
for a in node.args:
if getattr(ast, "Starred", None) and isinstance(a, ast.Starred):
args.extend(self.visit(a.value))
else:
args.append(self.visit(a))
kwargs = {}
for kw in node.keywords:
val = self.visit(kw.value)
if kw.arg is None:
kwargs.update(val)
else:
kwargs[kw.arg] = val
if func is builtins.super and not args:
if not self.ns.parent or not isinstance(self.ns.parent, ClassNS):
raise RuntimeError("super(): no arguments")
# As we're creating methods dynamically outside of class, super() without argument won't work, as that
# requires __class__ cell. Creating that would be cumbersome (Pycopy definitely lacks enough introspection
# for that), so we substitute 2 implied args (which argumentless super() would take from cell and 1st arg to
# func). In our case, we take them from prepared bookkeeping info.
args = (self.ns.parent.cls, self.ns["self"])
return func(*args, **kwargs)
def visit_Compare(self, node):
cmpop_map = {
ast.Eq: lambda x, y: x == y,
ast.NotEq: lambda x, y: x != y,
ast.Lt: lambda x, y: x < y,
ast.LtE: lambda x, y: x <= y,
ast.Gt: lambda x, y: x > y,
ast.GtE: lambda x, y: x >= y,
ast.Is: lambda x, y: x is y,
ast.IsNot: lambda x, y: x is not y,
ast.In: lambda x, y: x in y,
ast.NotIn: lambda x, y: x not in y,
}
lv = self.visit(node.left)
for op, r in zip(node.ops, node.comparators):
rv = self.visit(r)
if not cmpop_map[type(op)](lv, rv):
return False
lv = rv
return True
def visit_BoolOp(self, node):
if isinstance(node.op, ast.And):
res = True
for v in node.values:
res = res and self.visit(v)
elif isinstance(node.op, ast.Or):
res = False
for v in node.values:
res = res or self.visit(v)
else:
raise NotImplementedError
return res
def visit_BinOp(self, node):
binop_map = {
ast.Add: lambda x, y: x + y,
ast.Sub: lambda x, y: x - y,
ast.Mult: lambda x, y: x * y,
ast.Div: lambda x, y: x / y,
ast.FloorDiv: lambda x, y: x // y,
ast.Mod: lambda x, y: x % y,
ast.Pow: lambda x, y: x ** y,
ast.LShift: lambda x, y: x << y,
ast.RShift: lambda x, y: x >> y,
ast.BitAnd: lambda x, y: x & y,
ast.BitOr: lambda x, y: x | y,
ast.BitXor: lambda x, y: x ^ y,
}
l = self.visit(node.left)
r = self.visit(node.right)
return binop_map[type(node.op)](l, r)
def visit_UnaryOp(self, node):
unop_map = {
ast.UAdd: lambda x: +x,
ast.USub: lambda x: -x,
ast.Invert: lambda x: ~x,
ast.Not: lambda x: not x,
}
val = self.visit(node.operand)
return unop_map[type(node.op)](val)
def visit_Subscript(self, node):
obj = self.visit(node.value)
idx = self.visit(node.slice)
if isinstance(node.ctx, ast.Load):
return obj[idx]
elif isinstance(node.ctx, ast.Store):
obj[idx] = self.store_val
elif isinstance(node.ctx, ast.Del):
del obj[idx]
else:
raise NotImplementedError
def visit_Index(self, node):
return self.visit(node.value)
def visit_Slice(self, node):
# Any of these can be None
lower = node.lower and self.visit(node.lower)
upper = node.upper and self.visit(node.upper)
step = node.step and self.visit(node.step)
slice = slice_getter[lower:upper:step]
return slice
def visit_Attribute(self, node):
obj = self.visit(node.value)
if isinstance(node.ctx, ast.Load):
return getattr(obj, node.attr)
elif isinstance(node.ctx, ast.Store):
setattr(obj, node.attr, self.store_val)
elif isinstance(node.ctx, ast.Del):
delattr(obj, node.attr)
else:
raise NotImplementedError
def visit_Global(self, node):
for n in node.names:
if n in self.ns and self.ns[n] is not GLOBAL:
raise SyntaxError(
"SyntaxError: name '{}' is assigned to before global declaration".format(
n
)
)
# Don't store GLOBAL in the top-level namespace
if self.ns.parent:
self.ns[n] = GLOBAL
def visit_Nonlocal(self, node):
if isinstance(self.ns, ModuleNS):
raise SyntaxError("nonlocal declaration not allowed at module level")
for n in node.names:
self.ns[n] = NONLOCAL
def resolve_nonlocal(self, id, ns):
while ns:
res = ns.get(id, NO_VAR)
if res is GLOBAL:
return self.module_ns
if res is not NO_VAR and res is not NONLOCAL:
if isinstance(ns, ModuleNS):
break
return ns
ns = ns.parent
raise SyntaxError("no binding for nonlocal '{}' found".format(id))
def visit_Name(self, node):
if isinstance(node.ctx, ast.Load):
res = NO_VAR
ns = self.ns
# We always lookup in the current namespace (on the first iteration), but afterwards we always skip class
# namespaces. Or put it another way, class code can look up in its own namespace, but that's the only case
# when the class namespace is consulted.
skip_classes = False
while ns:
if not (skip_classes and isinstance(ns, ClassNS)):
res = ns.get(node.id, NO_VAR)
if res is not NO_VAR:
break
ns = ns.parent
skip_classes = True
if res is NONLOCAL:
ns = self.resolve_nonlocal(node.id, ns.parent)
return ns[node.id]
if res is GLOBAL:
res = self.module_ns.get(node.id, NO_VAR)
if res is not NO_VAR:
return res
try:
return getattr(builtins, node.id)
except AttributeError:
raise NameError("name '{}' is not defined".format(node.id))
elif isinstance(node.ctx, ast.Store):
res = self.ns.get(node.id, NO_VAR)
if res is GLOBAL:
self.module_ns[node.id] = self.store_val
elif res is NONLOCAL:
ns = self.resolve_nonlocal(node.id, self.ns.parent)
ns[node.id] = self.store_val
else:
self.ns[node.id] = self.store_val
elif isinstance(node.ctx, ast.Del):
res = self.ns.get(node.id, NO_VAR)
if res is NO_VAR:
raise NameError("name '{}' is not defined".format(node.id))
elif res is GLOBAL:
del self.module_ns[node.id]
elif res is NONLOCAL:
ns = self.resolve_nonlocal(node.id, self.ns.parent)
del ns[node.id]
else:
del self.ns[node.id]
else:
raise NotImplementedError
def visit_Dict(self, node):
return {self.visit(p[0]): self.visit(p[1]) for p in zip(node.keys, node.values)}
def visit_Set(self, node):
return {self.visit(e) for e in node.elts}
def visit_List(self, node):
return [self.visit(e) for e in node.elts]
def visit_Tuple(self, node):
return tuple([self.visit(e) for e in node.elts])
def visit_NameConstant(self, node):
return node.value
def visit_Ellipsis(self, node):
# In Py3k only
from ast import Ellipsis
return Ellipsis
def visit_Print(self, node):
# In Py2k only
raise NotImplementedError("Absolutely not. Use __future__.")
def visit_Str(self, node):
return node.s
def visit_Bytes(self, node):
return node.s
def visit_Num(self, node):
return node.n
class InterpreterSystem(object):
"""A bag of shared state."""
def __init__(self, path=None):
self.modules = {}
self.path = path or sys.path
def handle_import(self, name, globals=None, locals=None, fromlist=(), level=0):
log.debug(" Attempting to import '{}'".format(name))
if name not in self.modules:
if name in sys.modules:
log.debug(" Short-circuited from bootstrap sys.modules")
self.modules[name] = sys.modules[name]
else:
name = name.replace(".", os.path.sep)
for e in self.path:
for ext in [
# ".flow",
".py",
]:
if os.path.isdir(e):
f = os.path.join(e, name + ext)
log.debug(" Checking {}".format(f))
if os.path.exists(f):
mod = self.load(f)
self.modules[name] = mod.ns
break
elif os.path.isfile(e):
# FIXME (arrdem 2021-05-31)
raise RuntimeError(
"Import from .zip/.whl/.egg archives aren't supported yet"
)
else:
self.modules[name] = __import__(
name, globals, locals, fromlist, level
)
return self.modules[name]
def load(self, fname):
with open(fname) as f:
tree = ast.parse(f.read())
interp = ModuleInterpreter(self, fname, tree)
interp.visit(tree)
return interp
def execute(self, fname):
with open(fname) as f:
tree = ast.parse(f.read())
interp = ModuleInterpreter(self, fname, tree)
interp.ns["__name__"] = "__main__"
self.modules["__main__"] = InterpModule(interp.ns)
interp.visit(tree)
return interp
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
InterpreterSystem().execute(sys.argv[1])

scratch/test.py Normal file

@ -0,0 +1,34 @@
#!astinterp
from __future__ import print_function
class Foo(object):
def bar(self):
return 1
@property
def baz(self):
print("'Computing' baz...")
return 2
a = Foo()
print(a.bar())
print(a.baz)
import random
for _ in range(10):
print(random.randint(0, 1024))
def bar(a, b, **bs):
pass
import requests
print(len(requests.get("https://pypi.org/pypi/requests/json").text))


@ -1,37 +0,0 @@
from setuptools import setup
setup(
name="arrdem.flowmetal",
# Package metadata
version='0.0.0',
license="MIT",
description="A weird execution engine",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
author="Reid 'arrdem' McKenzie",
author_email="me@arrdem.com",
url="https://git.arrdem.com/arrdem/flowmetal",
classifiers=[
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
],
# Package setup
package_dir={"": "src/python"},
packages=[
"flowmetal",
],
entry_points = {
'console_scripts': [
'iflow=flowmetal.repl:main'
],
},
install_requires=[
'prompt-toolkit~=3.0.0',
],
extras_require={
}
)


@ -1,83 +0,0 @@
(defpackage com.twitter.wilson
(require
;; The log lets you record status information into a program's trace
[flowmetal.log
:refer [log!]]
;; The time system lets you put bounds on the implicit awaiting Flowmetal does
[flowmetal.time
:refer [with-timeout!, timeout?, make-duration, duration?, +seconds+, +hours+, sleep!]]
;; JSON. Simple enough
[flowmetal.json
:refer [loads, dumps, json?]]
;; Extensions! Provided by other systems.
;;
;; This one allows for an external service to receive HTTP callbacks on Flowmetal's behalf.
[http.callback
:refer [make-callback!, get-callback!, callback?]]
;; This one allows for an external service to make HTTP requests on Flowmetal's behalf.
[http.request
:refer [post!, error?, dns-error?, connection-error?, response-error?]])
(defenum stage
+reboot+
+bios-update+
+reinstall+)
;; FIXME:- how to do table optimization?
(defn fib [x]
(match x
[0 1]
[1 1]
    [_ (+ (fib (- x 1)) (fib (- x 2)))]))
(defn retry-http [f
:- (fn? [] a?)
backoff-fn
:- (fn? [int?] duration?)
:default (fn [x :- int?]
:- duration?
(make-duration (fib x) +seconds+))
backoff-count
:- int?
:default 0]
:- a
"The implementation of HTTP with retrying."
(let [response (f)]
(if (not (error? response))
response
;; FIXME:- how does auth denied get represented?
(if (or (dns-error? response)
(connection-error? response)
(response-error? response))
          (do (sleep! (backoff-fn backoff-count))
              (retry-http f backoff-fn (+ backoff-count 1)))))))
(defn job [hostname
:- str?
stages
:- (list? stage?)
job-timeout
:- duration?
:default (duration 3 :hours)]
:- (union? [timeout? json?])
"Run a wilson job, wait for the callback and process the result.
By default the job is only waited for three hours.
"
(let [callback :- callback? (make-callback!)
job (retry-http
(fn []
            (post! "http://wilson.local.twitter.com"
:data
(dumps
{:host hostname
:stages [stages]
:callbacks [{:type :http, :url callback}]}))))]
    (let [result (with-timeout! job-timeout
                   (fn []
                     (get-callback! callback)))]
(if-not (timeout? result)
(loads result)
result)))))


@ -1,33 +0,0 @@
(defpackage flowmetal.time
(defenum time-unit
"Calendar-independent durations."
    +milliseconds+
    +seconds+
    +minutes+
    +hours+
    +days+)
(defrecord duration [num :- int?, scale :- time-unit?]
"A type for representing scalar durations.")
(defn as-milliseconds [d :- duration?]
:- duration?
"Normalize a duration to a number of milliseconds."
  (match d
    [(duration x +days+) (as-milliseconds (duration (* x 24) +hours+))]
    [(duration x +hours+) (as-milliseconds (duration (* x 60) +minutes+))]
    [(duration x +minutes+) (as-milliseconds (duration (* x 60) +seconds+))]
    [(duration x +seconds+) (duration (* x 1000) +milliseconds+)]
    [(duration x +milliseconds+) d]))
;; A type of one value used to represent an error
(defenum timeout
+timeout+)
(defendpoint with-timeout!
[d :- duration?
f :- (fn? [] :- a)]
:- a
)
)


@ -1 +0,0 @@
#!/usr/bin/env python3


@ -0,0 +1,26 @@
"""
The Flowmetal server entry point.
"""
import click
from flowmetal import (
frontend,
interpreter,
reaper,
scheduler,
)
@click.group()
def cli():
pass
cli.add_command(frontend.cli, name="frontend")
cli.add_command(interpreter.cli, name="interpreter")
cli.add_command(scheduler.cli, name="scheduler")
cli.add_command(reaper.cli, name="reaper")
if __name__ == "__main__":
cli()


@ -0,0 +1,28 @@
"""
An abstract or base Flowmetal DB.
"""
from abc import (
    ABC,
    abstractclassmethod,
    abstractmethod,
)
class Db(ABC):
"""An abstract Flowmetal DB."""
@classmethod
@abstractmethod
def connect(cls, config):
"""Build and return a connected DB."""
@abstractmethod
def disconnect(self):
"""Disconnect from the underlying DB."""
def close(self):
"""An alias for disconnect allowing for it to quack as a closable."""
self.disconnect()
@abstractmethod
def reconnect(self):
"""Attempt to reconnect; either after an error or disconnecting."""


@@ -0,0 +1,3 @@
"""
An implementation of the Flowmetal DB backed by Redis.
"""


@@ -0,0 +1,9 @@
"""
"""
import click
@click.group()
def cli():
pass


@@ -0,0 +1,9 @@
"""
"""
import click
@click.group()
def cli():
pass


@@ -0,0 +1,3 @@
"""
Somewhat generic models of Flowmetal programs.
"""


@@ -1,511 +0,0 @@
"""
A parser for s-expressions.
"""
from abc import ABC, abstractmethod
from enum import Enum
from io import StringIO
from typing import IO, NamedTuple, Any
from fractions import Fraction
import re
## Types
class Position(NamedTuple):
"""An encoding for the location of a read token within a source."""
source: str
line: int
col: int
offset: int
@staticmethod
def next_pos(pos: "Position"):
return Position(pos.source, pos.line, pos.col + 1, pos.offset + 1)
@staticmethod
def next_line(pos: "Position"):
return Position(pos.source, pos.line + 1, 1, pos.offset + 1)
class TokenBase(object):
"""The shared interface to tokens."""
@property
@abstractmethod
def pos(self):
"""The position of the token within its source."""
@property
@abstractmethod
def raw(self):
"""The raw token as scanned."""
class ConstTokenBase(TokenBase, NamedTuple):
"""The shared interface for constant tokens"""
data: Any
raw: str
pos: Position
# Hash according to data
def __hash__(self):
return hash(self.data)
# And make sure it's orderable
def __eq__(self, other):
return self.data == other
def __lt__(self, other):
return self.data < other
def __gt__(self, other):
return self.data > other
class BooleanToken(ConstTokenBase):
"""A read boolean."""
class IntegerToken(ConstTokenBase):
"""A read integer, including position."""
class FractionToken(ConstTokenBase):
"""A read fraction, including position."""
class FloatToken(ConstTokenBase):
"""A read floating point number, including position."""
class SymbolToken(ConstTokenBase):
"""A read symbol, including position."""
class KeywordToken(ConstTokenBase):
"""A read keyword."""
class StringToken(ConstTokenBase):
"""A read string, including position."""
class ListType(Enum):
"""The supported types of lists."""
ROUND = ("(", ")")
SQUARE = ("[", "]")
class ListToken(NamedTuple, TokenBase):
"""A read list, including its start position and the paren type."""
data: list
raw: str
pos: Position
paren: ListType = ListType.ROUND
class SetToken(NamedTuple, TokenBase):
"""A read set, including its start position."""
data: list
raw: str
pos: Position
class MappingToken(NamedTuple, TokenBase):
"""A read mapping, including its start position."""
data: list
raw: str
pos: Position
class WhitespaceToken(NamedTuple, TokenBase):
"""A bunch of whitespace with no semantic value."""
data: str
raw: str
pos: Position
class CommentToken(WhitespaceToken):
"""A read comment with no semantic value."""
## Parser implementation
class PosTrackingBufferedReader(object):
"""A slight riff on BufferedReader which only allows for reads and peeks of a
char, and tracks positions.
Perfect for implementing LL(1) parsers.
"""
def __init__(self, f: IO, source_name=None):
self._next_pos = self._pos = Position(source_name, 1, 1, 0)
self._char = None
self._f = f
def pos(self):
return self._pos
def peek(self):
if self._char is None:
self._char = self._f.read(1)
return self._char
def read(self):
# Accounting for lookahead(1)
ch = self._char or self._f.read(1)
self._char = self._f.read(1)
# Accounting for the positions
self._pos = self._next_pos
if ch == "\r" and self.peek() == "\n":
self._char = self._f.read(1) # Throw out the "\n" of the "\r\n" pair
self._next_pos = Position.next_line(self._next_pos)
elif ch == "\n":
self._next_pos = Position.next_line(self._next_pos)
else:
self._next_pos = Position.next_pos(self._next_pos)
return ch
class ReadThroughBuffer(PosTrackingBufferedReader):
"""A duck that quacks like a PosTrackingBufferedReader."""
def __init__(self, ptcr: PosTrackingBufferedReader):
self._reader = ptcr
self._buffer = StringIO()
def pos(self):
return self._reader.pos()
def peek(self):
return self._reader.peek()
def read(self):
ch = self._reader.read()
self._buffer.write(ch)
return ch
def __str__(self):
return self._buffer.getvalue()
def __enter__(self, *args):
return self
def __exit__(self, *args):
pass
class SexpParser(ABC):
@classmethod
@abstractmethod
def parse(cls, f: PosTrackingBufferedReader) -> TokenBase:
"""Parse an s-expression, returning a parsed token tree."""
@classmethod
def read(cls, f: PosTrackingBufferedReader):
"""Parse to a token tree and read to values returning the resulting values."""
return cls.parse(f).read()
class Parser(SexpParser):
"""A basic parser which knows about lists, symbols and numbers.
Intended as a base class / extension point for other parsers.
"""
@classmethod
def parse(cls, f: PosTrackingBufferedReader):
if not f.peek():
raise SyntaxError(f"Got end of file ({f.pos()}) while parsing")
elif cls.ispunct(f.peek()):
if f.peek() == "(":
return cls.parse_list(f)
elif f.peek() == "[":
return cls.parse_sqlist(f)
elif f.peek() == '"':
return cls.parse_str(f)
elif f.peek() == ";":
return cls.parse_comment(f)
else:
raise SyntaxError(f"Got unexpected punctuation {f.read()!r} at {f.pos()} while parsing")
elif cls.isspace(f.peek()):
return cls.parse_whitespace(f)
else:
return cls.parse_symbol(f)
@classmethod
def isspace(cls, ch: str):
"""An extension point allowing for a more expansive concept of whitespace."""
return ch.isspace() or ch == ','
@classmethod
def ispunct(cls, ch: str):
return ch in (
'"'
';' # Semicolon
'()' # Parens
'⟮⟯' # 'flat' parens
'[]' # Square brackets
'⟦⟧' # 'white' square brackets
'{}' # Curly brackets
'⟨⟩' # Angle brackets
'《》' # Double angle brackets
'⟪⟫' # Another kind of double angle brackets
)
@classmethod
def parse_delimited(cls, f: PosTrackingBufferedReader, openc, closec, ctor):
with ReadThroughBuffer(f) as rtb:
for c in openc:
assert rtb.read() == c # Discard the leading delimiter
pos = rtb.pos() # Position of the opening delimiter
acc = []
while f.peek() != closec:
if not f.peek():
raise SyntaxError(f"Got end of file while parsing {openc!r}...{closec!r} starting at {pos}")
try:
acc.append(cls.parse(rtb))
except SyntaxError as e:
raise SyntaxError(f"While parsing {openc!r}...{closec!r} starting at {pos},\n{e}")
assert rtb.read() == closec # Discard the trailing delimiter
return ctor(acc, str(rtb), pos)
# FIXME (arrdem 2020-07-18):
# Break this apart and make the supported lists composable features somehow?
@classmethod
def parse_list(cls, f: PosTrackingBufferedReader):
return cls.parse_delimeted(f, "(", ")", lambda *args: ListToken(*args, ListType.ROUND))
@classmethod
def parse_sqlist(cls, f: PosTrackingBufferedReader):
return cls.parse_delimeted(f, "[", "]", lambda *args: ListToken(*args, ListType.SQUARE))
# FIXME (arrdem 2020-07-18):
# Break this apart into middleware or composable features somehow?
@classmethod
def handle_symbol(cls, buff, pos):
def _sign(m, idx):
if m.group(idx) == '-':
return -1
else:
return 1
# Parsing integers with bases
if m := re.fullmatch(r"([+-]?)(\d+)r([a-z0-9_]+)", buff):
return IntegerToken(
_sign(m, 1) * int(m.group(3).replace("_", ""),
int(m.group(2))),
buff,
pos,
)
# Parsing hex numbers
if m := re.fullmatch(r"([+-]?)0[xX]([A-Fa-f0-9_]*)", buff):
val = m.group(2).replace("_", "")
return IntegerToken(_sign(m, 1) * int(val, 16), buff, pos)
# Parsing octal numbers
if m := re.fullmatch(r"([+-]?)0([\d_]*)", buff):
val = m.group(2).replace("_", "")
return IntegerToken(_sign(m, 1) * int(val, 8), buff, pos)
# Parsing integers
if m := re.fullmatch(r"([+-]?)\d[\d_]*", buff):
return IntegerToken(int(buff.replace("_", "")), buff, pos)
# Parsing fractions
if m := re.fullmatch(r"([+-]?)(\d[\d_]*)/(\d[\d_]*)", buff):
return FractionToken(
Fraction(
int(m.group(2).replace("_", "")),
int(m.group(3).replace("_", ""))),
buff,
pos,
)
# Parsing floats
if re.fullmatch(r"([+-]?)\d[\d_]*(\.\d[\d_]*)?(e[+-]?\d[\d_]*)?", buff):
return FloatToken(float(buff), buff, pos)
# Booleans
if buff == "true":
return BooleanToken(True, buff, pos)
if buff == "false":
return BooleanToken(False, buff, pos)
# Keywords
if buff.startswith(":"):
return KeywordToken(buff, buff, pos)
# Default behavior
return SymbolToken(buff, buff, pos)
@classmethod
def parse_symbol(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
pos = None
while rtb.peek() and not cls.isspace(rtb.peek()) and not cls.ispunct(rtb.peek()):
pos = pos or rtb.pos()
rtb.read()
buff = str(rtb)
return cls.handle_symbol(buff, pos)
@classmethod
def parse_whitespace(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
pos = None
while rtb.peek() and cls.isspace(rtb.peek()):
pos = pos or rtb.pos()
ch = rtb.read()
if ch == "\n":
break
buff = str(rtb)
return WhitespaceToken(buff, buff, pos)
@classmethod
def parse_comment(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
pos = None
while rtb.read() not in ["\n", ""]:
pos = pos or rtb.pos()
continue
buff = str(rtb)
return CommentToken(buff, buff, pos)
@classmethod
def handle_escape(cls, ch: str):
if ch == 'n':
return "\n"
elif ch == 'r':
return "\r"
elif ch == 'l':
return "\014" # form feed
elif ch == 't':
return "\t"
elif ch == '"':
return '"'
@classmethod
def parse_str(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
assert rtb.read() == '"'
pos = rtb.pos()
content = []
while True:
if not rtb.peek():
raise SyntaxError(f"Got end of file while parsing a string starting at {pos}")
# Handle end of string
elif rtb.peek() == '"':
rtb.read()
break
# Handle escape sequences
elif rtb.peek() == '\\':
rtb.read() # Discard the escape leader
# Octal escape
if rtb.peek() == '0':
rtb.read()
buff = []
while rtb.peek() in '01234567':
buff.append(rtb.read())
content.append(chr(int(''.join(buff), 8)))
# Hex escape
elif rtb.peek() == 'x':
rtb.read() # Discard the escape leader
buff = []
while rtb.peek() in '0123456789abcdefABCDEF':
buff.append(rtb.read())
content.append(chr(int(''.join(buff), 16)))
else:
content.append(cls.handle_escape(rtb.read()))
else:
content.append(rtb.read())
buff = str(rtb)
return StringToken("".join(content), buff, pos)
## Parsing interface
def parses(buff: str,
parser: SexpParser = Parser,
source_name=None):
"""Parse a single s-expression from a string, returning its token tree."""
return parse(StringIO(buff), parser, source_name or f"<string {id(buff):x}>")
def parsef(path: str,
parser: SexpParser = Parser):
"""Parse a single s-expression from the file named by a string, returning its token tree."""
with open(path, "r") as f:
return parse(f, parser, path)
def parse(file: IO,
parser: SexpParser = Parser,
source_name=None):
"""Parse a single sexpression from a file-like object, returning its token tree."""
return parser.parse(
PosTrackingBufferedReader(
file,
source_name=source_name
)
)
## Loading interface
def loads(buff: str,
parser: SexpParser = Parser,
source_name=None):
"""Load a single s-expression from a string, returning its object representation."""
return load(StringIO(buff), parser, source_name or f"<string {id(buff):x}>")
def loadf(path: str,
parser: SexpParser = Parser):
"""Load a single s-expression from the file named by a string, returning its object representation."""
with open(path, "r") as f:
return load(f, parser, path)
def load(file: IO,
parser: SexpParser = Parser,
source_name=None):
"""Load a single sexpression from a file-like object, returning its object representation."""
return parser.load(
PosTrackingBufferedReader(
file,
source_name=source_name
)
)
## Dumping interface
def dump(file: IO, obj):
"""Given an object, dump its s-expression coding to the given file-like object."""
raise NotImplementedError()
def dumps(obj):
"""Given an object, dump its s-expression coding to a string and return that string."""
with StringIO("") as f:
dump(f, obj)
return f.getvalue()
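End to end, the parsing interface behaves roughly like this sketch (the parser tests further down are the authoritative reference):

```python
import flowmetal.parser as p

tok = p.parses("(1 2/3 :foo bar)")
assert isinstance(tok, p.ListToken)
assert tok.paren == p.ListType.ROUND

assert p.parses("0x10").data == 16   # hex integers
assert p.parses("2r101").data == 5   # variable-base integers
assert isinstance(p.parses(":foo"), p.KeywordToken)
```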


@@ -0,0 +1,9 @@
"""
"""
import click
@click.group()
def cli():
pass


@@ -1,78 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
from flowmetal.syntax_analyzer import analyzes
from prompt_toolkit import PromptSession
from prompt_toolkit.history import FileHistory
from prompt_toolkit.styles import Style
STYLE = Style.from_dict({
# User input (default text).
"": "",
"prompt": "ansigreen",
"time": "ansiyellow"
})
class InterpreterInterrupt(Exception):
"""An exception used to break the prompt or evaluation."""
def pp(t, indent=""):
if isinstance(t, list): # lists
buff = ["["]
for e in t:
buff.append(f"{indent} " + pp(e, indent+" ")+",")
return "\n".join(buff + [f"{indent}]"])
elif hasattr(t, '_fields'): # namedtuples
buff = [f"{type(t).__name__}("]
for field, value in zip(t._fields, t):
buff.append(f"{indent} {field}=" + pp(value, indent+" ")+",")
return "\n".join(buff + [f"{indent})"])
elif isinstance(t, tuple): # tuples
buff = ["("]
for e in t:
buff.append(f"{indent} " + pp(e, indent+" ")+",")
return "\n".join(buff + [f"{indent})"])
else:
return repr(t)
parser = argparse.ArgumentParser()
def main():
"""REPL entry point."""
args = parser.parse_args(sys.argv[1:])
logger = logging.getLogger("flowmetal")
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
session = PromptSession(history=FileHistory(".iflow.history"))
line_no = 0
while True:
try:
line = session.prompt([("class:prompt", ">>> ")], style=STYLE)
except (InterpreterInterrupt, KeyboardInterrupt):
continue
except EOFError:
break
try:
print(pp(analyzes(line, source_name=f"repl@{line_no}")))
except Exception as e:
print(e)
finally:
line_no += 1
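The `pp` helper is what keeps the REPL output readable: it renders nested namedtuples, lists and tuples as an indented tree. For example, using `pp` as defined above:

```python
from collections import namedtuple

Point = namedtuple("Point", ["x", "y"])
print(pp(Point(1, [Point(2, 3)])))
# Prints an indented tree along the lines of:
#   Point(
#     x=1,
#     y=[
#       Point(
#         x=2,
#         y=3,
#       ),
#     ],
#   )
```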


@@ -0,0 +1,9 @@
"""
"""
import click
@click.group()
def cli():
pass


@@ -1,356 +0,0 @@
"""
The parser just parses and tokenizes.
The [syntax] analyzer interprets a parse sequence into a syntax tree which can be checked, type-inferred and compiled.
"""
from abc import ABC, abstractmethod
from io import StringIO
from typing import NamedTuple, List, Union, Any, IO, Tuple
from enum import Enum
import flowmetal.parser as p
### Types
## We are not, in fact, sponsored by Typelevel LLC.
class TypeLevelExpr(object):
"""A base class for type-level expressions."""
pass
class GenericExpr(TypeLevelExpr, NamedTuple):
"""'invocation' (application) of a generic type to Type[Level]Exprs."""
pass
class TypeExpr(TypeLevelExpr, NamedTuple):
"""A bound (or yet to be bound) type level symbol."""
pass
class BuiltinType(TypeLevelExpr, Enum):
"""Built in types for atoms."""
BOOLEAN = 'Boolean'
SYMBOL = 'Symbol'
KEYWORD = 'Keyword'
STRING = 'String'
INTEGER = 'Integer'
FRACTION = 'Fraction'
FLOAT = 'Float'
class ConstraintExpr(TypeLevelExpr, NamedTuple):
"""A value-level constraint (predicate) as a type."""
## Terms
# Now down to reality
class ValueLevelExpr(object):
"""A base class for value-level expressions."""
class TriviallyTypedExpr(ValueLevelExpr):
"""And some of those expressions have trivial types."""
@property
def type(self) -> TypeExpr:
"""The type of an expression."""
class AscribeExpr(TriviallyTypedExpr, NamedTuple):
value: ValueLevelExpr
type: TypeLevelExpr
class ConstExpr(TriviallyTypedExpr, NamedTuple):
"""Constant expressions. Keywords, strings, numbers, that sort of thing."""
token: p.ConstTokenBase
@property
def data(self) -> Any:
"""The value of the constant."""
# The parser gives us this data
return self.token.data
@abstractmethod
def type(self):
raise NotImplementedError()
class BooleanExpr(ConstExpr):
@property
def type(self):
return BuiltinType.BOOLEAN
class IntegerExpr(ConstExpr):
@property
def type(self):
return BuiltinType.INTEGER
class FractionExpr(ConstExpr):
@property
def type(self):
return BuiltinType.FRACTION
class FloatExpr(ConstExpr):
@property
def type(self):
return BuiltinType.FLOAT
class KeywordExpr(ConstExpr):
@property
def type(self):
return BuiltinType.KEYWORD
class StringExpr(ConstExpr):
@property
def type(self):
return BuiltinType.STRING
class ListExpr(ValueLevelExpr, NamedTuple):
elements: List[ValueLevelExpr]
## 'real' AST nodes
class DoExpr(ValueLevelExpr, NamedTuple):
effect_exprs: List[ValueLevelExpr]
ret_expr: ValueLevelExpr
class LetExpr(ValueLevelExpr, NamedTuple):
binding_exprs: List[Tuple]
ret_expr: DoExpr
class FnExpr(ValueLevelExpr, NamedTuple):
arguments: List
ret_type: TypeExpr
ret_expr: DoExpr
## Reader implementation
class AnalyzerBase(ABC):
"""Analyzer interface."""
@classmethod
@abstractmethod
def analyze(cls, token: p.TokenBase) -> ValueLevelExpr:
"""Analyze a token tree, returning an expr tree."""
def _t(txt):
return p.SymbolToken(txt, txt, None)
class Analyzer(AnalyzerBase):
"""A reference Analyzer implementation.
Walks a parsed token tree, building up a syntax tree.
"""
TACK0 = _t('⊢')
TACK1 = _t('|-')
TACK2 = p.KeywordToken(":-", None, None)
LET = _t('let')
DO = _t('do')
FN = _t('fn')
LIST = _t('list')
QUOTE = _t('quote')
@classmethod
def _tackp(cls, t):
return t in [cls.TACK0, cls.TACK1, cls.TACK2]
@classmethod
def _nows(cls, tokens):
return [t for t in tokens if not isinstance(t, p.WhitespaceToken)]
@classmethod
def _chomp(cls, tokens):
"""'chomp' an expression and optional ascription off the tokens, returning an expression and the remaining tokens."""
if len(tokens) == 1:
return cls.analyze(tokens[0]), []
elif cls._tackp(tokens[1]):
if len(tokens) >= 3:
return (
AscribeExpr(
cls.analyze(tokens[0]),
cls.analyze(tokens[2])),
tokens[3:],
)
else:
raise SyntaxError(f"Analyzing tack at {tokens[1].pos}, did not find following type ascription!")
else:
return cls.analyze(tokens[0]), tokens[1::]
@classmethod
def _terms(cls, tokens):
terms = []
tokens = cls._nows(tokens)
while tokens:
term, tokens = cls._chomp(tokens)
terms.append(term)
return terms
@classmethod
def analyze(cls, token: p.TokenBase):
if isinstance(token, p.BooleanToken):
return BooleanExpr(token)
if isinstance(token, p.KeywordToken):
return KeywordExpr(token)
if isinstance(token, p.IntegerToken):
return IntegerExpr(token)
if isinstance(token, p.FractionToken):
return FractionExpr(token)
if isinstance(token, p.FloatToken):
return FloatExpr(token)
if isinstance(token, p.StringToken):
return StringExpr(token)
if isinstance(token, p.SymbolToken):
return token
if isinstance(token, p.ListToken):
return cls.analyze_list(token)
@classmethod
def _do(cls, t, body: list):
return p.ListToken([cls.DO] + body, t.raw, t.pos)
@classmethod
def analyze_list(cls, token: p.ListToken):
"""Analyze a list, for which there are several 'ground' forms."""
# Expunge any whitespace tokens
tokens = cls._nows(token.data)
if len(tokens) == 0:
return ListExpr([])
if tokens[0] == cls.QUOTE:
raise NotImplementedError("Quote isn't quite there!")
if tokens[0] == cls.LIST:
return ListExpr(cls._terms(tokens[1:]))
if tokens[0] == cls.DO:
return cls.analyze_do(token)
if tokens[0] == cls.LET:
return cls.analyze_let(token)
if tokens[0] == cls.FN:
return cls.analyze_fn(token)
return cls.analyze_invoke(tokens)
@classmethod
def analyze_let(cls, let_token):
tokens = cls._nows(let_token.data[1:])
assert len(tokens) >= 2
assert isinstance(tokens[0], p.ListToken)
bindings = []
binding_tokens = cls._nows(tokens[0].data)
tokens = tokens[1:]
while binding_tokens:
if isinstance(binding_tokens[0], p.SymbolToken):
bindexpr = binding_tokens[0]
binding_tokens = binding_tokens[1:]
else:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got illegal binding expression {binding_tokens[0]}")
if not binding_tokens:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got binding expression without subsequent value expression!")
if cls._tackp(binding_tokens[0]):
if len(binding_tokens) < 2:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got `⊢` at {binding_tokens[0].pos} without type!")
bind_ascription = cls.analyze(binding_tokens[1])
binding_tokens = binding_tokens[2:]
bindexpr = AscribeExpr(bindexpr, bind_ascription)
if not binding_tokens:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got binding expression without subsequent value expression!")
valexpr = cls.analyze(binding_tokens[0])
binding_tokens = binding_tokens[1:]
bindings.append((bindexpr, valexpr))
# FIXME (arrdem 2020-07-18):
# This needs to happen with bindings
tail = tokens[0] if len(tokens) == 1 else cls._do(let_token, tokens)
return LetExpr(bindings, cls.analyze(tail))
@classmethod
def analyze_do(cls, do_token):
tokens = cls._nows(do_token.data[1:])
exprs = cls._terms(tokens)
if exprs[:-1]:
return DoExpr(exprs[:-1], exprs[-1])
else:
return exprs[-1]
@classmethod
def analyze_fn(cls, fn_token):
tokens = cls._nows(fn_token.data[1:])
assert len(tokens) >= 2
assert isinstance(tokens[0], p.ListToken)
args = []
arg_tokens = cls._nows(tokens[0].data)
while arg_tokens:
argexpr, arg_tokens = cls._chomp(arg_tokens)
args.append(argexpr)
ascription = None
if cls._tackp(tokens[1]):
ascription = cls.analyze(tokens[2])
tokens = tokens[3:]
else:
tokens = tokens[1:]
# FIXME (arrdem 2020-07-18):
# This needs to happen with bindings
body = cls.analyze(cls._do(fn_token, tokens))
return FnExpr(args, ascription, body)
## Analysis interface
def analyzes(buff: str,
analyzer: AnalyzerBase = Analyzer,
parser: p.SexpParser = p.Parser,
source_name = None):
"""Parse a single s-expression from a string, returning its token tree."""
return analyze(StringIO(buff), analyzer, parser, source_name or f"<string {id(buff):x}>")
def analyzef(path: str,
analyzer: AnalyzerBase = Analyzer,
parser: p.SexpParser = p.Parser):
"""Parse a single s-expression from the file named by a string, returning its token tree."""
with open(path, "r") as f:
return analyze(f, analyzer, parser, path)
def analyze(file: IO,
analyzer: AnalyzerBase = Analyzer,
parser: p.SexpParser = p.Parser,
source_name = None):
"""Parse a single sexpression from a file-like object, returning its token tree."""
return analyzer.analyze(p.parse(file, parser, source_name))
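Putting the passes together: analysis consumes the parser's token tree and yields expr nodes. A sketch (the analyzer tests below are authoritative):

```python
import flowmetal.syntax_analyzer as a

assert isinstance(a.analyzes("(do foo bar 1)"), a.DoExpr)
assert isinstance(a.analyzes("1/2"), a.FractionExpr)
assert isinstance(a.analyzes("(fn [] x :- integer?)"), a.FnExpr)
```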


@@ -1,161 +0,0 @@
"""
Tests covering the Flowmetal parser.
"""
from math import nan
import flowmetal.parser as p
import pytest
def test_parse_list():
"""Trivial parsing a list."""
assert isinstance(p.parses("()"), p.ListToken)
assert p.parses("()").paren == p.ListType.ROUND
@pytest.mark.parametrize('txt, val', [
('1', 1),
('2', 2),
('103', 103),
('504', 504),
# Sign prefixes
('-1', -1),
('+1', +1),
# Underscores as whitespace
('1_000_000', 1e6),
('+1_000', 1000),
('-1_000', -1000),
# Variable base
('2r1', 1),
('2r10', 2),
('2r100', 4),
('2r101', 5),
('+2r10', 2),
('-2r10', -2),
# Octal
('00', 0),
('01', 1),
('010', 8),
('+010', 8),
('-010', -8),
# Hex
('0x0', 0),
('0xF', 15),
('0x10', 16),
('+0x10', 16),
('-0x10', -16),
])
def test_parse_num(txt, val):
"""Some trivial cases of parsing numbers."""
assert isinstance(p.parses(txt), p.IntegerToken)
assert p.parses(txt).data == val
@pytest.mark.parametrize('frac', [
'1/2', '1/4', '1/512',
])
def test_parse_ratio(frac):
"""Test covering the ratio notation."""
assert isinstance(p.parses(frac), p.FractionToken)
assert p.parses(frac).data == p.Fraction(frac)
@pytest.mark.parametrize('sym,', [
'a',
'b',
'*earmuff-style*',
'+kebab-style+',
'JAVA_CONSTANT_STYLE',
])
def test_parse_sym(sym):
"""Some trivial cases of parsing symbols."""
assert isinstance(p.parses(sym), p.SymbolToken)
assert p.parses(sym).data == sym
@pytest.mark.parametrize('txt, tokenization', [
('(1 2 3)',
[(p.IntegerToken, '1'),
(p.WhitespaceToken, ' '),
(p.IntegerToken, '2'),
(p.WhitespaceToken, ' '),
(p.IntegerToken, '3')]),
('(a 1 b 2)',
[(p.SymbolToken, 'a'),
(p.WhitespaceToken, ' '),
(p.IntegerToken, '1'),
(p.WhitespaceToken, ' '),
(p.SymbolToken, 'b'),
(p.WhitespaceToken, ' '),
(p.IntegerToken, '2')])
])
def test_list_contents(txt, tokenization):
"""Parse examples of list contents."""
assert isinstance(p.parses(txt), p.ListToken)
lelems = p.parses(txt).data
for (type, text), token in zip(tokenization, lelems):
assert isinstance(token, type)
assert token.raw == text
@pytest.mark.parametrize('txt, value', [
('1.0', 1.0),
('-1.0', -1.0),
('1.01', 1.01),
('1e0', 1e0),
('1e3', 1e3),
('1e-3', 1e-3),
('1.01e3', 1.01e3),
('1_000e0', 1e3),
])
def test_float_values(txt, value):
"""Some examples of floats."""
assert isinstance(p.parses(txt), p.FloatToken)
assert p.parses(txt).data == value
@pytest.mark.parametrize('txt, tokenization', [
('+1', p.IntegerToken),
('+1+', p.SymbolToken),
('+1e', p.SymbolToken),
('+1e3', p.FloatToken),
('+1.0', p.FloatToken),
('+1.0e3', p.FloatToken),
('a.b', p.SymbolToken),
('1.b', p.SymbolToken),
])
def test_ambiguous_floats(txt, tokenization):
"""Parse examples of 'difficult' floats and symbols."""
assert isinstance(p.parses(txt), tokenization), "Token type didn't match!"
assert p.parses(txt).raw == txt, "Parse wasn't total!"
@pytest.mark.parametrize('txt,', [
r'""',
r'"foo"',
r'"foo bar baz qux"',
r'"foo\nbar\tbaz\lqux"',
r'''"foo
bar
baz
qux"''',
r'"\000 \x00"',
r'"\"\""',
])
def test_string(txt):
"""Some examples of strings, and of escape sequences."""
assert isinstance(p.parses(txt), p.StringToken)
@pytest.mark.parametrize('txt,', [
':foo',
':foo/bar',
':foo.bar/baz?',
])
def test_keyword(txt):
"""Some examples of keywords."""
assert isinstance(p.parses(txt), p.KeywordToken)


@@ -1,50 +0,0 @@
"""
Tests covering the Flowmetal analyzer.
"""
import flowmetal.parser as p
import flowmetal.syntax_analyzer as a
import pytest
@pytest.mark.parametrize('txt, exprtype', [
# Booleans
('true', a.ConstExpr),
('false', a.BooleanExpr),
# Integers
('1', a.ConstExpr),
('1', a.IntegerExpr),
# Fractions
('1/2', a.ConstExpr),
('1/2', a.FractionExpr),
# Floats
('1.0', a.ConstExpr),
('1.0', a.FloatExpr),
# Keywords
(':foo', a.ConstExpr),
(':foo', a.KeywordExpr),
# Strings
('"foo"', a.ConstExpr),
('"foo"', a.StringExpr),
])
def test_analyze_constants(txt, exprtype):
"""Make sure the analyzer can chew on constants."""
assert isinstance(a.analyzes(txt), exprtype)
@pytest.mark.parametrize('txt', [
'()',
'(list)',
'(list 1)',
'(do 1)',
'(do foo bar 1)',
'(let [a 1, b 2] 1)',
'(fn [] 1)',
'(fn [] ⊢ integer? x)',
'(fn [] x |- integer?)',
'(fn [] x :- integer?)',
])
def test_analyze(txt):
"""Make sure that do exprs work."""
assert a.analyzes(txt)