yamlschema

This commit is contained in:
Reid McKenzie 2021-05-14 11:55:28 -06:00
parent 4e8ac14536
commit 6f8a1bf831
3 changed files with 126 additions and 0 deletions

12
projects/yamlschema/BUILD Normal file
View file

@ -0,0 +1,12 @@
py_library(
name = "yamlschema",
srcs = [
"yamlschema.py",
],
imports = [
".",
],
deps = [
py_requirement("PyYAML"),
]
)

View file

@ -0,0 +1,3 @@
# YAML Schema
A pocket library that implements some amount of jsonschema validation against YAML documents.

View file

@ -0,0 +1,111 @@
"""
JSONSchema linting for YAML documents.
"""
import logging
import typing as t
from yaml.nodes import MappingNode, Node, ScalarNode, SequenceNode
log = logging.getLogger(__name__)
def lint_mapping(schema, node: Node) -> t.List[str]:
lint: t.List[str] = []
if schema["type"] != "object" or not isinstance(node, MappingNode):
raise TypeError(
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}"
)
additional_allowed: bool = schema.get("additionalProperties", False) != False
additional_type: t.Union[dict, bool] = (
schema.get("additionalProperties") if additional_allowed
else {}
)
properties: dict = schema.get("properties", {})
required: t.List[str] = schema.get("required", [])
for k in required:
if k not in [_k.value for _k, _v in node.value]:
raise TypeError(
f"Required key {k!r} absent from mapping {str(node.start_mark).lstrip()}"
)
for k, v in node.value:
if k.value in properties:
lint.extend(lint_document(properties.get(k.value), v))
elif additional_allowed:
# 'true' is a way to encode the any type.
if additional_type == True:
pass
else:
lint.extend(lint_document(additional_type, v))
else:
lint.append(
f"Key {k.value!r} is not allowed by schema {str(node.start_mark).lstrip()}"
)
return lint
def lint_sequence(schema, node: Node) -> t.List[str]:
""""FIXME.
There aren't sequences we need to lint in the current schema design, punting.
"""
if schema["type"] != "array" or not isinstance(node, SequenceNode):
raise TypeError(
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}"
)
lint = []
subschema = schema.get("items")
if subschema:
for item in node.value:
lint.extend(lint_document(subschema, item))
return lint
def lint_scalar(schema, node: Node) -> t.List[str]:
"""FIXME.
The only terminal we care about linting in the current schema is {"type": "string"}.
"""
if schema["type"] not in ["string", "number"] or not isinstance(node, ScalarNode):
raise TypeError(
f"Expected {schema['type']}, got {node.id} {str(node.start_mark).lstrip()}"
)
lint = []
if schema["type"] == "string":
if not isinstance(node.value, str):
lint.append(f"Expected string, got {node.id} {str(node.start_mark).lstrip()}")
else:
log.info(f"Ignoring unlintable scalar, schema {schema!r} {str(node.start_mark).lstrip()}")
return lint
def lint_document(schema, node):
"""Lint a document.
Given a Node within a document (or the root of a document!), return a
(possibly empty!) list of lint or raise in case of fatal errors.
"""
if schema == True or schema == {}:
return []
elif isinstance(node, MappingNode):
return lint_mapping(schema, node)
elif isinstance(node, SequenceNode):
return lint_sequence(schema, node)
elif isinstance(node, ScalarNode):
return lint_scalar(schema, node)
else:
return []