126 lines
3.4 KiB
Python
126 lines
3.4 KiB
Python
"""Conditionally unpack a zapp (and its deps)."""
|
|
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from tempfile import mkdtemp
|
|
from zipfile import ZipFile, is_zipfile
|
|
|
|
from zapp.support.manifest import manifest, once
|
|
|
|
|
|
@once
|
|
def cache_root() -> Path:
|
|
"""Find a root directory for cached behaviors."""
|
|
|
|
shared_cache = Path(os.path.join(os.path.expanduser("~"))) / ".cache" / "zapp"
|
|
|
|
# Happy path, read+write filesystem
|
|
if os.access(shared_cache, os.X_OK | os.W_OK):
|
|
return shared_cache
|
|
|
|
# Unhappy path, we need a tempdir.
|
|
# At least make one that's stable for the program's lifetime.
|
|
else:
|
|
return Path(os.getenv("ZAPP_TMPDIR") or mkdtemp(), "deps")
|
|
|
|
|
|
def cache_wheel_root():
|
|
return cache_root() / "wheels"
|
|
|
|
|
|
def cache_wheel_path(wheel: str) -> Path:
|
|
return cache_wheel_root() / wheel
|
|
|
|
|
|
def cache_zapp_root():
|
|
return cache_root() / "zapps"
|
|
|
|
|
|
def cache_zapp_path(fingerprint):
|
|
return cache_zapp_root() / fingerprint
|
|
|
|
|
|
def unpack_deps():
|
|
"""Unpack deps, populating and updating the host's cache."""
|
|
|
|
if is_zipfile(sys.argv[0]):
|
|
# Create the cache dir as needed
|
|
cache_wheel_root().mkdir(parents=True, exist_ok=True)
|
|
|
|
# For each wheel, touch the existing cached wheel or unpack this one.
|
|
with ZipFile(sys.argv[0], "r") as zf:
|
|
for whl, config in manifest()["wheels"].items():
|
|
cached_whl = cache_wheel_path(whl)
|
|
if cached_whl.exists():
|
|
cached_whl.touch()
|
|
|
|
else:
|
|
with open(cached_whl, "wb") as of:
|
|
of.write(zf.read(".deps/" + whl))
|
|
|
|
else:
|
|
pass
|
|
|
|
|
|
def install_deps():
|
|
"""Validate that the PYTHONPATH has been configured correctly."""
|
|
|
|
# FIXME: Can we reference the requirements, not specific PATH entries?
|
|
for whl, config in manifest()["wheels"].items():
|
|
cached_whl = cache_wheel_path(whl)
|
|
if cached_whl.exists():
|
|
cached_whl.touch()
|
|
|
|
# Remove any references to the dep and shift the cached whl to the front
|
|
p = str(cached_whl.resolve())
|
|
try:
|
|
sys.path.remove(p)
|
|
except ValueError:
|
|
pass
|
|
sys.path.insert(0, p)
|
|
|
|
|
|
def canonicalize_path():
|
|
"""Fixup sys.path entries to use absolute paths. De-dupe in the same pass."""
|
|
|
|
# Note that this HAS to be mutative/in-place
|
|
shift = 0
|
|
for i in range(len(sys.path)):
|
|
idx = i - shift
|
|
el = str(Path.resolve(sys.path[idx]))
|
|
if el in sys.path:
|
|
shift += 1
|
|
sys.path.pop(idx)
|
|
else:
|
|
sys.path[idx] = el
|
|
|
|
|
|
def unpack_zapp():
|
|
"""Unzip a zapp (excluding the .deps/* tree) into a working directory.
|
|
|
|
Note that unlike PEX, these directories are per-run which prevents local mutable state somewhat.
|
|
|
|
"""
|
|
|
|
# Short circuit
|
|
if is_zipfile(sys.argv[0]):
|
|
# Extract
|
|
tmpdir = mkdtemp()
|
|
with ZipFile(sys.argv[0], "r") as zf:
|
|
for src in manifest()["sources"].keys():
|
|
ofp = Path(tmpdir, "usr", src)
|
|
ofp.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(ofp, "wb") as of:
|
|
of.write(zf.read(src))
|
|
|
|
# Re-exec the current interpreter
|
|
args = [
|
|
sys.executable,
|
|
"--",
|
|
os.path.join(tmpdir, "usr", "__main__.py"),
|
|
] + sys.argv[1:]
|
|
os.execvpe(args[0], args[1:], {"PYTHONPATH": "", "ZAPP_TMPDIR": tmpdir})
|
|
|
|
else:
|
|
pass
|