From 6f8a1bf831e32f10f9304dacc7ecba2958d955d7 Mon Sep 17 00:00:00 2001
From: Reid McKenzie
Date: Fri, 14 May 2021 11:55:28 -0600
Subject: [PATCH] yamlschema

---
Review note: yamlschema.py was edited in review, so the blob ids on the
"index" lines below are informational only and predate those edits.

 projects/yamlschema/BUILD         |  12 +++
 projects/yamlschema/README.md     |   3 +
 projects/yamlschema/yamlschema.py | 148 ++++++++++++++++++++++++++++++
 3 files changed, 163 insertions(+)
 create mode 100644 projects/yamlschema/BUILD
 create mode 100644 projects/yamlschema/README.md
 create mode 100644 projects/yamlschema/yamlschema.py

diff --git a/projects/yamlschema/BUILD b/projects/yamlschema/BUILD
new file mode 100644
index 0000000..03c512d
--- /dev/null
+++ b/projects/yamlschema/BUILD
@@ -0,0 +1,12 @@
+py_library(
+    name = "yamlschema",
+    srcs = [
+        "yamlschema.py",
+    ],
+    imports = [
+        ".",
+    ],
+    deps = [
+        py_requirement("PyYAML"),
+    ]
+)
diff --git a/projects/yamlschema/README.md b/projects/yamlschema/README.md
new file mode 100644
index 0000000..22cf4b2
--- /dev/null
+++ b/projects/yamlschema/README.md
@@ -0,0 +1,3 @@
+# YAML Schema
+
+A pocket library that implements some amount of jsonschema validation against YAML documents.
diff --git a/projects/yamlschema/yamlschema.py b/projects/yamlschema/yamlschema.py
new file mode 100644
index 0000000..ef83795
--- /dev/null
+++ b/projects/yamlschema/yamlschema.py
@@ -0,0 +1,148 @@
+"""
+JSONSchema linting for YAML documents.
+
+Only a subset of JSONSchema is understood: ``object`` schemas (``properties``,
+``required``, ``additionalProperties``), ``array`` schemas (``items``) and
+``string`` / ``number`` scalars.
+"""
+
+import logging
+import typing as t
+
+from yaml.nodes import MappingNode, Node, ScalarNode, SequenceNode
+
+
+log = logging.getLogger(__name__)
+
+# Tag PyYAML's resolver gives to scalars that resolve to plain strings.
+_STR_TAG = "tag:yaml.org,2002:str"
+
+
+def _where(node: Node) -> str:
+    """Render the node's start position for use in messages."""
+    return str(node.start_mark).lstrip()
+
+
+def lint_mapping(schema, node: Node) -> t.List[str]:
+    """Lint a mapping node against an ``object`` schema.
+
+    Keys named in ``properties`` are linted against their subschema. Any other
+    key is linted against ``additionalProperties`` when that is a schema,
+    accepted as-is when it is ``True``, and reported as lint when it is
+    ``False`` or absent. The absent case is stricter than standard JSONSchema,
+    which allows additional properties by default.
+
+    Raises:
+        TypeError: if the schema is not an ``object`` schema, the node is not
+            a mapping, or a ``required`` key is missing.
+        KeyError: if the schema has no ``type``.
+    """
+    if schema["type"] != "object" or not isinstance(node, MappingNode):
+        raise TypeError(
+            f"Expected {schema['type']}, got {node.id} {_where(node)}"
+        )
+
+    additional: t.Union[dict, bool] = schema.get("additionalProperties", False)
+    properties: dict = schema.get("properties", {})
+    required: t.List[str] = schema.get("required", [])
+
+    # Collect the keys once instead of once per required key. This stays a
+    # list because complex keys carry unhashable values.
+    present = [key.value for key, _ in node.value]
+    for k in required:
+        if k not in present:
+            raise TypeError(
+                f"Required key {k!r} absent from mapping {_where(node)}"
+            )
+
+    lint: t.List[str] = []
+    for k, v in node.value:
+        if k.value in properties:
+            lint.extend(lint_document(properties[k.value], v))
+        elif additional is False:
+            lint.append(
+                f"Key {k.value!r} is not allowed by schema {_where(node)}"
+            )
+        elif additional is not True:
+            # 'true' is a way to encode the any type; only lint real schemas.
+            lint.extend(lint_document(additional, v))
+
+    return lint
+
+
+def lint_sequence(schema, node: Node) -> t.List[str]:
+    """Lint a sequence node against an ``array`` schema.
+
+    FIXME: only ``items`` is checked. There aren't sequences we need to lint in
+    the current schema design, punting on the rest.
+
+    Raises:
+        TypeError: if the schema is not an ``array`` schema or the node is not
+            a sequence.
+        KeyError: if the schema has no ``type``.
+    """
+    if schema["type"] != "array" or not isinstance(node, SequenceNode):
+        raise TypeError(
+            f"Expected {schema['type']}, got {node.id} {_where(node)}"
+        )
+
+    lint: t.List[str] = []
+    subschema = schema.get("items")
+    if subschema:
+        for item in node.value:
+            lint.extend(lint_document(subschema, item))
+    return lint
+
+
+def lint_scalar(schema, node: Node) -> t.List[str]:
+    """Lint a scalar node against a ``string`` or ``number`` schema.
+
+    FIXME: the only terminal we care about linting in the current schema is
+    {"type": "string"}; ``number`` scalars are logged and accepted unchecked.
+
+    Raises:
+        TypeError: if the schema type is not ``string`` or ``number``, or the
+            node is not a scalar.
+        KeyError: if the schema has no ``type``.
+    """
+    if schema["type"] not in ("string", "number") or not isinstance(
+        node, ScalarNode
+    ):
+        raise TypeError(
+            f"Expected {schema['type']}, got {node.id} {_where(node)}"
+        )
+
+    lint: t.List[str] = []
+    if schema["type"] == "string":
+        # A ScalarNode's value is always the raw text, so testing it with
+        # isinstance(..., str) can never fail. The resolved tag is what says
+        # whether the scalar really is a string.
+        if node.tag != _STR_TAG:
+            lint.append(f"Expected string, got {node.tag} {_where(node)}")
+    else:
+        log.info(
+            "Ignoring unlintable scalar, schema %r %s", schema, _where(node)
+        )
+
+    return lint
+
+
+def lint_document(schema, node: Node) -> t.List[str]:
+    """Lint a document.
+
+    Given a Node within a document (or the root of a document!), return a
+    (possibly empty!) list of lint or raise in case of fatal errors.
+
+    The ``True`` and ``{}`` schemas accept anything. Otherwise the kind of the
+    node picks the linter, and that linter raises ``TypeError`` when the schema
+    describes a different kind. Nodes of any other kind produce no lint.
+    """
+    if schema is True or schema == {}:
+        return []
+    if isinstance(node, MappingNode):
+        return lint_mapping(schema, node)
+    if isinstance(node, SequenceNode):
+        return lint_sequence(schema, node)
+    if isinstance(node, ScalarNode):
+        return lint_scalar(schema, node)
+    return []