WIP: support for TOML root config

This commit is contained in:
Reid 'arrdem' McKenzie 2022-09-13 00:57:23 -06:00
parent e6bd83a7a1
commit a19e89e5cc
2 changed files with 301 additions and 46 deletions

View file

@ -5,7 +5,11 @@ import logging
import os import os
from pathlib import Path from pathlib import Path
import pickle import pickle
from typing import List from pprint import pprint
from typing import List, Optional, Union
import re
import platform
from collections import defaultdict
from . import ( from . import (
__author__, __author__,
@ -30,6 +34,78 @@ def _exit(val):
exit(val) exit(val)
def first(iter):
return iter[0]
def second(iter):
return iter[1]
def groupby(iter, keyfn=lambda x: x, mapfn=lambda x: x):
bag = defaultdict(list)
for it in iter:
bag[keyfn(it)].append(mapfn(it))
return dict(bag)
def upsearch(name: Union[str, Path], limit=10) -> Optional[Path]:
d = Path(os.getcwd())
while True:
if limit == 0:
return
p = d.parent
if d == p:
return
dest = d / name
if dest.exists():
return dest
d = p
limit -= 1
def expandvars(s: str) -> str:
def _repl(m: re.Match) -> str:
var = m.group(0).lower()
match var:
case "${hostname}":
return os.uname()[1].split(".")[0]
case "${fqdn}":
return os.uname()[1]
case "${home}":
return os.path.expanduser("~")
case "${uname}" | "${sysname}":
return platform.system().lower()
case "${arch}":
return platform.machine()
case "${release}":
return platform.release()
case _:
return os.path.expandvars(m.group(0))
s = re.sub(r"\${?([^\s}]+)}?", "${\\1}", s)
return re.sub(r"\$\{.*?\}", _repl, s)
def handle_vars(_ctx, _param, s):
return expandvars(s)
def handle_path_vars(_ctx, _param, p):
return Path(expandvars(str(p)))
def expand_requires(requires):
return [expandvars(r) for r in requires]
def handle_requires(_ctx, _params, requires):
return expand_requires(requires)
def load(root: Path, name: str, clss): def load(root: Path, name: str, clss):
for c in clss: for c in clss:
i = c(root, name) i = c(root, name)
@ -47,35 +123,54 @@ def load_profile(root, name):
return load(root, name, [ProfileV1, ProfileV0]) return load(root, name, [ProfileV1, ProfileV0])
def load_packages(root: Path) -> dict: def load_packages(root: Path, roots) -> dict:
"""Load the configured packages.""" """Load the configured packages."""
packages = {} packages = {}
log.debug(f"Trying to load packages from {root}...") for r in roots.get("package", []):
for p in (root / "packages.d").glob("*"): r = Path(r)
name = str(p.relative_to(root)) log.debug(f"Trying to load packages from {r}...")
packages[name] = load_package(p, name) for p in r.glob("*"):
name = str(p.relative_to(root))
packages[name] = load_package(p, name)
# Add profiles, hosts which contain subpackages. for r in roots.get("profile", []):
for mp_root in chain((root / "profiles.d").glob("*"), (root / "hosts.d").glob("*")): # Add profiles, hosts which contain subpackages.
r = Path(r)
log.debug(f"Trying to load profiles from {r}...")
for mp_root in r.glob("*"):
# First find all subpackages
log.debug(f"Trying to load packages from {mp_root}...")
for p in mp_root.glob("*"):
if p.is_dir():
name = str(p.relative_to(root))
packages[name] = load_package(p, name)
# First find all subpackages # Register the metapackages themselves using the profile type
for p in mp_root.glob("*"): mp_name = str(mp_root.relative_to(root))
if p.is_dir(): packages[mp_name] = load_profile(mp_root, mp_name)
name = str(p.relative_to(root))
packages[name] = load_package(p, name)
# Register the metapackages themselves using the profile type
mp_name = str(mp_root.relative_to(root))
packages[mp_name] = load_profile(mp_root, mp_name)
log.debug(f"Loaded {len(packages)} packages...")
return packages return packages
def build_fs(root: Path, dest: Path, prelude: List[str]) -> Vfs: def missing_require_fatal(r):
log.fatal(f"Unable to load package {r}")
_exit(1)
def missing_require_warn(r):
log.warn(f"Unable to load package {r}")
def missing_require_ignore(r):
pass
def build_fs(root: Path, roots, dest: Path, prelude: List[str], missing_require_handler) -> Vfs:
"""Build a VFS by configuring dest from the given config root.""" """Build a VFS by configuring dest from the given config root."""
packages = load_packages(root) packages = load_packages(root, roots)
requirements = [] requirements = []
requirements.extend(prelude) requirements.extend(prelude)
@ -85,14 +180,14 @@ def build_fs(root: Path, dest: Path, prelude: List[str]) -> Vfs:
else: else:
log.warning("Loaded no packages!") log.warning("Loaded no packages!")
for r in requirements: for r in list(requirements):
try: try:
for d in packages[r].requires(): for d in packages[r].requires():
if d not in requirements: if d not in requirements:
requirements.append(d) requirements.append(d)
except KeyError: except KeyError:
log.fatal(f"Error: Unable to load package {r}") missing_require_handler(r)
_exit(1) requirements.remove(r)
# Compute the topsort graph # Compute the topsort graph
requirements = {r: packages[r].requires() for r in requirements} requirements = {r: packages[r].requires() for r in requirements}
@ -173,22 +268,74 @@ def scrub(old_fs: Vfs, new_fs: Vfs) -> Vfs:
return cleanup_fs return cleanup_fs
@click.group() def configure(ctx, param, filename: Optional[Path]):
@click.version_option(version=__version__, message=f"""Cram {__version__} if filename and filename.exists():
ctx.default_map = {}
os.chdir(filename.parent)
log.debug(f"Loading config from {filename}")
cfg = toml.load(filename)
assert cfg["cram"]["version"] >= 1
task_cfg = cfg.get("cram", {}).get("task", {})
defaults = task_cfg.get("default", {})
subcommands = ctx.command.commands.keys()
error = False
for subcommand in task_cfg.keys():
if subcommand not in subcommands and subcommand != "default":
error |= True
log.fatal(f"Erroneous config [cram.task.{subcommand}]")
elif subcommand == "default":
pass
for subcommand in subcommands:
# Take all defaults
m = defaults.copy()
# Splice in task defaults
m.update(task_cfg.get(subcommand, {}))
# And prune out anything that doesn't map to a param
cmd = ctx.command.commands[subcommand]
param_names = {it.name for it in cmd.params}
for k in list(m.keys()):
if k not in param_names:
del m[k]
ctx.default_map[subcommand] = m
if error:
exit(1)
@click.group(invoke_without_command=True)
@click.option(
"--config",
type=click.Path(dir_okay=False),
default=upsearch("cram.toml"),
callback=configure,
is_eager=True,
expose_value=False,
help="Read option defaults from the specified TOML file",
show_default=True,
)
@click.version_option(
version=__version__,
message=f"""Cram {__version__}
Documentation Documentation
https://github.com/arrdem/source/tree/trunk/projects/cram/ https://git.arrdem.com/arrdem/cram
Features Features
- 0.0.0 legacy config format - 0.0.0 legacy config format
- 0.1.0 TOML config format - 0.1.0 TOML config format
- 0.1.0 log based optimizer - 0.1.0 log based optimizer
- 0.1.0 idempotent default for scripts - 0.1.0 idempotent default for scripts
- 0.2.0 initial release
- 0.2.1 TOML root config
About About
{__copyright__}, {__author__}. {__copyright__}, {__author__}.
Published under the terms of the {__license__} license. Published under the terms of the {__license__} license.
""") """,
)
def cli(): def cli():
pass pass
@ -198,11 +345,33 @@ def cli():
@click.option("--force/--no-force", default=False) @click.option("--force/--no-force", default=False)
@click.option("--state-file", default=".cram.log", type=Path) @click.option("--state-file", default=".cram.log", type=Path)
@click.option("--optimize/--no-optimize", default=True) @click.option("--optimize/--no-optimize", default=True)
@click.option("--require", type=str, multiple=True, default=[f"hosts.d/{os.uname()[1].split('.')[0]}", "profiles.d/default"]) @click.option(
"--require",
type=str,
multiple=True,
default=[
"hosts.d/${HOSTNAME}",
"profiles.d/default"
],
callback=handle_requires,
)
@click.option(
"--require-root",
type=(str, click.Choice(["profile", "package"])),
multiple=True,
callback=lambda ctx, param, pairs: groupby([(y, expandvars(x)) for x, y in pairs], first, second)
)
@click.option(
"--missing-require",
type=click.Choice(["error", "warn", "ignore"]),
callback=lambda ctx, param, it: {"ignore": missing_require_ignore, "warn": missing_require_warn, "error": missing_require_fatal}[it]
)
@click.option("--exec-idempotent/--exec-always", "exec_idempotent", default=True) @click.option("--exec-idempotent/--exec-always", "exec_idempotent", default=True)
@click.argument("confdir", type=Path) @click.argument("confdir", type=Path, callback=handle_path_vars)
@click.argument("destdir", type=Path) @click.argument("destdir", type=Path, callback=handle_path_vars)
def do_apply(confdir, destdir, state_file, execute, optimize, force, require, exec_idempotent): def do_apply(
confdir, destdir, state_file, execute, optimize, force, require, require_root, missing_require, exec_idempotent,
):
"""The entry point of cram.""" """The entry point of cram."""
# Resolve the two input paths to absolutes # Resolve the two input paths to absolutes
@ -223,14 +392,13 @@ def do_apply(confdir, destdir, state_file, execute, optimize, force, require, ex
# Force an empty state # Force an empty state
old_fs = Vfs([]) old_fs = Vfs([])
new_fs = build_fs(root, dest, require) new_fs = build_fs(root, require_root, dest, require, missing_require)
log.debug(f"Built new state consisting of {len(new_fs._log)} steps") log.debug(f"Built new state consisting of {len(new_fs._log)} steps")
# Middleware processing of the resulting filesystem(s) # Middleware processing of the resulting filesystem(s)
executable_fs = scrub(old_fs, new_fs) executable_fs = scrub(old_fs, new_fs)
if optimize: if optimize:
executable_fs = simplify(old_fs, executable_fs, executable_fs = simplify(old_fs, executable_fs, exec_idempotent=exec_idempotent)
exec_idempotent=exec_idempotent)
# Dump the new state. # Dump the new state.
# Note that we dump the UNOPTIMIZED state, because we want to simplify relative complete states. # Note that we dump the UNOPTIMIZED state, because we want to simplify relative complete states.
@ -249,10 +417,21 @@ def do_apply(confdir, destdir, state_file, execute, optimize, force, require, ex
@cli.command("list") @cli.command("list")
@click.option("-1", "--oneline", is_flag=True, default=False, help="Only list names of resources") @click.option(
@click.argument("confdir", type=Path) "-1", "--oneline",
@click.argument("requirements", nargs=-1) is_flag=True,
def do_list(confdir, requirements, oneline): default=False,
help="Only list names of resources"
)
@click.option(
"--require-root",
type=(str, click.Choice(["profile", "package"])),
multiple=True,
callback=lambda ctx, param, pairs: groupby([(y, expandvars(x)) for x, y in pairs], first, second)
)
@click.argument("confdir", type=Path, callback=handle_path_vars)
@click.argument("requirements", nargs=-1, callback=handle_requires)
def do_list(confdir, requirements, require_root, oneline):
"""List out packages, profiles, hosts and subpackages in the <confdir>.""" """List out packages, profiles, hosts and subpackages in the <confdir>."""
root = confdir.resolve() root = confdir.resolve()
@ -260,7 +439,7 @@ def do_list(confdir, requirements, oneline):
log.fatal(f"{confdir} does not exist!") log.fatal(f"{confdir} does not exist!")
_exit(1) _exit(1)
packages = load_packages(root) packages = load_packages(root, require_root)
if requirements: if requirements:
dest = Path("~/") dest = Path("~/")
@ -289,7 +468,7 @@ def do_list(confdir, requirements, oneline):
@cli.command("state") @cli.command("state")
@click.option("--state-file", default=".cram.log", type=Path) @click.option("--state-file", default=".cram.log", type=Path)
@click.argument("confdir", type=Path) @click.argument("confdir", type=Path, callback=handle_path_vars)
def do_state(confdir, state_file): def do_state(confdir, state_file):
"""List out the last `apply` state in the <confdir>/.cram.log or --state-file.""" """List out the last `apply` state in the <confdir>/.cram.log or --state-file."""
root = confdir.resolve() root = confdir.resolve()
@ -307,9 +486,15 @@ def do_state(confdir, state_file):
@cli.command("fmt") @cli.command("fmt")
@click.argument("confdir", type=Path) @click.argument("confdir", type=Path, callback=handle_path_vars)
@click.argument("requirement", type=str) @click.option(
def do_fmt(confdir, requirement): "--require-root",
type=(str, click.Choice(["profile", "package"])),
multiple=True,
callback=lambda ctx, param, pairs: groupby([(y, expandvars(x)) for x, y in pairs], first, second)
)
@click.argument("requirement", type=str, callback=handle_path_vars)
def do_fmt(confdir, requirement, require_root):
"""Format the specified requirement to a canonical-ish representation.""" """Format the specified requirement to a canonical-ish representation."""
root = confdir.resolve() root = confdir.resolve()
@ -318,21 +503,65 @@ def do_fmt(confdir, requirement):
log.fatal(f"{confdir} does not exist!") log.fatal(f"{confdir} does not exist!")
_exit(1) _exit(1)
packages = load_packages(root) packages = load_packages(root, require_root)
pkg = packages[requirement] pkg = packages[requirement]
json = pkg.json() json = pkg.json()
for suffix in pkg.SPECIAL_FILES: for suffix in pkg.SPECIAL_FILES:
f = (root / requirement / suffix) f = root / requirement / suffix
if f.exists(): if f.exists():
f.unlink() f.unlink()
with open(root / requirement / "pkg.toml", "w") as fp: with open(root / requirement / "pkg.toml", "w") as fp:
toml.dump(json, fp) toml.dump(json, fp)
@cli.command("init")
@click.argument("confdir", type=Path, callback=handle_path_vars)
@click.option("-f", "--force", is_flag=True)
def do_init(confdir: Path, force: bool):
"""Initialize an empty Cram repo."""
confdir.mkdir(parents=True, exist_ok=True)
rootfile = confdir / 'cram.toml'
if rootfile.exists() and not force:
log.fatal("cram.toml exists! Provide -f/--force to overwrite")
exit(1)
log.info("Writing initial cram.toml...")
with open(rootfile, 'w') as fp:
fp.write("""\
[cram]
version = 1
[cram.task.default]
confdir = "${PWD}"
destdir = "${HOME}"
state_file = "${PWD}/.cram.log"
optimize = true
exec_idempotent = true
require_roots = {
"packages.d" = "package",
"profiles.d" = "profile"
"hosts.d" = "profile",
}
missing_require = "error"
[cram.task.apply]
require = [
"hosts.d/${FQDN}",
"hosts.d/${HOSTNAME}",
"profiles.d/default",
]
""")
for d in ['profiles.d', 'packages.d', 'hosts.d']:
(confdir / d).mkdir(parents=True, exist_ok=True)
if __name__ == "__main__" or 1: if __name__ == "__main__" or 1:
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
) )

View file

@ -0,0 +1,26 @@
[cram]
version = 1
[cram.task.default]
confdir = "${PWD}"
destdir = "${HOME}"
state_file = "${PWD}/.cram.log"
optimize = true
exec_idempotent = true
# Where to load requirements from
# Types: package, profile
require_root = [
["${PWD}/packages.d", "package"],
["${PWD}/profiles.d", "profile"],
["${PWD}/hosts.d", "profile"],
]
# Choice([error, warn, ignore])
missing_require = "error"
[cram.task.apply]
missing_require = "ignore"
require = [
"hosts.d/${FQDN}",
"hosts.d/${HOSTNAME}",
"profiles.d/default",
]