Source code for csspin

# -*- mode: python; coding: utf-8 -*-
#
# Copyright 2020 CONTACT Software GmbH
# https://www.contact-software.com/
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines

"""This is the plugin API of spin. It contains functions and classes
that are necessary for plugins to register themselves with spin,
e.g. :py:func:`task`, and convenience APIs that aim to simplify plugin
implementation.

spin's task management (aka subcommands) is just a thin wrapper on top
of the venerable `package click
<https://click.palletsprojects.com/en/8.0.x/>`_, so to create any
slightly advanced command line interfaces for plugins you want to make
yourself comfortable with click's documentation.

"""

from __future__ import annotations

from enum import IntEnum
from typing import TYPE_CHECKING, Iterable, Type

import packaging.version
import platformdirs.unix

if TYPE_CHECKING:
    from typing import Any, Callable, Generator
    from csspin.tree import ConfigTree
    from csspin.cli import GroupWithAliases

import collections
import importlib.metadata
import inspect
import os
import pickle
import re
import shlex
import shutil
import subprocess
import sys
import tarfile
import urllib.request
import zipfile
from contextlib import contextmanager
from traceback import format_exc

import click
import packaging
import platformdirs
from path import Path

__all__ = [
    "debug",
    "echo",
    "info",
    "warn",
    "error",
    "cd",
    "copy",
    "confirm",
    "exists",
    "mkdir",
    "rmtree",
    "die",
    "Command",
    "sh",
    "backtick",
    "setenv",
    "readbytes",
    "writebytes",
    "readtext",
    "writetext",
    "appendtext",
    "persist",
    "unpersist",
    "memoizer",
    "namespaces",
    "interpolate1",
    "interpolate",
    "config",
    "readyaml",
    "download",
    "argument",
    "option",
    "task",
    "group",
    "invoke",
    "toporun",
    "Path",
    "Memoizer",
    "EXPORTS",
]

secrets: set[str] = set()


def obfuscate(msg: Iterable[str] | str) -> list[str] | str:
    mask = "*******"

    if isinstance(msg, str):
        new_msg = msg
        for secret in secrets:
            new_msg = new_msg.replace(secret, mask)
        return new_msg

    elif isinstance(msg, Iterable):
        msg_list: list[str] = []
        for string in msg:
            for secret in secrets:
                string = string.replace(secret, mask)
            msg_list.append(string)
        return msg_list


[docs] def echo(*msg: str, resolve: bool = False, **kwargs: Any) -> None: """Print a message to the console by joining the positional arguments `msg` with spaces. `echo` is meant for messages that explain to the user what spin is doing (e.g. *echoing* commands launched). It will remain silent though when ``spin`` is run with the ``--quiet`` flag. If the parameter ``resolve`` is set to ``True``, the arguments are interpolated against the configuration tree. `echo` supports the same keyword arguments as Click's :py:func:`click.echo`. """ if CONFIG.verbosity > Verbosity.QUIET: if resolve: msg = interpolate(msg) # type: ignore[assignment] msg = obfuscate(msg) # type: ignore[assignment] click.echo(click.style("spin: ", fg="green"), nl=False) click.echo(click.style(" ".join(msg), bold=True), **kwargs)
[docs] def info(*msg: str, **kwargs: Any) -> None: """Print a message to the console by joining the positional arguments `msg` with spaces. Arguments are interpolated against the configuration tree. `info` will remain silent unless ``spin`` is run with the ``--verbose`` flag. `info` is meant for messages that provide additional details. `info` supports the same keyword arguments as Click's :py:func:`click.echo`. """ if CONFIG.verbosity > Verbosity.NORMAL: msg = interpolate(msg) # type: ignore[assignment] msg = obfuscate(msg) # type: ignore[assignment] click.echo(click.style("spin: ", fg="green"), nl=False) click.echo(" ".join(msg), **kwargs)
[docs] def debug(*msg: str, resolve: bool = False, **kwargs: Any) -> None: """Print a message to the console by joining the positional arguments `msg` with spaces. Arguments are interpolated against the configuration tree if ``resolve`` evaluates to ``True``. `debug` will remain silent unless ``spin`` is run with the ``-vv`` flag. `debug` is meant for messages that provide internal details. `debug` supports the same keyword arguments as Click's :py:func:`click.echo`. """ if CONFIG.verbosity > Verbosity.INFO: if resolve: msg = interpolate(msg) # type: ignore[assignment] msg = obfuscate(msg) # type: ignore[assignment] click.echo(click.style("spin: debug: ", fg="white", dim=True), nl=False) click.echo(" ".join(msg), **kwargs)
[docs] def warn(*msg: str, **kwargs: Any) -> None: """Print a warning message to the console by joining the positional arguments `msg` with spaces. Arguments are interpolated against the configuration tree. The output is written to standard error. `warn` supports the same keyword arguments as Click's :py:func:`click.echo`. """ msg = interpolate(msg) # type: ignore[assignment] msg = obfuscate(msg) # type: ignore[assignment] click.echo(click.style("spin: warning: ", fg="yellow"), nl=False, err=True) click.echo(" ".join(msg), err=True, **kwargs)
[docs] def error(*msg: str, resolve: bool = True, **kwargs: Any) -> None: """Print an error message to the console by joining the positional arguments `msg` with spaces. Arguments are interpolated against the configuration tree if `resolve` evaluates to `True`. The output is written to standard error. `error` supports the same keyword arguments as Click's :py:func:`click.echo`. """ if resolve: msg = interpolate(msg) # type: ignore[assignment] msg = obfuscate(msg) # type: ignore[assignment] click.echo(click.style("spin: error: ", fg="red"), nl=False, err=True) click.echo(" ".join(msg), err=True, **kwargs)
def confirm(*msg: str, resolve: bool = True, **kwargs: Any) -> bool: """Prompt user for confirmation. Arguments are interpolated against the configuration tree if `resolve` evaluates to `True`. The output is written to standard out. `confirm` supports the same keyword arguments as Click's :py:func:`click.confirm`. """ if resolve: msg = interpolate(msg) # type: ignore[assignment] msg = obfuscate(msg) # type: ignore[assignment] click.echo(click.style("spin: ", fg="yellow"), nl=False) return click.confirm(" ".join(msg), **kwargs)
[docs] class Verbosity(IntEnum): """ :py:class:`enum.IntEnum` defining four verbosity levels: * ``QUIET``: Outputs only warnings and errors via :py:func:`csspin.warn()` and :py:func:`csspin.error()`. * ``NORMAL``: Outputs the normal amount of verbosity, extending the quiet level by enabling :py:func:`csspin.echo()`. * ``INFO``: Extends normal verbosity to enable :py:func:`csspin.info()`. * ``DEBUG``: Extends info verbosity to enable debug messages via :py:func:`csspin.debug()`. """ QUIET = -1 NORMAL = 0 INFO = 1 DEBUG = 2 @classmethod def _missing_(cls, _) -> Verbosity: # type: ignore[no-untyped-def] warn( "Invalid verbosity level, only '-v' and '-vv' are allowed! Verbosity is" " set to 'DEBUG'." ) return Verbosity(2)
class DirectoryChanger: """A simple class to change the current directory. Change directory on construction, and restore the cwd when used as a context manager. Noop if we're already in the wanted directory. """ def __init__(self: DirectoryChanger, path: str | Path) -> None: """Change directory.""" path = interpolate1(path) self._cwd = os.getcwd() if not os.path.samefile(path, self._cwd): echo("cd", path) os.chdir(path) def __enter__(self: DirectoryChanger) -> None: """Nop.""" def __exit__(self: DirectoryChanger, *args: Any) -> None: """Change back to where we came from.""" if not os.path.samefile(self._cwd, os.getcwd()): echo("cd", self._cwd) os.chdir(self._cwd)
[docs] def cd(path: str | Path) -> DirectoryChanger: """Change directory. The `path` argument is interpolated against the configuration tree. `cd` can be used either as a function or as a context manager. When used as a context manager, the working directory is changed back to what it was before the ``with`` block. You can do this: >>> cd("{spin.project_root}") ... or that: >>> with cd("{spin.project_root}"): <do something in this directory> """ return DirectoryChanger(path)
[docs] def exists(path: str | Path) -> bool: """Check whether `path` exists. The argument is interpolated against the configuration tree. """ path = interpolate1(path) return os.path.exists(path)
[docs] def normpath(*args: str | Path) -> str: """Interpolate and return a normalized path as str""" return os.path.normpath(os.path.join(*interpolate(args))) # type: ignore[no-any-return]
[docs] def abspath(*args: str | Path) -> str: """Interpolate and return an absolute path as str""" return os.path.abspath(normpath(*args))
[docs] def mkdir(path: str | Path) -> str: """Ensure that `path` exists. If necessary, directories are recursively created to make `path` available. The argument is interpolated against the configuration tree. """ if not exists(path := interpolate1(Path(path))): echo("mkdir -p", path) os.makedirs(path) return path
[docs] def rmtree(path: str | Path) -> None: """Recursively remove `path` and everything it contains. Can also remove single files. The argument is interpolated against the configuration tree. Obviously, this should be used with care. """ if (path := interpolate1(path)) and not exists(path): return if sys.platform == "win32": echo(f"rm {path} -recurse -force") else: echo(f"rm -rf {path}") if (path := Path(path)).is_dir(): path.rmtree() else: path.remove()
[docs] def mv(source: str | Path, target: str | Path) -> None: """Move a file or directory recursively from `source` to `target` in case the `target` exists, otherwise rename `source` to `target`. """ if not exists((source := str(interpolate1(source)))): die(f"{source} does not exist!") target = str(interpolate1(target)) if sys.platform == "win32": echo(f"move-item -path {source} -destination {target}") else: echo(f"mv {source} {target}") shutil.move(source, target)
[docs] def copy(source: str | Path, target: str | Path) -> None: """Copy a file or directory recursively from `source` to `target` in case the `target` exists. """ if not exists((source := Path(interpolate1(source)).absolute())): die(f"{source} does not exist!") target = Path(interpolate1(target)).absolute() source_is_dir = source.is_dir() if sys.platform == "win32": opts = "-recurse" if source_is_dir else "" echo(f"copy-item -path {source} -destination {target} {opts}") else: opts = "-r " if source_is_dir else "" echo(f"cp {opts}{source} {target}") if source_is_dir: source.copytree( (target / source.basename()).mkdir_p(), dirs_exist_ok=True, ) else: source.copy2(target)
[docs] def die(*msg: Any, resolve: bool = True) -> None: """Terminates ``spin`` with a non-zero return code and print the error message `msg`. Arguments are interpolated against the configuration tree if `resolve` evaluates to `True`. """ if resolve: msg = interpolate(msg) # type: ignore[assignment] error(*msg, resolve=False) raise click.Abort(msg)
[docs] class Command: """Create a function that is a shrink-wrapped shell command. The callable returned behaves like :py:func:`sh`, accepting additional arguments for the wrapper command as positional parameters. All positional arguments are interpolated against the configuration tree. Example: >>> install = Command("pip", "install") >>> install("spin") """ def __init__(self: Command, *cmd: str) -> None: self._cmd = list(cmd) def append(self: Command, item: str) -> None: self._cmd.append(item) def __call__( self: Command, *args: str, **kwargs: Any ) -> subprocess.CompletedProcess | None: cmd = self._cmd + list(args) return sh(*cmd, **kwargs)
[docs] def sh(*cmd: Any, **kwargs: Any) -> subprocess.CompletedProcess | None: """Run a program by building a command line from `cmd`. When multiple positional arguments are given, each is treated as one element of the command. When just one positional argument is used, `sh` assumes it to be a single command and splits it into multiple arguments using :py:func:`shlex.split`. The `cmd` arguments are interpolated against the configuration tree. When `silent` is ``False``, the resulting command line will be echoed. When `shell` is ``True``, the command line is passed to the system's shell. Other keyword arguments are passed into :py:func:`subprocess.run`. All positional arguments are interpolated against the configuration tree. >>> sh("ls", "{HOME}") """ cmd = interpolate(cmd) # type: ignore[assignment] shell = kwargs.pop("shell", len(cmd) == 1) check = kwargs.pop("check", True) env = argenv = kwargs.pop("env", None) if env: process_env = dict(os.environ) process_env.update(env) env = process_env executable = None if sys.platform == "win32": if len(cmd) == 1: cmd = shlex.split(cmd[0].replace("\\", "\\\\")) if not shell: executable = shutil.which(cmd[0]) if not kwargs.pop("silent", False): def quote(arg: str) -> str: if len(cmd) > 1 and " " in arg: return f"'{arg}'" return arg echo(" ".join(quote(c) for c in cmd)) message = "Command '{cmd_}' failed with exit status {returncode}." cmd_ = cmd if isinstance(cmd, str) else subprocess.list2cmdline(cmd) # type: ignore[unreachable] # noqa: E501 try: debug( f"subprocess.run({cmd}, shell={shell}, check={check}, env={argenv}," f" executable={executable}, kwargs={kwargs})", ) cpi = subprocess.run( cmd, shell=shell, check=check, env=env, executable=executable, **kwargs ) except FileNotFoundError as ex: debug(format_exc()) die(str(ex)) except subprocess.CalledProcessError as ex: debug(format_exc()) if check: die(message.format(cmd_=cmd_, returncode=ex.returncode)) cpi = subprocess.CompletedProcess(args=cmd, returncode=ex.returncode) if not check and cpi.returncode: warn(message.format(cmd_=cmd, returncode=cpi.returncode)) return cpi
def backtick(*cmd: str, **kwargs: Any) -> str: kwargs["stdout"] = subprocess.PIPE cpi = sh(*cmd, **kwargs) return cpi.stdout.decode() # type: ignore[no-any-return,union-attr] #: EXPORTS is a list that contains all (key, value) tuples of environment variables #: that got set or unset via :py:func:`csspin.setenv` during the current spin execution. #: #: The ``value`` of a given element is already fully interpolated, except for #: parts that look like environment variables. So any plugin using ``EXPORTS`` is able #: to lazily evaluate the value of a variable in cases, where it has been set #: multiple times. #: #: A case where that's relevant can be seen in the following example: #: #: Example: #: #: >>> os.environ.getenv("PATH") #: "/usr/bin:/bin" #: >>> setenv(PATH="{spin.project_root}/bin:{PATH}") #: >>> setenv(PATH="{python.scriptdir}:{PATH}") #: >>> EXPORTS #: [("PATH", "/home/foo/project/bin:{PATH}"), ("PATH", "/home/foo/project/.spin/venv/bin:{PATH})] #: #: As can be seen, the real value of ``PATH`` should be #: ``"/home/foo/project/.spin/venv/bin:/home/foo/project/bin:"/usr/bin:/bin"``, #: which any plugin could now generate. EXPORTS: list[tuple[str, str]] = []
[docs] def setenv(*args: Any, **kwargs: Any) -> None: """Set or unset one or more environment variables. The values of keyword arguments are interpolated against the configuration tree. Passing ``None`` as a value removes the environment variable. Variables that have been set during or before ``configure()`` will be patched into the activation scripts of the Python virtual environment. On Windows, all passed environment variable keys will be set in upper case. >>> setenv(FOO="{foo.bar}", BAZ="some-value") """ def _value_replacement_for_echoing(value: str) -> str: keys = re.findall(r"{(?P<key>\w+?)}", value) if sys.platform == "win32": for key in keys: if key in os.environ: value = value.replace(f"{{{key}}}", f"$env:{key}") else: for key in keys: if key in os.environ: value = value.replace(f"{{{key}}}", f"${key}") return value for key, value in kwargs.items(): if sys.platform == "win32": key = key.upper() if value is None: if not args: if sys.platform == "win32": echo(f"$env:{key}=$null", resolve=False) else: echo(f"unset {key}", resolve=False) os.environ.pop(key, None) EXPORTS.append((key, "")) else: interpolated_value = interpolate1(value) exports_value = interpolate1(value, interpolate_environ=False) if not args: value_to_print = _value_replacement_for_echoing(exports_value) if sys.platform == "win32": echo(f'$env:{key}="{value_to_print}"', resolve=False) else: echo(f"export {key}={value_to_print}", resolve=False) else: echo(args[0]) os.environ[key] = interpolated_value EXPORTS.append((key, exports_value))
def _read_file(fn: str | Path, mode: str) -> str | bytes: fn = interpolate1(fn) with open(fn, mode, encoding="utf-8" if "b" not in mode else None) as f: return f.read() # type: ignore[no-any-return]
[docs] def readlines(fn: str | Path) -> list[str]: fn = interpolate1(fn) with open(fn, "r", encoding="utf-8") as f: return f.readlines()
[docs] def writelines(fn: str | Path, lines: str) -> None: fn = interpolate1(fn) with open(fn, "w", encoding="utf-8") as f: f.writelines(lines)
def _write_file(fn: str | Path, mode: str, data: bytes | str) -> int: fn = interpolate1(fn) with open(fn, mode, encoding="utf-8" if "b" not in mode else None) as f: return f.write(data)
[docs] def readbytes(fn: str | Path) -> bytes: """`readbytes` reads binary data. The file name argument is interpolated against the configuration tree. """ return _read_file(fn, "rb") # type: ignore[return-value]
[docs] def writebytes(fn: str | Path, data: bytes) -> int: """Write `data`` to the file named `fn`. Data is binary data (`bytes`). The file name argument is interpolated against the configuration tree. """ return _write_file(fn, "wb", data)
[docs] def readtext(fn: str | Path) -> str: """Read an UTF8 encoded text from the file 'fn'. The file name argument is interpolated against the configuration tree. """ return _read_file(fn, "r") # type: ignore[return-value]
[docs] def writetext(fn: str | Path, data: str) -> int: """Write `data`, which is text (Unicode object of type `str`) to the file named `fn`. The file name argument is interpolated against the configuration tree. """ return _write_file(fn, "w", data)
[docs] def appendtext(fn: str | Path, data: str) -> int: """Append `data`, which is text (Unicode object of type `str`) to the file named `fn`. The file name argument is interpolated against the configuration tree. """ return _write_file(fn, "a", data)
[docs] def persist(fn: str | Path, data: Type[object]) -> int: """Persist the Python object(s) in `data` using :py:mod:`pickle`.""" return writebytes(fn, pickle.dumps(data))
[docs] def unpersist(fn: str, default: Any | None = None) -> Any | None: """Load pickled Python object(s) from the file `fn`.""" try: return pickle.loads(readbytes(fn)) except FileNotFoundError: return default
[docs] class Memoizer: """Maintain a persistent base of simple facts. Facts are loaded from file `fn`. The argument is interpolated against the configuration tree. If `fn` does not exist, there are no facts. The `Memoizer` class stores and retrieves Python objects from the binary file named `fn`. The argument is interpolated against the configuration tree. `Memoizer` can be used to keep a simple "database". Spin internally uses Memoizers for e.g. keeping track of packages installed in a virtual environment. To ease the handling in `spin` scripts, there also is context manager called `memoizer` (note the lower case "m"). The context manager retrieves the database from the file and saves it back when the context is closed:: >>> with memoizer(fn) as m: ... if m.check("test"): ... There are *no* precautions for simultaneous access from multiple processes, writes will likely silently become lost. """ def __init__(self: Memoizer, fn: str | Path) -> None: self._fn = fn self._items = unpersist(fn, []) def check(self: Memoizer, item: Iterable) -> bool: """Checks whether `item` is stored in the memoizer.""" return item in self._items # type: ignore[operator] def clear(self: Memoizer) -> None: """Remove all items""" self._items = [] def items(self: Memoizer) -> Iterable: return self._items # type: ignore[return-value] def add(self: Memoizer, item: Any) -> None: """Add `item` to the memoizer.""" self._items.append(item) # type: ignore[union-attr] self.save() def save(self: Memoizer) -> None: """Persist the current state of the memoizer. This is done automatically when using `memoizer` as a context manager. """ persist(self._fn, self._items) # type: ignore[arg-type]
[docs] @contextmanager def memoizer(fn: str) -> Generator: """Context manager for creating a :py:class:`Memoizer` that automatically saves the fact base. >>> with memoizer("facts.memo") as m: ... m.add("fact1") ... m.add("fact2") """ m = Memoizer(fn) yield m m.save()
NSSTACK = []
[docs] @contextmanager def namespaces(*nslist: dict) -> Generator: """Add namespaces for interpolation.""" for ns in nslist: NSSTACK.append(ns) yield for _ in nslist: NSSTACK.pop()
# Here we set the defaults for spin's configuration and data # directories for diverse platforms, partially using the # `platformdirs` library. # # For macOS we use the Linux XDG defaults, instead of what # platformdirs provides by default, which is ~/Library/Application # Support/, unsuitable for command line applications like spin. See # https://github.com/cslab/csspin-python/issues/1 and the discussion # at https://code.contact.de/qs/spin/cs.spin/-/merge_requests/76 # # On Windows platformdirs.user_config_dir and # platformdirs.user_data_dir both point to %LOCALAPPDATA%, that's why # we need /data and /config subfolders def _user_config_and_data_dir() -> tuple[str, str]: """Return base config/data dirs; enforce XDG layout on macOS to avoid spaces.""" if sys.platform == "darwin": pfdirs = platformdirs.unix.Unix() return pfdirs.user_config_dir, pfdirs.user_data_dir return platformdirs.user_config_dir(), platformdirs.user_data_dir() USER_CONFIG_DIR, USER_DATA_DIR = _user_config_and_data_dir() os.environ["SPIN_CONFIG"] = os.environ.get( "SPIN_CONFIG", Path.joinpath( USER_CONFIG_DIR, "spin", "config" if sys.platform == "win32" else "", ).normpath(), ) os.environ["SPIN_DATA"] = os.environ.get( "SPIN_DATA", Path.joinpath( USER_DATA_DIR, "spin", "data" if sys.platform == "win32" else "" ).normpath(), )
[docs] def interpolate1( literal: str | Path, *extra_dicts: dict, interpolate_environ: bool = True ) -> str | Path: """ Interpolate a string or path against the configuration tree and the environment. Example: >>> interpolate1("{SHELL}") '/usr/bin/zsh' If literal is not a string or path, it will be converted to a string prior interpolating. To avoid interpolation for literals or specific parts of a literal, curly braces can be used to escape curly braces, like regular f-string interpolation. Example: >>> interpolate1( ... '{{"header": {{"language": "en", "data": "{SPIN_DATA}"}}}}' ... ) '{"header": {"language": "en", "data": "/home/developer/.local/share/spin"}}' It may be necessary to omit the interpolation against the environment, in that case the parameter ``interpolate_environ`` can be set to ``False``. Example: >>> interpolate1("{spin.version} and {PATH}", interpolate_environ=False) "1.0.2.dev5 and {PATH}" .. Attention:: **Do not use** :py:func:`csspin.interpolate1` **in a plugins' top-level**, as the one can't rely on the configuration tree at import time of the module. .. code-block:: python :caption: Negative example: How not to use :py:func:`csspin.interpolate1` :linenos: from csspin import config, interpolate1 defaults = config(key=interpolate1("{some.property}")) .. Attention:: If the interpolated property is not set, not NoneType but "None" as a string is returned. """ is_path = isinstance(literal, Path) literal = str(literal) seen = set() previous = None where_to_look = collections.ChainMap( {"config": CONFIG}, CONFIG, os.environ if interpolate_environ else {}, *extra_dicts, *NSSTACK, ) while previous != literal: # Interpolate until we reach a fixpoint -- this allows for # nested variables. if literal in seen: die( f"Could not interpolate '{literal}' due to RecursionError.", resolve=False, ) seen.add(previous := literal) # We need to protect double braces by doubling them, because # .format() converts {{}} to {} undependent of if it interpolated # something within these braces or not. literal = literal.replace("}}", "}}}}").replace("{{", "{{{{") try: if interpolate_environ: literal = literal.format_map(where_to_look) else: # When not interpolating the environ, we need to escape # sub-literals that look like environment variables. literal = re.sub(r"({\w+})", r"{\1}", literal) literal = literal.format_map(where_to_look) literal = re.sub(r"{({\w+})}", r"\1", literal) except KeyError as ex: error_key = str(ex)[1:-1] die(f"Cannot interpolate '{{{error_key}}}' in {literal}.", resolve=False) except AttributeError as ex: error_key = str(ex).replace("No property ", "")[1:-1] die(f"Cannot interpolate '{{{error_key}}}' in {literal}.", resolve=False) literal = literal.replace("{{", "{").replace("}}", "}") return Path(literal).normpath() if is_path else literal
[docs] def interpolate(literals: Iterable, *extra_dicts: dict) -> list: """ Interpolate an iterable of hashable items against the configuration tree. """ out = [] for literal in literals: # We allow None, which gets filtered out here, to enable # simple argument configuration, e.g. something like: # sh("...", "-q" if cfg.quiet else None, ...) if literal is not None: out.append(interpolate1(literal, *extra_dicts)) return out
[docs] def config(*args: Any | None, **kwargs: Any) -> ConfigTree: """`config` creates a configuration subtree: >>> config(a="alpha", b="beta) {"a": "alpha", "b": "beta} Plugins use `config` to declare their ``defaults`` tree. """ from csspin.tree import ConfigTree return ConfigTree(*args, **kwargs, __ofs_frames__=1) # type: ignore[arg-type]
[docs] def readyaml(fname: str | Path) -> ConfigTree: """Read a YAML file.""" from csspin.tree import tree_load fname = interpolate1(fname) return tree_load(fname)
[docs] def download(url: str, location: str | Path, headers: dict | None = None) -> None: """Download data from ``url`` to ``location`` using optional ``headers``.""" url, location = interpolate((url, location)) dirname = os.path.dirname(location) mkdir(dirname) echo(f"Download {url} -> {location} ...") download_headers = { "User-Agent": ( f"csspin/v{importlib.metadata.version('csspin')}" " (https://github.com/cslab/csspin)" ) } if headers: download_headers.update(headers) request = urllib.request.Request(url, headers=download_headers) with urllib.request.urlopen(request) as response: data = response.read() writebytes(location, data)
def extract(archive: str | Path, extract_to: str | Path, member: str = "") -> None: """ Unpack ``archive`` into ``extract_to``, optionally filtering by ``member`` prefix. """ echo(f"Extracting {archive} to {extract_to}") member = str(member).replace("\\", "/") mode = None extractor = None if tarfile.is_tarfile(archive): extractor = tarfile.open if str(archive).endswith(".tar.gz") or str(archive).endswith(".tgz"): mode = "r:gz" elif str(archive).endswith(".tar.xz"): mode = "r:xz" elif zipfile.is_zipfile(archive): extractor = zipfile.ZipFile # type: ignore[assignment] mode = "r" if not mode or extractor is None: die(f"Unsupported archive type {archive}") with extractor(archive, mode=mode) as arc: # type: ignore[misc,arg-type] if isinstance(arc, tarfile.TarFile): members = ( entity for entity in arc.getmembers() if entity.name.startswith(member) ) elif isinstance(arc, zipfile.ZipFile): members = (entity for entity in arc.namelist() if entity.startswith(member)) # type: ignore[misc] else: members = () # type: ignore[assignment] arc.extractall( members=members, # type: ignore[arg-type] path=extract_to, ) # nosec: tarfile_unsafe_members # This is the global configuration tree. CONFIG = config()
[docs] def get_tree() -> ConfigTree: """Return the global configuration tree.""" return CONFIG
def set_tree(cfg: ConfigTree) -> ConfigTree: # Intentionally undocumented global CONFIG # pylint: disable=global-statement CONFIG = cfg return cfg
[docs] def argument(**kwargs: Any) -> Callable: """Annotations task arguments. This works just like :py:func:`click.argument`, accepting all the same parameters. Example: .. code-block:: python :linenos: @task() def mytask(outfile: argument(type="...", help="...")): foo("do something") """ def wrapper(param_name: str) -> Callable: return click.argument(param_name, **kwargs) return wrapper
[docs] def option(*args: Any, **kwargs: Any) -> Callable: """Annotations for task options. This works just like :py:func:`click.option`, accepting the same parameters. Example: .. code-block:: python :linenos: @task() def mytask( outfile: option( "-o", "outfile", default="-", type=click.File("w"), help="... usage information ...", ), ): foo("do something") """ def wrapper(param_name: str) -> Callable: return click.option(*args, **kwargs) return wrapper
[docs] def task(*args: Any, **kwargs: Any) -> Callable: """Decorator that creates a task. This is a wrapper around Click's :py:func:`click.command` decorator, with some extras: * a string keyword argument ``when`` adds the task to the list of commands to run using :py:func:`invoke` * `aliases` is a list of aliases for the command (e.g. "tests" is an alias for "test") * ``noenv=True`` registers the command as a global command, that can run without a provisioned environment `task` introspects the signature of the decorated function and handles certain argument names automatically: * ``ctx`` will pass the :py:class:`Click context object <click.Context>` into the task; this is rarely useful for spin tasks * ``cfg`` will automatically pass the configuration tree; this is very useful most of the time, except for the simplest of tasks * ``args`` will simply pass through all command line arguments by using the ``ignore_unknown_options`` and ``allow_extra_args`` options of the Click context; this is often used for tasks that launch a specific command line tool to enable arbitrary arguments All other arguments to the task must be annotated with either :py:func:`option` or :py:func:`argument`. They both support the same arguments as the corresponding decorators :py:func:`click.option` and :py:func:`click.argument`. A simple example: .. code-block:: python :linenos: @task() def simple_task(cfg, args): foo("do something") This would make ``simple_task`` available as a new subcommand of spin. More elaborate examples can be found in the built-in plugins shipping with spin. """ # Import cli here, to avoid an import cycle from csspin import cli # pylint: disable=cyclic-import def task_wrapper( fn: Callable, group: GroupWithAliases = cli.commands, # type: ignore[assignment] ) -> Callable: task_object = fn pass_context = False context_settings = config() sig = inspect.signature(fn) param_names = list(sig.parameters.keys()) if param_names and "ctx" in param_names: pass_context = True task_object = click.pass_context(fn) param_names.pop(param_names.index("ctx")) pass_config = False for pn in param_names: if pn == "cfg": pass_config = True continue if pn == "args": context_settings.ignore_unknown_options = True context_settings.allow_extra_args = True task_object = click.argument("args", nargs=-1)(task_object) continue param = sig.parameters[pn] task_object = param.annotation(pn)(task_object) hook = kwargs.pop("when", None) aliases = kwargs.pop("aliases", []) noenv = kwargs.pop("noenv", False) group = kwargs.pop("group", group) task_object = group.command(*args, **kwargs, context_settings=context_settings)( task_object ) if noenv: cli.register_noenv(task_object.name) if group != cli.commands: # pylint: disable=comparison-with-callable task_object.full_name = " ".join((group.name, task_object.name)) # type: ignore[attr-defined] else: task_object.full_name = task_object.name # type: ignore[attr-defined] if hook: cfg = get_tree() hook_tree = cfg.spin.get("hooks", config()) hooks = hook_tree.setdefault(hook, []) hooks.append(task_object) for alias in aliases: group.register_alias(alias, task_object) def regular_callback(*args: Any, **kwargs: Any) -> Any: ensure(task_object) # type: ignore[arg-type] return fn(*args, **kwargs) def alternate_callback(*args: Any, **kwargs: Any) -> Any: ensure(task_object) # type: ignore[arg-type] return fn(get_tree(), *args, **kwargs) if pass_config and pass_context: task_object.callback = click.pass_context(alternate_callback) elif pass_config: task_object.callback = alternate_callback elif pass_context: task_object.callback = click.pass_context(regular_callback) else: task_object.callback = regular_callback task_object.__doc__ = fn.__doc__ return task_object return task_wrapper
[docs] def group(*args: Any, **kwargs: Any) -> Callable: """Decorator for task groups, to create nested commands. This works like :py:class:`click.Group`, but additionally supports subcommand aliases, that can be set via the `aliases` keyword argument to :py:func:`task`. Example: .. code-block:: python @group() def foo(): pass @foo.task() def bar(): pass The above example creates a ``spin foo bar`` command. """ from csspin import cli def group_decorator(fn: str | Path) -> Callable: noenv = kwargs.pop("noenv", False) kwargs["cls"] = cli.GroupWithAliases grp = cli.commands.group(*args, **kwargs)(click.pass_context(fn)) # type: ignore[attr-defined] if noenv: cli.register_noenv(grp.name) def subtask(*args: Any, **kwargs: Any) -> Callable: def task_decorator(fn: str | Path) -> click.Command: cmd = task(*args, **kwargs, group=grp)(fn) return cmd # type: ignore[no-any-return] return task_decorator grp.task = subtask return grp # type: ignore[no-any-return] return group_decorator
[docs] def getmtime(fn: str | Path) -> float: """Get the modification of file `fn`. `fn` is interpolated against the configuration tree. """ return os.path.getmtime(interpolate1(fn))
def is_up_to_date(target: str | Path, sources: Iterable[str | Path]) -> bool: """Check whether `target` exists and is newer than all of the `sources`. """ if not exists(target): return False if not isinstance(sources, Iterable): die( # type: ignore[unreachable] f"Can't check if {target} is up to date, since 'sources' is not iterable." ) target_mtime = getmtime(target) source_mtimes = [getmtime(src) for src in sources] + [0.0] return target_mtime >= max(source_mtimes) def run_script(script: str | list, env: dict | None = None) -> None: """Run a list of shell commands.""" if isinstance(script, str) or not isinstance(script, Iterable): script = [str(script)] for line in script: sh(line, shell=True, env=env) def run_spin(script: str | list) -> None: """Run a list of spin commands.""" from csspin.cli import commands if isinstance(script, str) or not isinstance(script, Iterable): script = [str(script)] for line in script: line = shlex.split(line.replace("\\", "\\\\")) try: echo("spin", " ".join(line), resolve=True) commands(line) except SystemExit as exc: if exc.code: # pylint: disable=using-constant-test raise def get_sources(tree: ConfigTree) -> list: sources = tree.get("sources", []) if not isinstance(sources, list): sources = [sources] return sources # type: ignore[no-any-return] def build_target(cfg: ConfigTree, target: str, phony: bool = False) -> None: info(f"target '{target}'{' (phony)' if phony else ''}") if (target_def := cfg.build_rules.get(target, None)) is None: if not exists(target) and not phony: die( f"Sorry, I don't know how to produce '{target}'. You may want to" " add a rule to your spinfile.yaml in the 'build_rules'" " section." ) return sources = get_sources(target_def) # First, build preconditions if sources: for source in sources: build_target(cfg, source, False) if not phony: if not is_up_to_date(target, sources): info(f"build '{target}'") script = target_def.get("script", []) spinscript = target_def.get("spin", []) run_script(script) run_spin(spinscript) else: info(f"{target} is up to date") def ensure(command: click.Command) -> None: # Check 'command_name' for dependencies declared under # 'build_rules', and make sure to produce it. This is used # internally and intentionally undocumented. debug(f"checking preconditions for {command}") cfg = get_tree() build_target(cfg, f"task {command.full_name}", phony=True) # type: ignore[attr-defined]
[docs] def invoke(hook: str, *args: Any, **kwargs: Any) -> None: '''``invoke()`` invokes the tasks that have the ``when`` hook `hook`. As an example, here is the implementation of **test**: .. code-block:: python @task(aliases=["tests"]) def test(cfg, coverage: option("--coverage", "coverage", is_flag=True)): """Run all tests defined in this project""" invoke("test", coverage=coverage) The way a task that uses `invoke` is invoking other tasks is part of the call interface contract: *all* tasks initialized like ``@task(when="test")`` *must* support the ``coverage`` argument as part of their Python function signature (albeit not necessarily the same command line flag ``--coverage``). ''' ctx = click.get_current_context() cfg = get_tree() if not (hooks := cfg.spin.hooks.setdefault(hook, [])): warn(f"No tasks found for hook '{hook}'") return n_hooks = len(hooks) info( f"{hook} hook will invoke the following tasks: " + ", ".join([f"'{h.full_name}'" for h in hooks]) ) for i, task_object in enumerate(hooks): prefix = f"{hook} ({i + 1}/{n_hooks}) -" echo(f"{prefix} calling '{task_object.full_name}'") # Filter kwargs so that plugins don't need to provide # options, just for being able to get called by a workflow. task_opts = [ param.name for param in task_object.params if isinstance(param, click.Option) ] pass_opts = {k: v for k, v in kwargs.items() if k in task_opts} ctx.invoke(task_object, *args, **pass_opts) info(f"{prefix} '{task_object.full_name}' done")
[docs] def toporun(cfg: ConfigTree, *fn_names: Any, reverse: bool = False) -> None: """Run plugin functions named in 'fn_names' in topological order.""" plugins = cfg.spin.topo_plugins if reverse: plugins = reversed(plugins) for func_name in fn_names: debug(f"toporun: {func_name}") for pi_name in plugins: if pi_name == "csspin.builtin" and func_name in ("cleanup", "provision"): # Don't run the hook in spin.builtin, it's a task there and not # considered a plugin's hook. continue pi_mod = cfg.loaded[pi_name] initf = getattr(pi_mod, func_name, None) if initf: debug(f" {pi_name}.{func_name}()") initf(cfg)
def main(*args: Any, **kwargs: Any) -> None: from csspin.cli import cli if not args: args = None # type: ignore[assignment] cli.main(args, **kwargs) # type: ignore[arg-type] def _main(*args: Any, **kwargs: Any) -> None: return main(*args, standalone_mode=True, **kwargs) def parse_version(verstr: str) -> packaging.version.Version: """Parse a version string.""" return packaging.version.parse(verstr) def get_requires(tree: ConfigTree, keyname: str) -> ConfigTree | list: """Access the 'requires.<keyname>' property in a subtree. Return [] if not there. """ requires = tree.get("requires", config()) return requires.get(keyname, []) # type: ignore[no-any-return]