Considerable headway here

This commit is contained in:
Reid 'arrdem' McKenzie 2021-05-14 22:41:00 -06:00
parent 2605baa215
commit 0246be6a14
3 changed files with 300 additions and 87 deletions

View file

@ -1,12 +1,21 @@
py_library( py_library(
name = "yamlschema", name="yamlschema",
srcs = [ srcs=[
"yamlschema.py", "yamlschema.py",
], ],
imports = [ imports=[
".", ".",
], ],
deps = [ deps=[
py_requirement("PyYAML"), py_requirement("PyYAML"),
] ],
)
py_pytest(
name="test_yamlschema",
srcs=glob(["test_*.py"]),
data=glob(["*.json", "*.yaml"]),
deps=[
":yamlschema",
],
) )

View file

@ -0,0 +1,19 @@
"""
Tests covering the YAML linter.
"""
from yamlschema import lint_buffer
import pytest
# Schema/document pairs that are expected to produce no lint records at all.
# Each entry exercises one scalar check: the bare type, then the string
# length bounds and the string pattern.
_CLEAN_CASES = [
    ({"type": "number"}, "---\n1.0"),
    ({"type": "integer"}, "---\n3"),
    ({"type": "string"}, "---\nfoo bar baz"),
    ({"type": "string", "maxLength": 15}, "---\nfoo bar baz"),
    ({"type": "string", "minLength": 10}, "---\nfoo bar baz"),
    ({"type": "string", "pattern": "^foo.*"}, "---\nfoo bar baz"),
]


@pytest.mark.parametrize('schema, obj', _CLEAN_CASES)
def test_lint_document_ok(schema, obj):
    """A document that satisfies its schema yields no lint records."""
    findings = list(lint_buffer(schema, obj))
    assert not findings

View file

@ -4,94 +4,252 @@ JSONSchema linting for YAML documents.
import logging import logging
import typing as t import typing as t
from enum import Enum
from io import StringIO
import re
import yaml
from yaml.nodes import MappingNode, Node, ScalarNode, SequenceNode from yaml.nodes import MappingNode, Node, ScalarNode, SequenceNode
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def lint_mapping(schema, node: Node) -> t.List[str]: class LintLevel(Enum):
lint: t.List[str] = [] """Lint outcome levels."""
MISSING = 1
MISSMATCH = 2
UNEXPECTED = 3
class LintRecord(t.NamedTuple):
    """One finding produced while linting a YAML node against a schema."""

    # Category of the finding.
    level: LintLevel
    # The YAML node the finding is attached to.
    node: Node
    # The (sub)schema that was being applied when the finding was produced.
    schema: object
    # Human-readable description of the finding.
    message: str

    @property
    def start_mark(self):
        """Return the ``start_mark`` of the node this record refers to."""
        offending = self.node
        return offending.start_mark
class YamlLinter(object):
"""YAML linting against JSON schemas."""
def __init__(self, schema):
self._schema = schema
def dereference(self, schema):
"""Dereference a {"$ref": ""} form."""
if ref := schema.get("$ref"):
assert ref.startswith("#/")
path = ref.lstrip("#/").split("/")
schema = self._schema
for e in path:
if not (schema := path.get(e)):
raise ValueError(f"Unable to dereference {ref}")
return schema
def lint_mapping(self, schema, node: Node) -> t.Iterable[str]:
"""FIXME.
"""
if schema["type"] != "object" or not isinstance(node, MappingNode): if schema["type"] != "object" or not isinstance(node, MappingNode):
raise TypeError( yield LintRecord(
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}" LintLevel.MISSMATCH,
node,
schema,
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}",
) )
additional_allowed: bool = schema.get("additionalProperties", False) != False
additional_type: t.Union[dict, bool] = ( additional_type: t.Union[dict, bool] = (
schema.get("additionalProperties") if additional_allowed schema.get("additionalProperties", True)
else {}
) )
properties: dict = schema.get("properties", {}) properties: dict = schema.get("properties", {})
required: t.List[str] = schema.get("required", []) required: t.Iterable[str] = schema.get("required", [])
for k in required: for k in required:
if k not in [_k.value for _k, _v in node.value]: if k not in [_k.value for _k, _v in node.value]:
raise TypeError( yield LintRecord(
f"Required key {k!r} absent from mapping {str(node.start_mark).lstrip()}" LintLevel.MISSING,
node,
schema,
f"Required key {k!r} absent from mapping {str(node.start_mark).lstrip()}",
) )
for k, v in node.value: for k, v in node.value:
if k.value in properties: if k.value in properties:
lint.extend(lint_document(properties.get(k.value), v)) yield from self.lint_document(properties.get(k.value), v)
elif additional_type:
yield from self.lint_document(additional_type, v)
elif additional_allowed:
# 'true' is a way to encode the any type.
if additional_type == True:
pass
else: else:
lint.extend(lint_document(additional_type, v)) yield LintRecord(
else: LintLevel.UNEXPECTED,
lint.append( node,
f"Key {k.value!r} is not allowed by schema {str(node.start_mark).lstrip()}" schema,
f"Key {k.value!r} is not allowed by schema {str(node.start_mark).lstrip()}",
) )
return lint def lint_sequence(self, schema, node: Node) -> t.Iterable[str]:
"""FIXME.
def lint_sequence(schema, node: Node) -> t.List[str]:
""""FIXME.
There aren't sequences we need to lint in the current schema design, punting. There aren't sequences we need to lint in the current schema design, punting.
""" """
if schema["type"] != "array" or not isinstance(node, SequenceNode): if schema["type"] != "array" or not isinstance(node, SequenceNode):
raise TypeError( yield LintRecord(
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}" LintLevel.MISSMATCH,
node,
schema,
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}",
) )
lint = []
subschema = schema.get("items") subschema = schema.get("items")
if subschema: if subschema:
for item in node.value: for item in node.value:
lint.extend(lint_document(subschema, item)) yield from self.lint_document(subschema, item)
return lint
def lint_scalar(self, schema, node: Node) -> t.Iterable[str]:
def lint_scalar(schema, node: Node) -> t.List[str]:
"""FIXME. """FIXME.
The only terminal we care about linting in the current schema is {"type": "string"}. The only terminal we care about linting in the current schema is {"type": "string"}.
""" """
if schema["type"] not in ["string", "number"] or not isinstance(node, ScalarNode):
raise TypeError( if schema["type"] == "string":
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}" yield from self.lint_string(schema, node)
elif schema["type"] == "integer":
yield from self.lint_integer(schema, node)
elif schema["type"] == "number":
yield from self.lint_number(schema, node)
else:
raise NotImplementedError(
f"Scalar type {schema['type']} is not supported"
) )
lint = [] def lint_string(self, schema, node: Node) -> t.Iterable[str]:
if schema["type"] == "string": """FIXME."""
if not isinstance(node.value, str):
lint.append(f"Expected string, got {node.id} {str(node.start_mark).lstrip()}") if node.tag != "tag:yaml.org,2002:str":
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a string, got a {node}"
)
if maxl := schema.get("maxLength"):
if len(node.value) > maxl:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a shorter string"
)
if minl := schema.get("minLength"):
if len(node.value) < minl:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a longer string"
)
if pat := schema.get("pattern"):
if not re.fullmatch(pat, node.value):
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a string matching the pattern"
)
def lint_integer(self, schema, node: Node) -> t.Iterable[str]:
    """Lint a scalar node against an ``integer`` schema.

    A node whose tag is not the YAML int tag produces a single MISSMATCH
    record. Otherwise the node's text is converted with ``int()`` and the
    result is handed to ``_lint_num_range``.
    """
    if node.tag != "tag:yaml.org,2002:int":
        yield LintRecord(
            LintLevel.MISSMATCH,
            node,
            schema,
            f"Expected an integer, got a {node}"
        )
        return

    # NOTE(review): int() parses base-10 text only; confirm whether other
    # YAML integer spellings (e.g. hex) can reach this point.
    yield from self._lint_num_range(schema, node, int(node.value))
return lint def lint_number(self, schema, node: Node) -> t.Iterable[str]:
if node.tag == "tag:yaml.org,2002:float":
value = float(node.value)
yield from self._lint_num_range(schema, node, value)
else:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected an integer, got a {node}"
)
def lint_document(schema, node): def _lint_num_range(self, schema, node: Node, value) -> t.Iterable[str]:
""""FIXME."""
if base := schema.get("multipleOf"):
if value % base != 0:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a multiple of {base}, got {value}"
)
if max := schema.get("exclusiveMaximum"):
if value >= max:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a value less than {max}, got {value}"
)
if max := schema.get("maximum"):
if value > max:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a value less than or equal to {max}, got {value}"
)
if min := schema.get("exclusiveMinimum"):
if value <= min:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a value greater than {min}, got {value}"
)
if min := schema.get("minimum"):
if value < min:
yield LintRecord(
LintLevel.MISSMATCH,
node,
schema,
f"Expected a value greater than or equal to {min}, got {value}"
)
def lint_document(self, node, schema=None) -> t.Iterable[str]:
"""Lint a document. """Lint a document.
Given a Node within a document (or the root of a document!), return a Given a Node within a document (or the root of a document!), return a
@ -99,13 +257,40 @@ def lint_document(schema, node):
""" """
schema = schema or self._schema # Fixing up the schema source
schema = self.dereference(schema) # And dereference it if needed
if schema == True or schema == {}: if schema == True or schema == {}:
return [] yield from []
elif isinstance(node, MappingNode): elif isinstance(node, MappingNode):
return lint_mapping(schema, node) yield from self.lint_mapping(schema, node)
elif isinstance(node, SequenceNode): elif isinstance(node, SequenceNode):
return lint_sequence(schema, node) yield from self.lint_sequence(schema, node)
elif isinstance(node, ScalarNode): elif isinstance(node, ScalarNode):
return lint_scalar(schema, node) yield from self.lint_scalar(schema, node)
else: else:
return [] yield from []
def lint_node(schema, node, cls=YamlLinter):
    """Lint an already-composed YAML node against a schema.

    :param schema: The root schema, passed to ``cls`` on construction.
    :param node: The node to lint.
    :param cls: The linter class to instantiate; it is called as
        ``cls(schema)`` and must provide ``lint_document(node)``.
    :returns: A generator over whatever ``lint_document`` yields.
    """
    # Report through the module logger rather than printing: library code
    # should not write to the caller's stdout.
    log.debug("Linting node %r", node)
    linter = cls(schema)
    yield from linter.lint_document(node)
def lint_buffer(schema, buff: str, cls=YamlLinter):
    """Lint YAML source held in a string.

    The text is wrapped in a ``StringIO``, composed into a node with
    ``yaml.compose``, and the resulting node is passed to ``lint_node``.
    """
    stream = StringIO(buff)
    try:
        root = yaml.compose(stream)
    finally:
        stream.close()
    yield from lint_node(schema, root, cls=cls)
def lint_file(schema, path, cls=YamlLinter):
    """Lint the YAML document stored at ``path``.

    :param schema: The root schema to lint against.
    :param path: Path of the file to read.
    :param cls: The linter class, forwarded to ``lint_node``.
    :returns: A generator over the records produced by ``lint_node``.
    """
    # Open in binary mode so the YAML reader does its own encoding
    # detection instead of depending on the platform's locale encoding.
    with open(path, "rb") as f:
        node = yaml.compose(f)
    yield from lint_node(schema, node, cls=cls)