Reid 'arrdem' McKenzie 2021-05-08 23:31:19 -06:00
parent 5dabfb80fb
commit 4e8ac14536
15 changed files with 125 additions and 1030 deletions

View file

@@ -2,9 +2,20 @@ package(default_visibility = ["//visibility:public"])
 py_library(
     name = "lib",
-    srcs = glob(["src/python/**/*.py"]),
+    srcs = [
+        "src/python/**/*.py"
+    ],
     imports = ["src/python"],
     deps = [
-        py_requirement("prompt-toolkit"),
     ]
 )
+
+py_binary(
+    name = "server",
+    deps = [
+        ":lib",
+        py_requirement("click"),
+        py_requirement("redis"),
+    ],
+    main = "src/python/flowmetal/server/__main__.py",
+)

View file

@@ -225,13 +225,15 @@ In another language like JavaScript or Lua, you could probably get this down to
 -- the retrying behavior as specified.
 client = Client("http://service.local", api_key="...")
-job = retry()(
+retry_config = {} -- Fake, obviously
+with_retry = retry(retry_config)
+job = with_retry(
   function ()
     return client.start_plan(...)
   end)()
-result = retry()(
+result = with_retry(
   function()
     if job.complete() then
       return job.get()
@@ -243,6 +245,27 @@ The insight here is that the "callback" function we're defining in the Python example
In fact, choosing the arbitrary names `r_get_job` and `r_create_job` puts more load on the programmer and the reader.
Python's lack of block anonymous procedures precludes us from cramming the `if complete then get` operation or anything more complex into a `lambda` without some serious syntax crimes.
Using [PEP-0342](https://www.python.org/dev/peps/pep-0342/#new-generator-method-send-value), it's possible to implement arbitrary coroutines in Python by `.send()`ing values to generators, which may treat `yield` statements as rvalues for receiving remotely sent inputs.
This makes it possible to explicitly yield control to a remote interpreter, which will return or resume the coroutine with a result value.
Microsoft's [Durable Functions](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=python) use exactly this behavior to implement durable functions.
The "functions" provided by the API return sentinels which can be yielded to an external interpreter, which triggers processing and returns control when there are results.
This is the [interpreter effect conversion pattern (Extensible Effects)](http://okmij.org/ftp/Haskell/extensible/exteff.pdf) seen in Haskell and other tools, applied.
``` python
import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    result = yield context.call_activity("F4", z)
    return result

main = df.Orchestrator.create(orchestrator_function)
```
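To make the mechanics concrete, here is a minimal sketch of the `.send()` trampoline this pattern rests on. It is not from the Durable Functions SDK; `workflow`, `perform`, and `drive` are hypothetical names.

``` python
def workflow():
    # Each yield hands a request to the driver; the value of the yield
    # expression is the driver's answer, delivered via .send().
    x = yield "F1"
    y = yield "F2"
    return (x, y)

def perform(request):
    # Stand-in for 'remote' processing of a yielded request.
    return {"F1": 1, "F2": 2}[request]

def drive(gen):
    """Run a generator coroutine to completion, send()ing in each result."""
    try:
        request = next(gen)  # Advance to the first yield
        while True:
            request = gen.send(perform(request))
    except StopIteration as e:
        return e.value  # The coroutine's return value

drive(workflow())  # => (1, 2)
```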
### Durability challenges

View file

@@ -1 +0,0 @@
#!/usr/bin/env python3

View file

@@ -0,0 +1,22 @@
"""
The Flowmetal server entry point.
"""
from flowmetal import frontend, interpreter, scheduler, reaper
import click
@click.group()
def cli():
pass
cli.add_command(frontend.cli, name="frontend")
cli.add_command(interpreter.cli, name="interpreter")
cli.add_command(scheduler.cli, name="scheduler")
cli.add_command(reaper.cli, name="reaper")
if __name__ == "__main__":
cli()
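# Editorial note, not part of this commit: given this entry point, a
# hypothetical invocation nests the click groups, e.g.
#
#   python -m flowmetal.server interpreter --help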

View file

@@ -0,0 +1,25 @@
"""
An abstract or base Flowmetal DB.
"""
from abc import ABC, abstractclassmethod, abstractmethod
class Db(ABC):
"""An abstract Flowmetal DB."""
@abstractclassmethod
def connect(cls, config):
"""Build and return a connected DB."""
@abstractmethod
def disconnect(self):
"""Disconnect from the underlying DB."""
def close(self):
"""An alias for disconnect allowing for it to quack as a closable."""
self.disconnect()
@abstractmethod
def reconnect(self):
"""Attempt to reconnect; either after an error or disconnecting."""

View file

@@ -0,0 +1,3 @@
"""
An implementation of the Flowmetal DB backed by Redis.
"""

View file

@@ -0,0 +1,8 @@
"""
"""
import click
@click.group()
def cli():
pass

View file

@@ -0,0 +1,8 @@
"""
"""
import click
@click.group()
def cli():
pass

View file

@@ -0,0 +1,5 @@
"""
Somewhat generic models of Flowmetal programs.
"""
from typing import NamedTuple

View file

@@ -1,80 +0,0 @@
"""The module analyzer chews modules using bindings.
Using the parser and syntax analyzer, this module chews on analyzed syntax trees doing the heavy lifting of working with
modules, namespaces and bindings. Gotta sort out all those symbols somewhere.
"""
from io import StringIO
from typing import IO, NamedTuple, Mapping
from abc import ABC, abstractmethod, abstractproperty
import flowmetal.parser as p
import flowmetal.syntax_analyzer as sa
class Namespace(NamedTuple):
    pass
## Syntax analysis implementation
class AnalyzerBase(ABC):
"""Analyzer interface."""
@classmethod
@abstractmethod
def analyze(cls, token: sa.ValueLevelExpr):
"""Analyze an expr tree, returning a binding tree."""
class Analyzer(AnalyzerBase):
@classmethod
def analyze(cls,
token: sa.ValueLevelExpr,
environment = None):
pass
## Analysis interface
def analyzes(buff: str,
module_analyzer: AnalyzerBase = Analyzer,
module_environment = None,
syntax_analyzer: sa.AnalyzerBase = sa.Analyzer,
parser: p.SexpParser = p.Parser,
source_name = None):
"""Parse a single s-expression from a string, returning its token tree."""
return analyze(StringIO(buff),
module_analyzer,
module_environment,
syntax_analyzer,
parser,
source_name or f"<string {id(buff):x}>")
def analyzef(path: str,
module_analyzer: AnalyzerBase = Analyzer,
module_environment = None,
syntax_analyzer: sa.AnalyzerBase = sa.Analyzer,
parser: p.SexpParser = p.Parser):
"""Parse a single s-expression from the file named by a string, returning its token tree."""
with open(path, "r") as f:
return analyze(f,
module_analyzer,
module_environment,
syntax_analyzer,
parser,
path)
def analyze(file: IO,
module_analyzer: AnalyzerBase = Analyzer,
module_environment = None,
syntax_analyzer: sa.AnalyzerBase = sa.Analyzer,
parser: p.SexpParser = p.Parser,
source_name = None):
"""Parse a single sexpression from a file-like object, returning its token tree."""
return module_analyzer.analyze(
syntax_analyzer.analyze(
p.parse(file, parser, source_name)),
module_environment)

View file

@@ -1,511 +0,0 @@
"""
A parser for s-expressions.
"""
from abc import ABC, abstractmethod
from enum import Enum
from io import StringIO, BufferedReader
from typing import IO, NamedTuple, Any
from fractions import Fraction
import re
## Types
class Position(NamedTuple):
"""An encoding for the location of a read token within a source."""
source: str
line: int
col: int
offset: int
@staticmethod
def next_pos(pos: "Position"):
return Position(pos.source, pos.line, pos.col + 1, pos.offset + 1)
@staticmethod
def next_line(pos: "Position"):
return Position(pos.source, pos.line + 1, 1, pos.offset + 1)
class TokenBase(object):
"""The shared interface to tokens."""
@property
@abstractmethod
def pos(self):
"""The position of the token within its source."""
@property
@abstractmethod
def raw(self):
"""The raw token as scanned."""
class ConstTokenBase(TokenBase, NamedTuple):
"""The shared interface for constant tokens"""
data: Any
raw: str
pos: Position
# Hash according to data
def __hash__(self):
return hash(self.data)
# And make sure it's orderable
def __eq__(self, other):
return self.data == other
def __lt__(self, other):
return self.data < other
def __gt__(self, other):
return self.data > other
class BooleanToken(ConstTokenBase):
"""A read boolean."""
class IntegerToken(ConstTokenBase):
"""A read integer, including position."""
class FractionToken(ConstTokenBase):
"""A read fraction, including position."""
class FloatToken(ConstTokenBase):
"""A read floating point number, including position."""
class SymbolToken(ConstTokenBase):
"""A read symbol, including position."""
class KeywordToken(ConstTokenBase):
"""A read keyword."""
class StringToken(ConstTokenBase):
"""A read string, including position."""
class ListType(Enum):
"""The supported types of lists."""
ROUND = ("(", ")")
SQUARE = ("[", "]")
class ListToken(NamedTuple, TokenBase):
"""A read list, including its start position and the paren type."""
data: list
raw: str
pos: Position
paren: ListType = ListType.ROUND
class SetToken(NamedTuple, TokenBase):
"""A read set, including its start position."""
data: list
raw: str
pos: Position
class MappingToken(NamedTuple, TokenBase):
"""A read mapping, including its start position."""
data: list
raw: str
pos: Position
class WhitespaceToken(NamedTuple, TokenBase):
"""A bunch of whitespace with no semantic value."""
data: str
raw: str
pos: Position
class CommentToken(WhitespaceToken):
"""A read comment with no semantic value."""
## Parser implementation
class PosTrackingBufferedReader(object):
"""A slight riff on BufferedReader which only allows for reads and peeks of a
char, and tracks positions.
Perfect for implementing LL(1) parsers.
"""
def __init__(self, f: IO, source_name=None):
self._next_pos = self._pos = Position(source_name, 1, 1, 0)
self._char = None
self._f = f
def pos(self):
return self._pos
def peek(self):
if self._char is None:
self._char = self._f.read(1)
return self._char
def read(self):
# Accounting for lookahead(1)
ch = self._char or self._f.read(1)
self._char = self._f.read(1)
# Accounting for the positions
self._pos = self._next_pos
if ch == "\r" and self.peek() == "\n":
            self._char = self._f.read(1)  # Throw out the "\n" of a "\r\n" pair
self._next_pos = Position.next_line(self._next_pos)
elif ch == "\n":
self._next_pos = Position.next_line(self._next_pos)
else:
self._next_pos = Position.next_pos(self._next_pos)
return ch
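# Editorial example, not part of the original file: peek() is pure lookahead,
# read() consumes one character and advances the position bookkeeping.
#
#   r = PosTrackingBufferedReader(StringIO("ab\nc"), source_name="<example>")
#   r.peek()  # => "a", position unchanged
#   r.read()  # => "a", r.pos() is now line 1, col 1
#   r.read()  # => "b"
#   r.read()  # => "\n", after which the next pos() is line 2, col 1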
class ReadThroughBuffer(PosTrackingBufferedReader):
"""A duck that quacks like a PosTrackingBufferedReader."""
def __init__(self, ptcr: PosTrackingBufferedReader):
self._reader = ptcr
self._buffer = StringIO()
def pos(self):
return self._reader.pos()
def peek(self):
return self._reader.peek()
def read(self):
ch = self._reader.read()
self._buffer.write(ch)
return ch
def __str__(self):
return self._buffer.getvalue()
def __enter__(self, *args):
return self
def __exit__(self, *args):
pass
class SexpParser(ABC):
@classmethod
@abstractmethod
def parse(cls, f: PosTrackingBufferedReader) -> TokenBase:
"""Parse an s-expression, returning a parsed token tree."""
    @classmethod
    def read(cls, f: PosTrackingBufferedReader):
        """Parse to a token tree and read to values, returning the resulting values."""
        return cls.parse(f).read()
class Parser(SexpParser):
"""A basic parser which knows about lists, symbols and numbers.
Intended as a base class / extension point for other parsers.
"""
@classmethod
def parse(cls, f: PosTrackingBufferedReader):
if not f.peek():
raise SyntaxError(f"Got end of file ({f.pos()}) while parsing")
elif cls.ispunct(f.peek()):
if f.peek() == "(":
return cls.parse_list(f)
elif f.peek() == "[":
return cls.parse_sqlist(f)
elif f.peek() == '"':
return cls.parse_str(f)
elif f.peek() == ";":
return cls.parse_comment(f)
else:
raise SyntaxError(f"Got unexpected punctuation {f.read()!r} at {f.pos()} while parsing")
elif cls.isspace(f.peek()):
return cls.parse_whitespace(f)
else:
return cls.parse_symbol(f)
@classmethod
def isspace(cls, ch: str):
"""An extension point allowing for a more expansive concept of whitespace."""
return ch.isspace() or ch == ','
@classmethod
def ispunct(cls, ch: str):
return ch in (
'"'
';' # Semicolon
'()' # Parens
'⟮⟯' # 'flat' parens
'[]' # Square brackets
'⟦⟧' # 'white' square brackets
'{}' # Curly brackets
'⟨⟩' # Angle brackets
'《》' # Double angle brackets
'⟪⟫' # Another kind of double angle brackets
)
@classmethod
def parse_delimeted(cls, f: PosTrackingBufferedReader, openc, closec, ctor):
with ReadThroughBuffer(f) as rtb:
pos = None
for c in openc:
pos = pos or rtb.pos()
assert rtb.read() == c # Discard the leading delimeter
pos = rtb.pos()
acc = []
while f.peek() != closec:
if not f.peek():
raise SyntaxError(f"Got end of file while parsing {openc!r}...{closec!r} starting at {pos}")
try:
acc.append(cls.parse(rtb))
except SyntaxError as e:
raise SyntaxError(f"While parsing {openc!r}...{closec!r} starting at {pos},\n{e}")
assert rtb.read() == closec # Discard the trailing delimeter
return ctor(acc, str(rtb), pos)
# FIXME (arrdem 2020-07-18):
# Break this apart and make the supported lists composable features somehow?
@classmethod
def parse_list(cls, f: PosTrackingBufferedReader):
return cls.parse_delimeted(f, "(", ")", lambda *args: ListToken(*args, ListType.ROUND))
@classmethod
def parse_sqlist(cls, f: PosTrackingBufferedReader):
return cls.parse_delimeted(f, "[", "]", lambda *args: ListToken(*args, ListType.SQUARE))
# FIXME (arrdem 2020-07-18):
# Break this apart into middleware or composable features somehow?
@classmethod
def handle_symbol(cls, buff, pos):
def _sign(m, idx):
if m.group(idx) == '-':
return -1
else:
return 1
# Parsing integers with bases
if m := re.fullmatch(r"([+-]?)(\d+)r([a-z0-9_]+)", buff):
return IntegerToken(
_sign(m, 1) * int(m.group(3).replace("_", ""),
int(m.group(2))),
buff,
pos,
)
# Parsing hex numbers
if m := re.fullmatch(r"([+-]?)0[xX]([A-Fa-f0-9_]*)", buff):
val = m.group(2).replace("_", "")
return IntegerToken(_sign(m, 1) * int(val, 16), buff, pos)
# Parsing octal numbers
if m := re.fullmatch(r"([+-]?)0([\d_]*)", buff):
val = m.group(2).replace("_", "")
return IntegerToken(_sign(m, 1) * int(val, 8), buff, pos)
# Parsing integers
if m := re.fullmatch(r"([+-]?)\d[\d_]*", buff):
return IntegerToken(int(buff.replace("_", "")), buff, pos)
# Parsing fractions
if m := re.fullmatch(r"([+-]?)(\d[\d_]*)/(\d[\d_]*)", buff):
return FractionToken(
Fraction(
int(m.group(2).replace("_", "")),
int(m.group(3).replace("_", ""))),
buff,
pos,
)
# Parsing floats
if re.fullmatch(r"([+-]?)\d[\d_]*(\.\d[\d_]*)?(e[+-]?\d[\d_]*)?", buff):
return FloatToken(float(buff), buff, pos)
# Booleans
if buff == "true":
return BooleanToken(True, buff, pos)
if buff == "false":
return BooleanToken(False, buff, pos)
# Keywords
if buff.startswith(":"):
return KeywordToken(buff, buff, pos)
# Default behavior
return SymbolToken(buff, buff, pos)
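    # Editorial examples, not in the original: how handle_symbol classifies text.
    #   handle_symbol("2r101", pos)  -> IntegerToken(5, ...)    radix literal, base 2
    #   handle_symbol("0xff", pos)   -> IntegerToken(255, ...)  hexadecimal
    #   handle_symbol("1/3", pos)    -> FractionToken(Fraction(1, 3), ...)
    #   handle_symbol(":foo", pos)   -> KeywordToken(":foo", ...)
    #   handle_symbol("foo", pos)    -> SymbolToken("foo", ...)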
@classmethod
def parse_symbol(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
pos = None
while rtb.peek() and not cls.isspace(rtb.peek()) and not cls.ispunct(rtb.peek()):
pos = pos or rtb.pos()
rtb.read()
buff = str(rtb)
return cls.handle_symbol(buff, pos)
@classmethod
def parse_whitespace(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
pos = None
while rtb.peek() and cls.isspace(rtb.peek()):
pos = pos or rtb.pos()
ch = rtb.read()
if ch == "\n":
break
buff = str(rtb)
return WhitespaceToken(buff, buff, pos)
@classmethod
def parse_comment(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
pos = None
while rtb.read() not in ["\n", ""]:
pos = pos or rtb.pos()
continue
buff = str(rtb)
return CommentToken(buff, buff, pos)
@classmethod
def handle_escape(cls, ch: str):
if ch == 'n':
return "\n"
elif ch == 'r':
return "\r"
elif ch == 'l':
return "\014" # form feed
elif ch == 't':
return "\t"
elif ch == '"':
return '"'
@classmethod
def parse_str(cls, f: PosTrackingBufferedReader):
with ReadThroughBuffer(f) as rtb:
assert rtb.read() == '"'
pos = rtb.pos()
content = []
while True:
if not rtb.peek():
                    raise SyntaxError(f"Got end of file while parsing a string starting at {pos}")
# Handle end of string
elif rtb.peek() == '"':
rtb.read()
break
# Handle escape sequences
elif rtb.peek() == '\\':
rtb.read() # Discard the escape leader
# Octal escape
if rtb.peek() == '0':
rtb.read()
buff = []
while rtb.peek() in '01234567':
buff.append(rtb.read())
content.append(chr(int(''.join(buff), 8)))
# Hex escape
elif rtb.peek() == 'x':
rtb.read() # Discard the escape leader
buff = []
while rtb.peek() in '0123456789abcdefABCDEF':
buff.append(rtb.read())
content.append(chr(int(''.join(buff), 16)))
else:
content.append(cls.handle_escape(rtb.read()))
else:
content.append(rtb.read())
buff = str(rtb)
            return StringToken("".join(content), buff, pos)
## Parsing interface
def parses(buff: str,
parser: SexpParser = Parser,
source_name=None):
"""Parse a single s-expression from a string, returning its token tree."""
return parse(StringIO(buff), parser, source_name or f"<string {id(buff):x}>")
def parsef(path: str,
parser: SexpParser = Parser):
"""Parse a single s-expression from the file named by a string, returning its token tree."""
with open(path, "r") as f:
return parse(f, parser, path)
def parse(file: IO,
parser: SexpParser = Parser,
source_name=None):
"""Parse a single sexpression from a file-like object, returning its token tree."""
return parser.parse(
PosTrackingBufferedReader(
file,
source_name=source_name
)
)
## Loading interface
def loads(buff: str,
parser: SexpParser = Parser,
source_name=None):
"""Load a single s-expression from a string, returning its object representation."""
return load(StringIO(buff), parser, source_name or f"<string {id(buff):x}>")
def loadf(path: str,
parser: SexpParser = Parser):
"""Load a single s-expression from the file named by a string, returning its object representation."""
with open(path, "r") as f:
return load(f, parser, path)
def load(file: IO,
parser: SexpParser = Parser,
source_name=None):
"""Load a single sexpression from a file-like object, returning its object representation."""
    return parser.read(
PosTrackingBufferedReader(
file,
source_name=source_name
)
)
## Dumping interface
def dump(file: IO, obj):
"""Given an object, dump its s-expression coding to the given file-like object."""
raise NotImplementedError()
def dumps(obj):
"""Given an object, dump its s-expression coding to a string and return that string."""
with StringIO("") as f:
dump(f, obj)
        return f.getvalue()
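# Editorial usage sketch, not part of the original file:
#   parses("(+ 1 2)") returns a ListToken whose .data holds a SymbolToken for
#   "+", the interleaved WhitespaceTokens, and two IntegerTokens, each token
#   carrying its raw text and a Position.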

View file

@@ -0,0 +1,8 @@
"""
"""
import click
@click.group()
def cli():
pass

View file

@@ -1,78 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
from flowmetal.syntax_analyzer import analyzes
from prompt_toolkit import print_formatted_text, prompt, PromptSession
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.history import FileHistory
from prompt_toolkit.styles import Style
STYLE = Style.from_dict({
# User input (default text).
"": "",
"prompt": "ansigreen",
"time": "ansiyellow"
})
class InterpreterInterrupt(Exception):
"""An exception used to break the prompt or evaluation."""
def pp(t, indent=""):
if isinstance(t, list): # lists
buff = ["["]
for e in t:
buff.append(f"{indent} " + pp(e, indent+" ")+",")
return "\n".join(buff + [f"{indent}]"])
elif hasattr(t, '_fields'): # namedtuples
buff = [f"{type(t).__name__}("]
for field, value in zip(t._fields, t):
buff.append(f"{indent} {field}=" + pp(value, indent+" ")+",")
return "\n".join(buff + [f"{indent})"])
elif isinstance(t, tuple): # tuples
buff = ["("]
for e in t:
buff.append(f"{indent} " + pp(e, indent+" ")+",")
return "\n".join(buff + [f"{indent})"])
else:
return repr(t)
parser = argparse.ArgumentParser()
def main():
"""REPL entry point."""
args = parser.parse_args(sys.argv[1:])
logger = logging.getLogger("flowmetal")
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
session = PromptSession(history=FileHistory(".iflow.history"))
line_no = 0
while True:
try:
line = session.prompt([("class:prompt", ">>> ")], style=STYLE)
except (InterpreterInterrupt, KeyboardInterrupt):
continue
except EOFError:
break
try:
print(pp(analyzes(line, source_name=f"repl@{line_no}")))
except Exception as e:
print(e)
        finally:
            line_no += 1

if __name__ == "__main__":
    main()

View file

@@ -0,0 +1,8 @@
"""
"""
import click
@click.group()
def cli():
pass

View file

@@ -1,356 +0,0 @@
"""
The parser just parses and tokenizes.
The syntax_analyzer interprets a parse sequence into a syntax tree which can be checked, type-inferred, and compiled.
"""
from abc import ABC, abstractmethod
from io import StringIO
from typing import NamedTuple, List, Union, Any, IO, Tuple
from enum import Enum
import flowmetal.parser as p
### Types
## We are not, in fact, sponsored by Typelevel LLC.
class TypeLevelExpr(object):
"""A base class for type-level expressions."""
pass
class GenericExpr(TypeLevelExpr, NamedTuple):
"""'invocation' (application) of a generic type to Type[Level]Exprs."""
pass
class TypeExpr(TypeLevelExpr, NamedTuple):
"""A bound (or yet to be bound) type level symbol."""
pass
class BuiltinType(TypeLevelExpr, Enum):
"""Built in types for atoms."""
BOOLEAN = 'Boolean'
SYMBOL = 'Symbol'
KEYWORD = 'Keyword'
STRING = 'String'
INTEGER = 'Integer'
FRACTION = 'Fraction'
FLOAT = 'Float'
class ConstraintExpr(TypeLevelExpr, NamedTuple):
"""A value-level constraint (predicate) as a type."""
## Terms
# Now down to reality
class ValueLevelExpr(object):
"""A base class for value-level expressions."""
class TriviallyTypedExpr(ValueLevelExpr):
"""And some of those expressions have trivial types."""
@property
def type(self) -> TypeExpr:
"""The type of an expression."""
class AscribeExpr(TriviallyTypedExpr, NamedTuple):
value: ValueLevelExpr
type: TypeLevelExpr
class ConstExpr(TriviallyTypedExpr, NamedTuple):
"""Constant expressions. Keywords, strings, numbers, that sort of thing."""
token: p.ConstTokenBase
@property
def data(self) -> Any:
"""The value of the constant."""
# The parser gives us this data
return self.token.data
@abstractmethod
def type(self):
raise NotImplementedError()
class BooleanExpr(ConstExpr):
@property
def type(self):
return BuiltinType.BOOLEAN
class IntegerExpr(ConstExpr):
@property
def type(self):
return BuiltinType.INTEGER
class FractionExpr(ConstExpr):
@property
def type(self):
return BuiltinType.FRACTION
class FloatExpr(ConstExpr):
@property
def type(self):
return BuiltinType.FLOAT
class KeywordExpr(ConstExpr):
@property
def type(self):
return BuiltinType.KEYWORD
class StringExpr(ConstExpr):
@property
def type(self):
return BuiltinType.STRING
class ListExpr(ValueLevelExpr, NamedTuple):
elements: List[ValueLevelExpr]
## 'real' AST nodes
class DoExpr(ValueLevelExpr, NamedTuple):
effect_exprs: List[ValueLevelExpr]
ret_expr: ValueLevelExpr
class LetExpr(ValueLevelExpr, NamedTuple):
binding_exprs: List[Tuple]
ret_expr: DoExpr
class FnExpr(ValueLevelExpr, NamedTuple):
arguments: List
ret_type: TypeExpr
ret_expr: DoExpr
## Syntax analysis implementation
class AnalyzerBase(ABC):
"""Analyzer interface."""
@classmethod
@abstractmethod
def analyze(cls, token: p.TokenBase) -> ValueLevelExpr:
"""Analyze a token tree, returning an expr tree."""
def _t(txt):
return p.SymbolToken(txt, txt, None)
class Analyzer(AnalyzerBase):
"""A reference Analyzer implementation.
Walks a parsed token tree, building up a syntax tree.
"""
    TACK0 = _t('⊢')
TACK1 = _t('|-')
TACK2 = p.KeywordToken(":-", None, None)
LET = _t('let')
DO = _t('do')
FN = _t('fn')
LIST = _t('list')
QUOTE = _t('quote')
@classmethod
def _tackp(cls, t):
return t in [cls.TACK0, cls.TACK1, cls.TACK2]
@classmethod
def _nows(cls, tokens):
return [t for t in tokens if not isinstance(t, p.WhitespaceToken)]
@classmethod
def _chomp(cls, tokens):
"""'chomp' an expression and optional ascription off the tokens, returning an expression and the remaining tokens."""
if len(tokens) == 1:
return cls.analyze(tokens[0]), []
elif cls._tackp(tokens[1]):
if len(tokens) >= 3:
return (
AscribeExpr(
cls.analyze(tokens[0]),
cls.analyze(tokens[2])),
tokens[3:],
)
else:
raise SyntaxError(f"Analyzing tack at {tokens[1].pos}, did not find following type ascription!")
else:
return cls.analyze(tokens[0]), tokens[1::]
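    # Editorial example, not in the original: given tokens for `x ⊢ Integer y`,
    # _chomp returns (AscribeExpr(<x>, <Integer>), [<y>]); with no tack after
    # the first expression it returns (<x>, [<y>]).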
@classmethod
def _terms(cls, tokens):
terms = []
tokens = cls._nows(tokens)
while tokens:
term, tokens = cls._chomp(tokens)
terms.append(term)
return terms
@classmethod
def analyze(cls, token: p.TokenBase):
if isinstance(token, p.BooleanToken):
return BooleanExpr(token)
if isinstance(token, p.KeywordToken):
return KeywordExpr(token)
if isinstance(token, p.IntegerToken):
return IntegerExpr(token)
if isinstance(token, p.FractionToken):
return FractionExpr(token)
if isinstance(token, p.FloatToken):
return FloatExpr(token)
if isinstance(token, p.StringToken):
return StringExpr(token)
if isinstance(token, p.SymbolToken):
return token
if isinstance(token, p.ListToken):
return cls.analyze_list(token)
@classmethod
def _do(cls, t, body: list):
return p.ListToken([cls.DO] + body, t.raw, t.pos)
@classmethod
def analyze_list(cls, token: p.ListToken):
"""Analyze a list, for which there are several 'ground' forms."""
# Expunge any whitespace tokens
tokens = cls._nows(token.data)
if len(tokens) == 0:
return ListExpr([])
if tokens[0] == cls.QUOTE:
raise NotImplementedError("Quote isn't quite there!")
if tokens[0] == cls.LIST:
return ListExpr(cls._terms(tokens[1:]))
if tokens[0] == cls.DO:
return cls.analyze_do(token)
if tokens[0] == cls.LET:
return cls.analyze_let(token)
if tokens[0] == cls.FN:
return cls.analyze_fn(token)
        return cls.analyze_invoke(tokens)
@classmethod
def analyze_let(cls, let_token):
tokens = cls._nows(let_token.data[1:])
assert len(tokens) >= 2
assert isinstance(tokens[0], p.ListToken)
bindings = []
binding_tokens = cls._nows(tokens[0].data)
tokens = tokens[1:]
while binding_tokens:
if isinstance(binding_tokens[0], p.SymbolToken):
bindexpr = binding_tokens[0]
binding_tokens = binding_tokens[1:]
else:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got illegal binding expression {binding_tokens[0]}")
if not binding_tokens:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got binding expression without subsequent value expression!")
if cls._tackp(binding_tokens[0]):
if len(binding_tokens) < 2:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got `⊢` at {binding_tokens[0].pos} without type!")
bind_ascription = cls.analyze(binding_tokens[1])
binding_tokens = binding_tokens[2:]
bindexpr = AscribeExpr(bindexpr, bind_ascription)
if not binding_tokens:
raise SyntaxError(f"Analyzing `let` at {let_token.pos}, got binding expression without subsequent value expression!")
            valexpr = cls.analyze(binding_tokens[0])
            binding_tokens = binding_tokens[1:]
bindings.append((bindexpr, valexpr))
# FIXME (arrdem 2020-07-18):
# This needs to happen with bindings
tail = tokens[0] if len(tokens) == 1 else cls._do(let_token, tokens)
return LetExpr(bindings, cls.analyze(tail))
@classmethod
def analyze_do(cls, do_token):
tokens = cls._nows(do_token.data[1:])
exprs = cls._terms(tokens)
if exprs[:-1]:
return DoExpr(exprs[:-1], exprs[-1])
else:
return exprs[-1]
@classmethod
def analyze_fn(cls, fn_token):
tokens = cls._nows(fn_token.data[1:])
assert len(tokens) >= 2
assert isinstance(tokens[0], p.ListToken)
args = []
arg_tokens = cls._nows(tokens[0].data)
while arg_tokens:
argexpr, arg_tokens = cls._chomp(arg_tokens)
args.append(argexpr)
ascription = None
if cls._tackp(tokens[1]):
ascription = cls.analyze(tokens[2])
            tokens = tokens[3:]
else:
tokens = tokens[1:]
# FIXME (arrdem 2020-07-18):
# This needs to happen with bindings
body = cls.analyze(cls._do(fn_token, tokens))
return FnExpr(args, ascription, body)
## Analysis interface
def analyzes(buff: str,
syntax_analyzer: AnalyzerBase = Analyzer,
parser: p.SexpParser = p.Parser,
source_name = None):
"""Parse a single s-expression from a string, returning its token tree."""
return analyze(StringIO(buff), syntax_analyzer, parser, source_name or f"<string {id(buff):x}>")
def analyzef(path: str,
syntax_analyzer: AnalyzerBase = Analyzer,
parser: p.SexpParser = p.Parser):
"""Parse a single s-expression from the file named by a string, returning its token tree."""
with open(path, "r") as f:
return analyze(f, syntax_analyzer, parser, path)
def analyze(file: IO,
syntax_analyzer: AnalyzerBase = Analyzer,
parser: p.SexpParser = p.Parser,
source_name = None):
"""Parse a single sexpression from a file-like object, returning its token tree."""
return syntax_analyzer.analyze(p.parse(file, parser, source_name))
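# Editorial usage sketch, not part of the original file:
#   analyzes("(do 1 2)")   -> DoExpr(effect_exprs=[<IntegerExpr for 1>],
#                                    ret_expr=<IntegerExpr for 2>)
#   analyzes("(fn [x] x)") -> FnExpr(arguments=[<x>], ret_type=None,
#                                    ret_expr=<x>)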