Import datalog

This commit is contained in:
Reid 'arrdem' McKenzie 2021-05-14 23:39:18 -06:00
parent b49582a5dc
commit 091e518a7e
18 changed files with 2510 additions and 0 deletions

15
projects/datalog/BUILD Normal file
View file

@ -0,0 +1,15 @@
py_library(
name = "datalog",
srcs = glob(["src/python/**/*.py"]),
imports = [
"src/python",
]
)
py_pytest(
name = "test_datalog",
srcs = glob(["test/python/**/*.py"]),
deps = [
":datalog",
]
)

View file

@ -0,0 +1,27 @@
# Hacking on datalog
Datalog uses the [canopy](https://github.com/jcoglan/canopy) PEG
parser generator, the grammar for which is `src/datalog.peg`.
The included `Makefile` contains appropriate rules to build a Python 3
virtual environment, install the canopy compiler locally, build the
grammar, install pytest and run the test suite. All you should have to
do is `make test`, and away it all goes.
The `datalog.parser` module is generated code emitted by canopy and
should not be edited. It will be rebuilt as needed.
The parser is tested in `test/test_datalog_parser.py` which attempts
to provide coverage for the basic cases of the grammar itself. As of
v0.0.3 (and earlier) this coverage is somewhat incomplete.
The `datalog.core` module contains pretty much everything besides the
codegen'd parser. In particular, it contains an `Actions` class whose
methods hook into the datalog PEG grammar (the `%foo` annotations at the
ends of rules) to construct a database AST for every *whole file* read.
The `datalog.core` module also implements evaluation of queries
against databases. This comes in the `evaluate` function and its
mutually recursive helper `join`. `join` is an implementation detail,
whereas `evaluate` is an intentionally exposed entry point. Future
versions of datalog may hide `join`.
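To see those pieces end to end, here is a minimal sketch (using the `datalog.easy`
wrappers described in the README rather than calling the core machinery directly)
of reading a program and evaluating a query against it; exact result ordering may differ:
```
>>> from datalog import easy
>>> db = easy.read('''
... edge(a, b).
... edge(b, c).
... path(A, B) :- edge(A, B).
... path(A, B) :- edge(A, C), path(C, B).
... ''')
>>> easy.select(db, ('path', 'a', 'B'))
[((('path', 'a', 'b'),), {'B': 'b'}), ((('path', 'a', 'c'),), {'B': 'c'})]
```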

18
projects/datalog/Makefile Normal file
View file

@ -0,0 +1,18 @@
.PHONY: test
deploy: .dev
source .dev/bin/activate; pip install twine; rm -r dist; python setup.py sdist; twine upload dist/*;
.dev:
virtualenv --python=`which python3` .dev
source .dev/bin/activate; pip install pytest; python setup.py develop
node_modules/canopy:
npm install canopy
src/python/datalog/parser.py: node_modules/canopy src/datalog.peg
node_modules/canopy/bin/canopy --lang=python src/datalog.peg
mv src/datalog.py src/python/datalog/parser.py
test: .dev $(wildcard src/**/*) $(wildcard test/**/*)
source .dev/bin/activate; PYTHONPATH=".:src/" pytest -vv

173
projects/datalog/README.md Normal file
View file

@ -0,0 +1,173 @@
# Datalog
An implementation of Datalog in Python.
## What is Datalog?
[Datalog](https://en.wikipedia.org/wiki/Datalog) is a fully declarative language for expressing relational data and queries, typically written using a syntactic subset of Prolog.
Its most interesting feature compared to other relational languages such as SQL is its support for production rules.
Briefly, a datalog database consists of rules and tuples.
Tuples are written `a(b, "c", 126, ...).`, require no up-front declaration (e.g. of a table),
and may be of arbitrary, even varying, length.
Rules are written `f(A, B) :- a(A), b(B).` and when evaluated produce tuples.
This rule, for instance, defines `∀ aₑ∈a, bₑ∈b: f(aₑ, bₑ)`, i.e. the cross-product of the elements of the tuple sets `a(A)` and `b(B)`.
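For example, a complete (if tiny) datalog program in that shape, with values chosen arbitrarily for illustration:
```
a(1). a(2).
b(x). b(y).

f(A, B) :- a(A), b(B).
% f now contains f(1, x), f(1, y), f(2, x), f(2, y).
```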
## Quickstart
We're gonna make use of the [datalog.easy](#datalog.easy) API.
It's somewhat simplified and definitely has sharp edges, but has much better ergonomics than working directly with the query engine from Python.
```
# Pull in the datalog.easy package
>>> from datalog import easy
# Read some tuples into a Dataset.
#
# Because the base Dataset class has some limitations, easy gives you
# an instance of the IndexedDataset, which is the best-supported variant.
>>> db = easy.read('''
... edge(a, b).
... edge(b, c).
... edge(c, d).
... edge(d, e).
... ''')
```
Now that we've got a db instance, we can run some queries over it.
The two core operations are Select and Join.
Select selects tuples - both constants and from rules.
Join selects several tuples at once by unifying logic variables.
Let's select some tuples first.
Select returns a sequence of pairs `(tuples, bindings)`, where tuples are the selected tuples (always one tuple in fact), and bindings is a mapping of LVars to bound constants.
```
>>> easy.select(db, ('edge', 'a', 'b'))
[((('edge', 'a', 'b'),), {})]
```
Cool!
But what if we wanted to find all edges from `a`?
This is where logic variables come in.
Logic variables are written as capitalized words in textual datalog, and the easy package recognizes capitalized strings as logic variables when processing queries.
There's only one such tuple, `edge(a, b)`, but let's see if we find it.
```
>>> easy.select(db, ('edge', 'a', 'B'))
[((('edge', 'a', 'b'),), {'B': 'b'})]
```
Nice.
But what of joins?
Rules are really a way to give a name to the result of a join, but we can do joins directly too.
For instance, we could try to select all contiguous 2-segment paths.
Unlike select, which takes a single tuple, join takes a sequence of tuples to satisfy simultaneously.
Like select, join returns a sequence of pairs `(tuples, bindings)`, but here tuples may have several elements.
In this case, we're selecting pairs of adjacent edges, so we'll get two tuples and three bindings back in each result.
```
>>> easy.join(db, [
... ('edge', 'A', 'B'), # Any edge, ending at B
... ('edge', 'B', 'C') # Any edge, beginning at the same B
... ])
[((('edge', 'a', 'b'),
('edge', 'b', 'c')),
{'A': 'a', 'B': 'b', 'C': 'c'}),
((('edge', 'b', 'c'),
('edge', 'c', 'd')),
{'A': 'b', 'B': 'c', 'C': 'd'}),
((('edge', 'c', 'd'),
('edge', 'd', 'e')),
{'A': 'c', 'B': 'd', 'C': 'e'})]
```
## API
### `datalog.types`
<span id="datalog.types" />
The types package provides the core representation used by the rest of the system.
It defines the `Constant(value)` and `LVar(name)` tuple types.
A datalog tuple `a(b, c)` is internally represented as `(Constant('a'), Constant('b'), Constant('c'))`.
Tuples containing logic variables simply contain `LVar` instances in addition to `Constant` values.
The `LTuple` type alias is for tuples which contain both constants and logic variables.
The `CTuple` type alias is for tuples containing only constants.
A `Rule(pattern, clauses)` is a pair of an `LTuple` (the pattern for the tuples the rule produces) and a sequence of clauses (`LTuple` values representing join constraints on the rule's result).
The types package also defines the `Dataset` class.
A `Dataset` is a container for a sequence of tuples, and a sequence of rules which define tuples.
In fact the `Dataset` class has only three methods: `rules()`, `tuples()` and `merge(other)`.
The query planners work mostly in terms of `Dataset` instances, although extensions of `Dataset` may be better supported.
`CachedDataset` is an extension of the `Dataset` type which allows the query engine to cache the result(s) of evaluating rules.
This enables recursive rule evaluation, and some other optimizations.
`IndexedDataset` is an extension of `CachedDataset` which adds support for indices, reducing the amount of data that must be scanned during evaluation.
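As a rough sketch of how these types compose (building a dataset by hand rather than through the reader):
```
>>> from datalog.types import Constant, Dataset, LVar, Rule
>>> edge = lambda a, b: (Constant('edge'), Constant(a), Constant(b))
>>> db = Dataset(
...     [edge('a', 'b'), edge('b', 'c')],
...     [Rule((Constant('path'), LVar('A'), LVar('B')),
...           [(Constant('edge'), LVar('A'), LVar('B'))])])
>>> list(db.tuples())
[(Constant(value='edge'), Constant(value='a'), Constant(value='b')),
 (Constant(value='edge'), Constant(value='b'), Constant(value='c'))]
```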
### `datalog.parser`
<span id="datalog.parser" />
This package contains only generated code, and is used to implement the reader.
Its contents are unstable.
### `datalog.reader`
<span id="datalog.reader" />
The reader intentionally exposes only three functions. `read`, aka `read_dataset`, accepts a string and an optional kwarg `db_cls`, a class extending `Dataset` into which tuples and rules should be read.
It also exposes `read_command`, which returns a pair `(op: str, val: Either[Rule, LTuple])`.
This function is used to implement parts of the REPL, packaged separately ([PyPi](https://pypi.org/package/arrdem/datalog.shell), [git](https://git.arrdem.com/arrdem/datalog-shell)).
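For example (a sketch; the exact shape of `val` depends on whether the input is a rule or a plain clause):
```
>>> from datalog.reader import read_command
>>> read_command('edge(a, b)?')
('?', (Constant(value='edge'), Constant(value='a'), Constant(value='b')))
```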
### `datalog.evaluator`
<span id="datalog.evaluator" />
At present, the evaluator contains only two functions - `select` and `join`.
Select and join are mutually recursive, because rule evaluation is recursively selecting the results of joins.
At present, there is only one implementation of select and join in the system.
In the future, this interface will be replaced to add support for query planners.
Users should prefer the generally stable `datalog.easy` interface to working directly with the evaluator.
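A minimal sketch of calling the evaluator directly (unlike `datalog.easy`, it works on `Constant`/`LVar` tuples rather than plain strings):
```
>>> from datalog.evaluator import select
>>> from datalog.types import Constant, LVar, PartlyIndexedDataset
>>> db = PartlyIndexedDataset([(Constant('edge'), Constant('a'), Constant('b'))], [])
>>> list(select(db, (Constant('edge'), Constant('a'), LVar('B'))))
[(((Constant(value='edge'), Constant(value='a'), Constant(value='b')),),
  {LVar(name='B'): Constant(value='b')})]
```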
### `datalog.easy`
<span id="datalog.easy" />
A shim over the reader and evaluator designed to make interacting with the evaluator from python more convenient.
Not simpler, just more convenient.
`read(str, db_cls=IndexedDataset)` is just a shim to `datalog.reader.read` with a better default class.
`select(db: Dataset, query: LTuple)` eagerly evaluates all results instead of producing a generator, eliminating `Constant()` and `LVar()` wrappers in both tuples and bindings.
`join(db: Dataset, query: Sequence[LTuple])` likewise eagerly evaluates all results, and likewise simplifies results.
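For comparison with the raw evaluator output shown above, the same sort of query through `easy` comes back in terms of plain strings:
```
>>> from datalog import easy
>>> db = easy.read('edge(a, b).')
>>> easy.select(db, ('edge', 'a', 'B'))
[((('edge', 'a', 'b'),), {'B': 'b'})]
```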
## Usage
```
$ pip install --user arrdem.datalog
```
### Limitations
Recursion may have some completeness bugs. I have not yet encountered any, but I also don't have a strong proof of correctness for the recursive evaluation of rules.
The current implementation of negated clauses CANNOT propagate positive information. This means that negated clauses can only be used in conjunction with positive clauses. It's not clear if this is an essential limitation.
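For example, this rule (taken from the test suite) is fine because the negated clause `~b(X, Z)` rides along with a positive clause `a(X, Y)` that binds its variables; a rule whose body consisted only of negated clauses could not be evaluated:
```
a(foo, bar).
b(foo, bar).
a(baz, qux).

% Select a() tuples which have no corresponding b() tuple.
no-b(X, Y) :-
  a(X, Y),
  ~b(X, Z).
```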
There is as of yet no query planner - not even segmenting rules and tuples by relation to restrict evaluation. This means that the complexity of a query is `O(dataset * term count)`, which is clearly less than ideal.
## License
Mirrored from https://git.arrdem.com/arrdem/datalog-py
Published under the MIT license. See [LICENSE.md](LICENSE.md)

View file

@ -0,0 +1,25 @@
"""
For benchmarking the datalog.
Generates a large graph, which will be expensive to enumerate 2-edges over and paths through.
"""
from random import choice
from uuid import uuid4 as uuid
with open("graph.dtl", "w") as f:
nodes = []
# Generate 10k edges
for i in range(10000):
if nodes:
from_node = choice(nodes)
else:
from_node = uuid()
to_node = uuid()
nodes.append(to_node)
f.write(f"edge({str(from_node)!r}, {str(to_node)!r}).\n")

61
projects/datalog/package-lock.json generated Normal file
View file

@ -0,0 +1,61 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"abbrev": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz",
"integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q=="
},
"canopy": {
"version": "0.3.0",
"resolved": "https://registry.npmjs.org/canopy/-/canopy-0.3.0.tgz",
"integrity": "sha1-PyL0IpBgU0/hdZzitTRbxLXUXXU=",
"requires": {
"mkdirp": "^0.5.1",
"nopt": "^4.0.1"
}
},
"minimist": {
"version": "0.0.8",
"resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.8.tgz",
"integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0="
},
"mkdirp": {
"version": "0.5.1",
"resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz",
"integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=",
"requires": {
"minimist": "0.0.8"
}
},
"nopt": {
"version": "4.0.1",
"resolved": "https://registry.npmjs.org/nopt/-/nopt-4.0.1.tgz",
"integrity": "sha1-0NRoWv1UFRk8jHUFYC0NF81kR00=",
"requires": {
"abbrev": "1",
"osenv": "^0.1.4"
}
},
"os-homedir": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz",
"integrity": "sha1-/7xJiDNuDoM94MFox+8VISGqf7M="
},
"os-tmpdir": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/os-tmpdir/-/os-tmpdir-1.0.2.tgz",
"integrity": "sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ="
},
"osenv": {
"version": "0.1.5",
"resolved": "https://registry.npmjs.org/osenv/-/osenv-0.1.5.tgz",
"integrity": "sha512-0CWcCECdMVc2Rw3U5w9ZjqX6ga6ubk1xDVKxtBQPK7wis/0F2r9T6k4ydGYhecl7YUBxBVxhL5oisPsNxAPe2g==",
"requires": {
"os-homedir": "^1.0.0",
"os-tmpdir": "^1.0.0"
}
}
}
}

30
projects/datalog/setup.py Normal file
View file

@ -0,0 +1,30 @@
from setuptools import setup
setup(
name="arrdem.datalog",
# Package metadata
version="2.0.1",
license="MIT",
description="A Datalog engine",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
author="Reid 'arrdem' McKenzie",
author_email="me@arrdem.com",
url="https://git.arrdem.com/arrdem/source",
classifiers=[
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Topic :: Database",
"Topic :: Database :: Database Engines/Servers",
"Topic :: Database :: Front-Ends",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
],
# Package setup
package_dir={"": "src/python"},
packages=["datalog",],
)

View file

@ -0,0 +1,26 @@
# A Datalog parser.
#
# The core entry point is the `dataset` rule, intended for parsing whole files.
#
# For convenience and consistency in implementing command shells, the `command` rule exists.
grammar Datalog
dataset <- (rule / comment / whitespace)* %make_dataset
rule <- clause (ws ":-" ws clauses)? "." %make_rule
clauses <- clause ("," ws clauses)? %make_clauses
clause <- negation? word "(" terms ")" %make_clause
negation <- "~"
terms <- term ("," ws terms)? %make_terms
term <- string / lvar / word
lvar <- [A-Z] [a-z0-9-_=<>]* %make_symbol
word <- [a-z0-9-_=<>]* %make_word
string <- sq_string / dq_string
sq_string <- "'" ([^']*) "'" %make_string
dq_string <- "\"" ([^\"]*) "\"" %make_string
ws <- (comment / whitespace)+
comment <- "%" ([^\n]*) "\n" %make_comment
whitespace <- [ \t\n]+ %make_ws
# And now for the helper productions
# These are NOT reached during normal grammar parsing
command <- clause (ws ":-" ws clauses)? ("." / "?" / "!") %make_command
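# For example `edge(a, b).`, `edge(a, B)?` and `edge(a, b)!` all parse via `command`;
# the trailing "." / "?" / "!" is returned to the caller as the op.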

View file

View file

@ -0,0 +1,50 @@
"""Debris."""
def shuffled(seq):
"""Because random.shuffle() is in-place >.>"""
s = seq.copy()
shuffle(s)
return s
def constexpr_p(expr):
"""Predicate. True of all terms of the expr are constants."""
return all(isinstance(e, LVar) for e in expr)
class Timing(object):
"""
A context manager object which records how long the context took.
"""
def __init__(self):
self.start = None
self.end = None
def __enter__(self):
from datetime import datetime
self.start = datetime.utcnow()
return self
def __exit__(self, type, value, traceback):
from datetime import datetime
self.end = datetime.utcnow()
def __call__(self):
"""
If the context is exited, return its duration. Otherwise return the duration "so far".
"""
from datetime import datetime
if self.start and self.end:
return self.end - self.start
else:
return datetime.utcnow() - self.start
def __str__(self):
return str(self())

View file

@ -0,0 +1,87 @@
"""
Easy datalog.
Takes the core datalog engine and wraps it up so it's a little nicer to work with.
Easy because it's closer to hand, but no simpler.
"""
from typing import Sequence, Tuple
from datalog.evaluator import join as __join, select as __select
from datalog.reader import read as __read
from datalog.types import Constant, Dataset, LTuple, LVar, PartlyIndexedDataset
def read(text: str, db_cls=PartlyIndexedDataset):
"""A helper for reading Datalog text into a well-supported dataset."""
return __read(text, db_cls=db_cls)
def q(t: Tuple[str]) -> LTuple:
"""Helper for writing terse queries.
Takes a tuple of strings, and interprets them as a logic tuple.
So you don't have to write the logic tuple out by hand.
"""
def _x(s: str):
if s[0].isupper():
return LVar(s)
else:
return Constant(s)
return tuple(_x(e) for e in t)
def __mapv(f, coll):
return [f(e) for e in coll]
def __result(results_bindings):
results, bindings = results_bindings
return (
tuple(tuple(c.value for c in e) for e in results),
{var.name: c.value for var, c in bindings.items()},
)
def select(db: Dataset, query: Tuple[str], bindings=None) -> Sequence[Tuple]:
"""Helper for interpreting tuples of strings as a query, and returning simplified results.
Executes your query, returning matching full tuples.
"""
return __mapv(__result, __select(db, q(query), bindings=bindings))
def join(db: Dataset, query: Sequence[Tuple[str]], bindings=None) -> Sequence[dict]:
"""Helper for interpreting a bunch of tuples of strings as a join query, and returning simplified
results.
Executes the query clauses as a join, returning a sequence of tuples and binding mappings such
that the join constraints are simultaneously satisfied.
>>> db = read('''
... edge(a, b).
... edge(b, c).
... edge(c, d).
... ''')
>>> join(db, [
... ('edge', 'A', 'B'),
... ('edge', 'B', 'C')
... ])
[((('edge', 'a', 'b'),
('edge', 'b', 'c')),
{'A': 'a', 'B': 'b', 'C': 'c'}),
((('edge', 'b', 'c'),
('edge', 'c', 'd')),
{'A': 'b', 'B': 'c', 'C': 'd'})]
"""
return __mapv(__result, __join(db, [q(c) for c in query], bindings=bindings))

View file

@ -0,0 +1,231 @@
"""
A datalog engine.
"""
from functools import reduce
from itertools import chain
from datalog.parser import parse
from datalog.reader import pr_str, read
from datalog.types import (
CachedDataset,
Constant,
Dataset,
LVar,
Rule,
TableIndexedDataset,
)
def match(tuple, expr, bindings=None):
"""Attempt to construct lvar bindings from expr such that tuple and expr equate.
If the match is successful, return the binding map, otherwise return None.
"""
bindings = bindings.copy() if bindings is not None else {}
for a, b in zip(expr, tuple):
# Note the lvar - lvar case is deliberately ignored.
# This may not work out long term.
if isinstance(a, LVar) and isinstance(b, LVar):
continue
elif isinstance(a, LVar) and not a in bindings and isinstance(b, Constant):
bindings[a] = b
elif isinstance(a, LVar) and a in bindings and bindings[a] == b:
continue
elif isinstance(a, LVar) and a in bindings and bindings[a] != b:
return
elif a != b:
return
return bindings
def apply_bindings(expr, bindings, strict=True):
"""Given an expr which may contain lvars, substitute its lvars for constants returning the
simplified expr.
"""
if strict:
return tuple((bindings[e] if isinstance(e, LVar) else e) for e in expr)
else:
return tuple((bindings.get(e, e) if isinstance(e, LVar) else e) for e in expr)
def select(db: Dataset, expr, bindings=None, _recursion_guard=None, _select_guard=None):
"""Evaluate an expression in a database, lazily producing a sequence of 'matching' tuples.
The dataset is a set of tuples and rules, and the expression is a single tuple containing lvars
and constants. Evaluates both rules and stored tuples, lazily yielding `(tuples, bindings)` pairs for every way the expression can be satisfied.
"""
def __select_tuples():
# As an opt. support indexed scans, which is optional.
if isinstance(db, TableIndexedDataset):
iter = db.scan_index(expr)
else:
iter = db.tuples()
# For all hits in the scan, check for a match
# FIXME (arrdem 2019-06-01):
# Use the WALRUS OPERATOR
for t in iter:
# Lengths must tie off
if len(expr) != len(t):
continue
# The more expensive check - terms + bindings must tie off
_bindings = match(t, expr, bindings or {})
if _bindings is not None:
yield ((t,), _bindings)
def __inner_select_rules(r, cache_key, base_bindings):
for tuples, bindings in join(
db,
r.clauses,
base_bindings,
pattern=r.pattern,
_recursion_guard={r, *_recursion_guard},
):
# And some fancy footwork so we return bindings in terms of THIS expr not the pattern(s)
t = apply_bindings(r.pattern, bindings)
if isinstance(db, CachedDataset):
db.cache_tuple(cache_key, t)
yield t
def __select_rules():
# AND now for the messy bit, we have to do rule evaluation.
# HACK (arrdem 2019-06-18):
# As a heuristic, traverse all INACTIVE rules first.
# The intuition here is that if we hit a RECURSIVE rule first, we want to hit its base case.
_inactve_rules = []
_active_rules = []
for r in db.rules():
if r in _recursion_guard:
_active_rules.append(r)
else:
_inactve_rules.append(r)
# Now prioritizing the inactive rules, try to find matches.
for r in [*_inactve_rules, *_active_rules]:
# If the patterns could match,
if r.pattern[0] == expr[0] and len(r.pattern) == len(expr):
# Establish "base" bindings from expr constants to rule lvars
base_bindings = match(expr, r.pattern)
# Note that this could fail if there are mismatched constants, in which case break.
if base_bindings is None:
continue
cache_key = (
r,
apply_bindings(r.pattern, base_bindings, strict=False),
)
if isinstance(db, CachedDataset):
results = db.scan_cache(cache_key)
else:
results = None
if results is None:
results = __inner_select_rules(r, cache_key, base_bindings)
# FIXME (arrdem 2019-06-12):
# It's possible that we hit an index or cache precisely and don't need to test.
for t in results:
p_bindings = match(t, expr)
# It's possible that we bind a tuple, and then it doesn't match.
if p_bindings is not None and t not in _select_guard:
_select_guard.add(t)
yield (
(t,),
p_bindings,
)
if _recursion_guard is None:
_recursion_guard = set()
if _select_guard is None:
_select_guard = set()
if bindings is None:
bindings = {}
# Binary equality is built-in and somewhat magical.
if expr[0] == Constant("=") and len(expr) == 3:
e = apply_bindings(expr, bindings)
if e[1] == e[2]:
yield (expr, bindings)
# Matching tuples, with or without lvars present.
else:
yield from __select_tuples()
yield from __select_rules()
def join(db: Dataset, clauses, bindings, pattern=None, _recursion_guard=None):
"""Evaluate clauses over the dataset, joining (or antijoining) with the seed bindings.
Yields a sequence of tuples and LVar bindings for which all joins and antijoins were satisfied.
"""
def __join(g, clause):
for ts, bindings in g:
for _ts, _bindings in select(
db,
apply_bindings(clause, bindings, strict=False),
bindings=bindings,
_recursion_guard=_recursion_guard,
):
_ts = (
*ts,
*_ts,
)
_bindings = {**bindings, **_bindings}
yield _ts, _bindings
def __antijoin(g, clause):
clause = clause[1]
for ts, bindings in g:
if not any(
select(
db,
apply_bindings(clause, bindings, strict=False),
_recursion_guard=_recursion_guard,
)
):
yield ts, bindings
def _join(g, clause):
if clause[0] == "not":
return __antijoin(g, clause)
else:
return __join(g, clause)
def _eval(init, bindings):
yield from select(
db, init, bindings=bindings, _recursion_guard=_recursion_guard
)
# Get the "first" clause which is a positive join - as these can be selects
# and pull all antijoins so they can be sorted to the "end" as a proxy for dependency ordering
init = None
join_clauses = []
antijoin_clauses = []
for c in clauses:
if c[0] != "not" and not init:
init = c
elif c[0] == "not":
antijoin_clauses.append(c)
else:
join_clauses.append(c)
# The iterator is the chained application of _join over all the clauses, seeded with the init gen.
for ts, bindings in reduce(
_join, join_clauses + antijoin_clauses, _eval(init, bindings)
):
yield ts, bindings

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,172 @@
"""
A datalog reader.
"""
from collections import defaultdict
from itertools import chain
from datalog.parser import FAILURE, Grammar, ParseError
from datalog.types import Constant, Dataset, LVar, Rule
class Actions(object):
def __init__(self, db_cls=None):
self._db_cls = db_cls or Dataset
def make_dataset(self, input, start, end, elements):
# Group the various terms
rules = []
tuples = []
for e in elements:
if e:
if isinstance(e, Rule):
rules.append(e)
else:
tuples.append(e)
return self._db_cls(tuples, rules)
def make_symbol(self, input, start, end, elements):
return LVar("".join(e.text for e in elements),)
def make_word(self, input, start, end, elements):
return Constant("".join(e.text for e in elements),)
def make_string(self, input, start, end, elements):
return Constant(elements[1].text,)
def make_comment(self, input, start, end, elements):
return None
def make_ws(self, input, start, end, elements=None): # watf?
pass
def make_rule(self, input, start, end, elements):
if elements[1].elements:
return Rule(elements[0], elements[1].elements[3][1])
else:
return elements[0]
def make_clause(self, input, start, end, elements):
if elements[0].text == "~":
return ("not", (elements[1], *elements[3][1]))
else:
return (elements[1], *elements[3][1])
def make_terms(self, input, start, end, elements):
return self._make("terms", elements)
def make_clauses(self, input, start, end, elements):
return self._make("clauses", elements)
def _make(self, tag, elements):
if len(elements) == 1:
return (
tag,
[elements[0]],
)
elif elements[1].elements:
return (tag, [elements[0]] + elements[1].elements[2][1])
else:
return (tag, [elements[0]])
def make_command(self, input, start, end, elements):
op = elements[-1].text
val = self.make_rule(input, start, end, elements)
if op == ".":
val = self.make_dataset(input, start, end, [val])
return op, val
class Parser(Grammar):
"""Implementation detail.
A slightly hacked version of the Parser class canopy generates, which lets us control what the
parsing entry point is. This lets me play games with having one parser and one grammar which is
used both for the command shell and for other things.
"""
def __init__(self, input, actions, types):
self._input = input
self._input_size = len(input)
self._actions = actions
self._types = types
self._offset = 0
self._cache = defaultdict(dict)
self._failure = 0
self._expected = []
def parse(self, meth):
tree = meth()
if tree is not FAILURE and self._offset == self._input_size:
return tree
if not self._expected:
self._failure = self._offset
self._expected.append("<EOF>")
raise ParseError(format_error(self._input, self._failure, self._expected))
def format_error(input, offset, expected):
lines, line_no, position = input.split("\n"), 0, 0
while position <= offset:
position += len(lines[line_no]) + 1
line_no += 1
message, line = (
"Line " + str(line_no) + ": expected " + ", ".join(expected) + "\n",
lines[line_no - 1],
)
message += line + "\n"
position -= len(line) + 1
message += " " * (offset - position)
return message + "^"
def read_dataset(text: str, db_cls=None):
"""Read a string of text, returning a whole Datalog dataset."""
parser = Parser(text, Actions(db_cls=db_cls), None)
return parser.parse(parser._read_dataset)
def read_command(text: str, db_cls=None):
"""Read a string of text, returning a whole Datalog dataset."""
actions = Actions(db_cls=db_cls)
parser = Parser(text, actions, None)
return parser.parse(parser._read_command)
read = read_dataset
def pr_clause(e):
if len(e) == 2 and e[0] == "not":
return "~" + pr_str(e[1])
else:
return pr_str(e)
def pr_str(e):
if isinstance(e, str):
return e
elif isinstance(e, Rule):
return (
pr_str(e.pattern)
+ " :- "
+ ", ".join([pr_clause(c) for c in e.clauses])
+ "."
)
elif isinstance(e, Constant):
return repr(e.value)
elif isinstance(e, LVar):
return e.name
elif isinstance(e, list):
return "[{}]".format(", ".join(pr_str(_e) for _e in e))
elif isinstance(e, tuple):
return e[0].value + "(" + ", ".join(pr_str(_e) for _e in e[1:]) + ")"

View file

@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
The core IR types for datalog.
"""
from collections import namedtuple
from typing import Sequence, Tuple, Union
class Constant(namedtuple("Constant", ["value"])):
"""Representation of a constant for interpreter dispatching."""
class LVar(namedtuple("LVar", ["name"])):
"""Representation of an LVar for interpreter dispatching."""
class Rule(namedtuple("Rule", ["pattern", "clauses"])):
"""Representation of an Rule for the interpreter."""
    def __new__(cls, pattern, clauses):
        return super(Rule, cls).__new__(cls, pattern, tuple(clauses))
@property
def used_vars(self):
return {e for e in self.pattern if isinstance(e, LVar)}
@property
def bound_vars(self):
return {e for c in self.clauses for e in c if isinstance(e, LVar)}
@property
def free_vars(self):
return {v for v in self.used_vars if v not in self.bound_vars}
# Logical type for 'Tuple' as we're gonna use it for the rest of the thing.
LTuple = Tuple[Union[Constant, LVar]]
CTuple = Tuple[Constant]
class Dataset(object):
"""A set of tuples and rules which can be queried."""
def __init__(self, tuples: Sequence[CTuple], rules: Sequence[Rule]):
self.__tuples = tuples
self.__rules = rules
def tuples(self) -> Sequence[CTuple]:
for t in self.__tuples:
yield t
def rules(self) -> Sequence[Rule]:
for r in self.__rules:
yield r
def merge(self, other: "Dataset") -> "Dataset":
"""Merge two datasets together, returning a new one."""
return type(self)(
list({*self.tuples(), *other.tuples()}), [*self.rules(), *other.rules()]
)
class CachedDataset(Dataset):
"""An extension of the dataset which features a cache of rule produced tuples.
Note that this cache is lost when merging datasets - which ensures correctness.
"""
# Inherits tuples, rules, merge
def __init__(self, tuples, rules):
super(__class__, self).__init__(tuples, rules)
# The cache is a mapping from a Rule to tuples produced by it.
self.__cache = {}
def scan_cache(self, rule_tuple):
if rule_tuple in self.__cache:
return iter(self.__cache.get(rule_tuple))
def cache_tuple(self, rule_tuple, tuple: CTuple):
coll = self.__cache.get(rule_tuple, list())
self.__cache[rule_tuple] = coll
if tuple not in coll:
coll.append(tuple)
class TableIndexedDataset(CachedDataset):
"""An extension of the Dataset type which features both a cache and an index by table & length.
The index allows more efficient scans by maintaining 'table' style partitions.
It does not support user-defined indexing schemes.
Note that index building is delayed until an index is scanned.
"""
# From Dataset:
# tuples, rules, merge
# from CachedDataset:
# cache_tuple, scan_cache
@staticmethod
def __key(t: LTuple) -> str:
assert isinstance(t[0], Constant)
return f"{t[0].value}_{len(t)}"
def __init__(self, tuples, rules):
super(__class__, self).__init__(tuples, rules)
self.__index = {}
def __build_indices(self):
if not self.__index:
for t in self.tuples():
key = self.__key(t)
# FIXME: Walrus operator???
coll = self.__index[key] = self.__index.get(key, list())
coll.append(t)
def scan_index(self, t: LTuple) -> Sequence[CTuple]:
self.__build_indices()
for t in self.__index.get(self.__key(t), []):
yield t
class PartlyIndexedDataset(TableIndexedDataset):
"""An extension of the Dataset type which features both a cache and and a full index by table,
length, tuple index and value.
The index allows extremely efficient scans when elements of the tuple are known.
"""
# From Dataset:
# tuples, rules, merge
# from CachedDataset:
# cache_tuple, scan_cache
# from IndexedDataset / TableIndexedDataset:
# scan_index
@staticmethod
def __key(t: LTuple, i: int) -> str:
assert isinstance(t[0], Constant)
return (f"{t[0].value}_{len(t)}_{i}", t[i])
def __init__(self, tuples, rules, index_prefix=999):
super(__class__, self).__init__(tuples, rules)
self.__index_prefix = index_prefix
self.__index = {}
def __build_indices(self):
if not self.__index:
self.__index = index = {}
# Index by single value
for t in self.tuples():
for e, i in zip(t, range(self.__index_prefix)):
key = self.__key(t, i)
# FIXME: Walrus operator???
coll = index[key] = index.get(key, list())
coll.append(t)
def scan_index(self, t: LTuple) -> Sequence[CTuple]:
self.__build_indices()
default_key = self.__key(t, 0)
column_indices = []
for e, i in zip(t, range(self.__index_prefix)):
if isinstance(e, Constant):
_k = self.__key(t, i)
v = self.__index.get(_k)
if v:
column_indices.append((_k, v))
else:
# If there's no such index, then there's no such tuple. Abort.
return iter([])
if column_indices:
            column_indices.sort(key=lambda x: len(x[1]))
key, l = column_indices[-1]
else:
key = default_key
l = self.__index[key] = self.__index.get(key, list())
return iter(l)

View file

@ -0,0 +1,227 @@
"""Query evaluation unit tests."""
from datalog.easy import read, select
from datalog.types import (
CachedDataset,
Constant,
Dataset,
LVar,
PartlyIndexedDataset,
Rule,
TableIndexedDataset,
)
import pytest
DBCLS = [Dataset, CachedDataset, TableIndexedDataset, PartlyIndexedDataset]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_id_query(db_cls):
"""Querying for a constant in the dataset."""
ab = (
Constant("a"),
Constant("b"),
)
assert not select(db_cls([], []), ("a", "b",))
assert select(db_cls([ab], []), ("a", "b",)) == [((("a", "b"),), {},)]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_lvar_query(db_cls):
"""Querying for a binding in the dataset."""
d = read("""a(b). a(c).""", db_cls=db_cls)
assert select(d, ("a", "X")) == [
((("a", "b"),), {"X": "b"}),
((("a", "c"),), {"X": "c"}),
]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_lvar_unification(db_cls):
"""Querying for MATCHING bindings in the dataset."""
d = read("""edge(b, c). edge(c, c).""", db_cls=db_cls)
assert select(d, ("edge", "X", "X",)) == [((("edge", "c", "c"),), {"X": "c"})]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_rule_join(db_cls):
"""Test a basic join query - the parent -> grandparent relation."""
child = Constant("child")
gc = Constant("grandchild")
d = read(
"""
child(a, b).
child(b, c).
child(b, d).
child(b, e).
grandchild(A, B) :-
child(A, C),
child(C, B).
""",
db_cls=db_cls,
)
assert select(d, ("grandchild", "a", "X",)) == [
((("grandchild", "a", "c"),), {"X": "c"}),
((("grandchild", "a", "d"),), {"X": "d"}),
((("grandchild", "a", "e"),), {"X": "e"}),
]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_antijoin(db_cls):
"""Test a query containing an antijoin."""
d = read(
"""
a(foo, bar).
b(foo, bar).
a(baz, qux).
% matching b(baz, qux). is our antijoin test
no-b(X, Y) :-
a(X, Y),
~b(X, Z).
""",
db_cls=db_cls,
)
assert select(d, ("no-b", "X", "Y")) == [
((("no-b", "baz", "qux"),), {"X": "baz", "Y": "qux"})
]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_nested_antijoin(db_cls):
"""Test a query which negates a subquery which uses an antijoin.
Shouldn't exercise anything more than `test_antijoin` does, but it's an interesting case since you
actually can't capture the same semantics using a single query. Antijoins can't propagate positive
information (create lvar bindings) so I'm not sure you can express this another way without a
different evaluation strategy.
"""
d = read(
"""
a(foo, bar).
b(foo, bar).
a(baz, qux).
b(baz, quack).
b-not-quack(X, Y) :-
b(X, Y),
~=(Y, quack).
a-no-nonquack(X, Y) :-
a(X, Y),
~b-not-quack(X, Y).
""",
db_cls=db_cls,
)
assert select(d, ("a-no-nonquack", "X", "Y")) == [
((("a-no-nonquack", "baz", "qux"),), {"X": "baz", "Y": "qux"})
]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_alternate_rule(db_cls):
"""Testing that both recursion and alternation work."""
d = read(
"""
edge(a, b).
edge(b, c).
edge(c, d).
edge(d, e).
edge(e, f).
path(A, B) :-
edge(A, B).
path(A, B) :-
edge(A, C),
path(C, B).
""",
db_cls=db_cls,
)
# Should be able to recurse to this one.
assert select(d, ("path", "a", "f")) == [((("path", "a", "f"),), {})]
# FIXME (arrdem 2019-06-13):
#
# This test is BROKEN for the simple dataset. In order for left-recursive production rules to
# work, they have to ground out somewhere. Under the naive, cache-less datalog this is an
# infinite left recursion. Under the cached versions, the right-recursion becomes iteration over
# an incrementally realized list which ... is weird but does work because the recursion grounds
# out in iterating over an empty list on the 2nd round then falls through to the other production
# rule which generates ground tuples and feeds everything.
#
# It's not clear how to make this work with the current (lack of) query planner on the naive db as
# really fixing this requires some amount of insight into the data dependency structure and may
# involve reordering rules.
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_alternate_rule_lrec(db_cls):
"""Testing that both recursion and alternation work."""
d = read(
"""
edge(a, b).
edge(b, c).
edge(c, d).
edge(d, e).
edge(e, f).
path(A, B) :-
edge(A, B).
path(A, B) :-
path(A, C),
edge(C, B).
""",
db_cls=db_cls,
)
# Should be able to recurse to this one.
assert select(d, ("path", "a", "f")) == [((("path", "a", "f"),), {})]
@pytest.mark.parametrize("db_cls,", DBCLS)
def test_cojoin(db_cls):
"""Tests that unification occurs correctly."""
d = read(
"""
edge(a, b).
edge(b, c).
edge(c, d).
edge(d, e).
edge(e, f).
edge(c, q).
two_path(A, B, C) :- edge(A, B), edge(B, C).
""",
db_cls=db_cls,
)
# Should be able to recurse to this one.
assert [t for t, _ in select(d, ("two_path", "A", "B", "C"))] == [
(("two_path", "a", "b", "c"),),
(("two_path", "b", "c", "d"),),
(("two_path", "b", "c", "q"),),
(("two_path", "c", "d", "e"),),
(("two_path", "d", "e", "f"),),
]

View file

@ -0,0 +1,31 @@
"""
Reader tests.
"""
import pytest
from datalog.reader import read
EXS = [
"%foo\n",
"""a(b).""",
"""edge(a).""",
"""a(b, c).""",
"""edge(smfc-ait, smfc).""",
"""edge("smfc-ait", smfc).""",
"""path(A, B) :- edge(A, C), path(C, B).""",
"""path(A, B) :-
edge(A, C),
path(C, B).""",
"""path(A, B) :- % one comment
edge(A, C), % the next
path(C, B).""",
"""foo(A, B) :-
~bar(A, B),
qux(A, B).""",
]
@pytest.mark.parametrize("ex,", EXS)
def test_reader(ex):
assert read(ex)