From 0246be6a14d329fab18b770ce6b84796ee8114e7 Mon Sep 17 00:00:00 2001 From: Reid 'arrdem' McKenzie Date: Fri, 14 May 2021 22:41:00 -0600 Subject: [PATCH] Considerable headway here --- projects/yamlschema/BUILD | 19 +- projects/yamlschema/test_yamlschema.py | 19 ++ projects/yamlschema/yamlschema.py | 349 +++++++++++++++++++------ 3 files changed, 300 insertions(+), 87 deletions(-) create mode 100644 projects/yamlschema/test_yamlschema.py diff --git a/projects/yamlschema/BUILD b/projects/yamlschema/BUILD index 03c512d..5ad0b9a 100644 --- a/projects/yamlschema/BUILD +++ b/projects/yamlschema/BUILD @@ -1,12 +1,21 @@ py_library( - name = "yamlschema", - srcs = [ + name="yamlschema", + srcs=[ "yamlschema.py", ], - imports = [ + imports=[ ".", ], - deps = [ + deps=[ py_requirement("PyYAML"), - ] + ], +) + +py_pytest( + name="test_yamlschema", + srcs=glob(["test_*.py"]), + data=glob(["*.json", "*.yaml"]), + deps=[ + ":yamlschema", + ], ) diff --git a/projects/yamlschema/test_yamlschema.py b/projects/yamlschema/test_yamlschema.py new file mode 100644 index 0000000..c39b545 --- /dev/null +++ b/projects/yamlschema/test_yamlschema.py @@ -0,0 +1,19 @@ +""" +Tests covering the YAML linter. +""" + +from yamlschema import lint_buffer + +import pytest + + +@pytest.mark.parametrize('schema, obj', [ + ({"type": "number"}, "---\n1.0"), + ({"type": "integer"}, "---\n3"), + ({"type": "string"}, "---\nfoo bar baz"), + ({"type": "string", "maxLength": 15}, "---\nfoo bar baz"), + ({"type": "string", "minLength": 10}, "---\nfoo bar baz"), + ({"type": "string", "pattern": "^foo.*"}, "---\nfoo bar baz"), +]) +def test_lint_document_ok(schema, obj): + assert not list(lint_buffer(schema, obj)) diff --git a/projects/yamlschema/yamlschema.py b/projects/yamlschema/yamlschema.py index ef83795..41b1f1a 100644 --- a/projects/yamlschema/yamlschema.py +++ b/projects/yamlschema/yamlschema.py @@ -4,108 +4,293 @@ JSONSchema linting for YAML documents. 
import logging
import re
import typing as t
from enum import Enum
from io import StringIO

import yaml
from yaml.nodes import MappingNode, Node, ScalarNode, SequenceNode


log = logging.getLogger(__name__)

# Tags PyYAML's resolver assigns to plain scalars; used to classify ScalarNodes.
_TAG_STR = "tag:yaml.org,2002:str"
_TAG_INT = "tag:yaml.org,2002:int"
_TAG_FLOAT = "tag:yaml.org,2002:float"


def _where(node: Node) -> str:
    """Render a node's start mark the way every lint message embeds it."""

    return str(node.start_mark).lstrip()


def _numeric_bound(schema: dict, key: str):
    """Return the numeric bound stored under ``key``, or None if absent.

    Booleans are ignored: older JSON Schema drafts use boolean
    ``exclusiveMaximum`` / ``exclusiveMinimum`` flags, which are not bounds.
    """

    bound = schema.get(key)
    if bound is None or isinstance(bound, bool):
        return None
    return bound


class LintLevel(Enum):
    """Lint outcome levels."""

    MISSING = 1
    # NOTE: the misspelling is part of the public API; kept for compatibility.
    MISSMATCH = 2
    UNEXPECTED = 3


class LintRecord(t.NamedTuple):
    """A linting record.

    Bundles the severity/kind of a finding with the YAML node it was found
    on, the (sub)schema that was being applied, and a human readable message.
    """

    level: LintLevel
    node: Node
    schema: object
    message: str

    @property
    def start_mark(self):
        """The start mark of the offending node."""

        return self.node.start_mark


class YamlLinter(object):
    """YAML linting against JSON schemas.

    The linter walks a composed YAML node graph (not loaded Python objects)
    so that every finding can carry the source position of the node.
    """

    def __init__(self, schema):
        self._schema = schema

    def _resolve_pointer(self, ref: str):
        """Resolve a document-local ``#/a/b`` reference against the root schema.

        Raises ValueError if the reference is not document-local or if any
        path segment is missing.
        """

        if ref == "#":
            return self._schema

        if not isinstance(ref, str) or not ref.startswith("#/"):
            raise ValueError(f"Unable to dereference {ref}")

        target = self._schema
        # Slice the prefix off; str.lstrip("#/") would strip *characters*.
        for segment in ref[2:].split("/"):
            # Undo JSON Pointer escaping ("~1" is "/", "~0" is "~").
            segment = segment.replace("~1", "/").replace("~0", "~")
            if not isinstance(target, dict) or segment not in target:
                raise ValueError(f"Unable to dereference {ref}")
            target = target[segment]

        return target

    def dereference(self, schema):
        """Dereference a {"$ref": ""} form.

        Non-dict schemas (e.g. the boolean schemas ``True`` / ``False``) and
        schemas without a ``$ref`` are returned unchanged. Chains of
        references are followed; a cycle raises ValueError.
        """

        seen = set()
        while isinstance(schema, dict) and (ref := schema.get("$ref")):
            if ref in seen:
                raise ValueError(f"Unable to dereference {ref}")
            seen.add(ref)
            schema = self._resolve_pointer(ref)

        return schema

    def lint_mapping(self, schema, node: Node) -> t.Iterable[LintRecord]:
        """Lint a mapping node against an ``object`` schema.

        Checks ``required``, recurses into ``properties`` and applies
        ``additionalProperties`` to every other key.
        """

        expected = schema.get("type")
        if not isinstance(node, MappingNode) or expected not in (None, "object"):
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected {expected or 'object'}, got {node.id} {_where(node)}",
            )
            # The rest of the checks assume a mapping under an object schema.
            return

        additional: t.Union[dict, bool] = schema.get("additionalProperties", True)
        properties: dict = schema.get("properties", {})
        required: t.Iterable[str] = schema.get("required", [])

        # Only scalar keys can be named by a schema (and are hashable).
        present = {k.value for k, _v in node.value if isinstance(k, ScalarNode)}

        for name in required:
            if name not in present:
                yield LintRecord(
                    LintLevel.MISSING,
                    node,
                    schema,
                    f"Required key {name!r} absent from mapping {_where(node)}",
                )

        for k, v in node.value:
            if isinstance(k, ScalarNode) and k.value in properties:
                yield from self.lint_document(v, properties[k.value])

            elif additional is False:
                # Compare against False explicitly: {} is falsy but means
                # "anything goes".
                yield LintRecord(
                    LintLevel.UNEXPECTED,
                    node,
                    schema,
                    f"Key {k.value!r} is not allowed by schema {_where(node)}",
                )

            else:
                yield from self.lint_document(v, additional)

    def lint_sequence(self, schema, node: Node) -> t.Iterable[LintRecord]:
        """Lint a sequence node against an ``array`` schema.

        Only the single-schema form of ``items`` is applied to the elements.
        """

        expected = schema.get("type")
        if not isinstance(node, SequenceNode) or expected not in (None, "array"):
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected {expected or 'array'}, got {node.id} {_where(node)}",
            )
            return

        subschema = schema.get("items")
        if subschema is None:
            return

        for item in node.value:
            yield from self.lint_document(item, subschema)

    def lint_scalar(self, schema, node: Node) -> t.Iterable[LintRecord]:
        """Lint a scalar node, dispatching on the schema's ``type``.

        Raises NotImplementedError for scalar types other than ``string``,
        ``integer`` and ``number``.
        """

        kind = schema.get("type")
        if kind is None:
            # No type constraint; nothing this linter can check on a scalar.
            return

        if kind == "string":
            yield from self.lint_string(schema, node)
        elif kind == "integer":
            yield from self.lint_integer(schema, node)
        elif kind == "number":
            yield from self.lint_number(schema, node)
        elif kind in ("object", "array"):
            # A scalar where a container was required is a finding, not a crash.
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected {kind}, got {node.id} {_where(node)}",
            )
        else:
            raise NotImplementedError(
                f"Scalar type {schema['type']} is not supported"
            )

    def lint_string(self, schema, node: Node) -> t.Iterable[LintRecord]:
        """Lint a scalar against a ``string`` schema.

        Applies ``maxLength``, ``minLength`` and ``pattern``.
        """

        if node.tag != _TAG_STR:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a string, got a {node}"
            )
            return

        # `is not None` rather than truthiness so that a bound of 0 is honored.
        maxl = schema.get("maxLength")
        if maxl is not None and len(node.value) > maxl:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                "Expected a shorter string"
            )

        minl = schema.get("minLength")
        if minl is not None and len(node.value) < minl:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                "Expected a longer string"
            )

        pat = schema.get("pattern")
        # JSON Schema patterns are not implicitly anchored, hence search().
        if pat is not None and not re.search(pat, node.value):
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                "Expected a string matching the pattern"
            )

    def _lint_parsed_number(self, schema, node: Node, parse) -> t.Iterable[LintRecord]:
        """Parse the scalar's text with ``parse`` and range-check the result."""

        try:
            value = parse(node.value)
        except ValueError:
            # NOTE(review): YAML spellings Python cannot parse directly (e.g.
            # hex ints, ".inf") have the right tag, but are not range-checked.
            log.debug("Unable to parse %r as a number, skipping range checks", node.value)
            return

        yield from self._lint_num_range(schema, node, value)

    def lint_integer(self, schema, node: Node) -> t.Iterable[LintRecord]:
        """Lint a scalar against an ``integer`` schema."""

        if node.tag == _TAG_INT:
            yield from self._lint_parsed_number(schema, node, int)

        else:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected an integer, got a {node}"
            )

    def lint_number(self, schema, node: Node) -> t.Iterable[LintRecord]:
        """Lint a scalar against a ``number`` schema.

        Both integer and float scalars are numbers.
        """

        if node.tag == _TAG_INT:
            yield from self._lint_parsed_number(schema, node, int)

        elif node.tag == _TAG_FLOAT:
            yield from self._lint_parsed_number(schema, node, float)

        else:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a number, got a {node}"
            )

    def _lint_num_range(self, schema, node: Node, value) -> t.Iterable[LintRecord]:
        """Apply ``multipleOf`` and the (exclusive) min/max bounds to ``value``."""

        # Truthiness is intended here: a multipleOf of 0 is meaningless and
        # would divide by zero.
        base = schema.get("multipleOf")
        if base and value % base != 0:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a multiple of {base}, got {value}"
            )

        upper = _numeric_bound(schema, "exclusiveMaximum")
        if upper is not None and value >= upper:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a value less than {upper}, got {value}"
            )

        upper = _numeric_bound(schema, "maximum")
        if upper is not None and value > upper:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a value less than or equal to {upper}, got {value}"
            )

        lower = _numeric_bound(schema, "exclusiveMinimum")
        if lower is not None and value <= lower:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a value greater than {lower}, got {value}"
            )

        lower = _numeric_bound(schema, "minimum")
        if lower is not None and value < lower:
            yield LintRecord(
                LintLevel.MISSMATCH,
                node,
                schema,
                f"Expected a value greater than or equal to {lower}, got {value}"
            )

    def lint_document(self, node, schema=None) -> t.Iterable[LintRecord]:
        """Lint a document.

        Given a Node within a document (or the root of a document!), yield a
        (possibly empty!) stream of lint records, or raise in case of fatal
        errors. When ``schema`` is None the linter's root schema is used.

        """

        # Only None means "use the root schema"; {} and False are real schemas.
        if schema is None:
            schema = self._schema
        schema = self.dereference(schema)

        if not isinstance(node, Node):
            # E.g. an empty document composes to None; nothing to lint.
            return

        if schema is True or schema == {}:
            # The "any" schema accepts everything.
            return

        if schema is False:
            yield LintRecord(
                LintLevel.UNEXPECTED,
                node,
                schema,
                f"No value is allowed by schema {_where(node)}",
            )
            return

        if not isinstance(schema, dict):
            log.warning("Ignoring unsupported schema form %r", schema)
            return

        if isinstance(node, MappingNode):
            yield from self.lint_mapping(schema, node)
        elif isinstance(node, SequenceNode):
            yield from self.lint_sequence(schema, node)
        elif isinstance(node, ScalarNode):
            yield from self.lint_scalar(schema, node)


def lint_node(schema, node, cls=YamlLinter):
    """Lint a document using a schema and linter."""

    log.debug("Linting node %r", node)
    linter = cls(schema)
    yield from linter.lint_document(node)


def lint_buffer(schema, buff: str, cls=YamlLinter):
    """Lint a buffer (string)."""

    with StringIO(buff) as f:
        node = yaml.compose(f)
        yield from lint_node(schema, node, cls=cls)


def lint_file(schema, path, cls=YamlLinter):
    """Lint a file."""

    # Binary mode lets PyYAML detect the stream encoding itself rather than
    # relying on the platform's default text encoding.
    with open(path, "rb") as f:
        node = yaml.compose(f)
        yield from lint_node(schema, node, cls=cls)