# Source code for zoti_yaml.core

import logging as log
from copy import deepcopy
from pathlib import Path, PurePosixPath
from pprint import pformat, pprint
from typing import Any, Dict, List, Optional, Union, Generic, TypeVar
from dataclasses import dataclass

import yaml

# Reserved bookkeeping keys. Position data is stored under ``node[INFO][POS]``
# (see ``attach_pos``); ``clean`` drops every dictionary key listed in
# RESERVED_KWS.
INFO = "_info"
POS = "_pos"
RESERVED_KWS = [INFO, POS]

# Attribute-name constants. They are not referenced in the visible part of
# this module -- presumably consumed by the loaders/handlers; TODO confirm.
ATTR_NAME = "name"
ATTR_PATH = "path"
ATTR_MODULE = "module"
ATTR_IMPORT = "import"
ATTR_ALIAS = "as"

# Merge-policy keywords recognised by ``MergePolicy.from_keyword``.
POLICY_UNION = "union"
POLICY_RUNION = "union+replace"
POLICY_INTER = "intersect"
POLICY_RINTER = "replace+intersect"


def clean(node):
    """Return *node* with the reserved bookkeeping keys removed.

    Lists and dictionaries are rebuilt recursively, dropping every
    dictionary entry whose key is listed in ``RESERVED_KWS``. A
    ``MergePolicy`` wrapper is returned as the same object, with its
    ``obj`` payload cleaned and re-assigned in place. Any other value
    is returned untouched.
    """
    if isinstance(node, list):
        return list(map(clean, node))
    if isinstance(node, dict):
        stripped = {}
        for key, value in node.items():
            if key in RESERVED_KWS:
                continue
            stripped[key] = clean(value)
        return stripped
    if isinstance(node, MergePolicy):
        node.obj = clean(node.obj)
    return node


###############
## Positions ##
###############


class Pos:
    """Container for positional information in a YAML file. Depends on
    the info from a PyYAML loader."""

    line: int
    """line number"""
    column: int
    """column number"""
    ibegin: int
    """start index in the text file"""
    iend: int
    """end index in the text file"""
    path: Optional[str]
    """location of the source file"""
    who: Optional[str]
    """string for bookkeeping the processing pipeline for this node"""

    def __init__(self, line, column, ibegin=0, iend=-1, path=None, who=None):
        self.line = line
        self.column = column
        self.ibegin = ibegin
        self.iend = iend
        self.path = path
        self.who = who

    # The mark annotations are quoted so that defining this class does not
    # itself require PyYAML to be importable.
    @classmethod
    def from_mark(cls, start_mark: "yaml.Mark", end_mark: "yaml.Mark",
                  path: str = "<stdio>", who: Optional[str] = None):
        """Build a position from the start/end marks of a PyYAML node.

        Only the ``line``, ``column`` and ``index`` attributes of
        *start_mark* and the ``index`` attribute of *end_mark* are read.
        """
        return cls(
            start_mark.line,
            start_mark.column,
            start_mark.index,
            end_mark.index,
            path,
            who,
        )

    def __repr__(self):
        # Stored line/column are zero-based; they are displayed one-based.
        rep = f'in "{self.path}", '
        rep += f"line {self.line+1}, column {self.column+1}"
        return rep

    def dump(self) -> List:
        """Dump content to a JSON list of arguments that can be used to
        recreate it (i.e. ``Pos(*pos.dump())``).
        """
        return [self.line, self.column, self.ibegin, self.iend, self.path, self.who]

    def show(self) -> str:
        """Pretty print content. Checks `logging.root.level`.

        * root level above ``WARN``: only the one-line ``repr``;
        * root level below ``WARN``: ``repr`` followed by the source text
          between ``ibegin`` and ``iend``;
        * otherwise: ``repr`` followed by the offending source line and a
          caret under the column.

        This is a best-effort diagnostic helper: if the source file is not
        set, does not exist, cannot be read/decoded, or does not contain the
        recorded line, the plain ``repr`` is returned instead of raising.
        """
        rep = repr(self)
        if log.root.level > log.WARN:  # silent
            return rep
        # ``path`` defaults to None, and Path(None) raises TypeError.
        if not self.path or not Path(self.path).is_file():
            return rep
        verbose = log.root.level < log.WARN
        try:
            with open(self.path) as f:
                if verbose:
                    excerpt = f.read()[self.ibegin: self.iend]
                    return f"{rep} {self.who}:\n{excerpt}"
                lines = f.readlines()
        except (OSError, UnicodeError):
            return rep
        if not 0 <= self.line < len(lines):
            return rep
        # Strip only the line terminator: the last line of a file may not
        # have one, and slicing with [:-1] would then drop a real character.
        src = lines[self.line].rstrip("\n")
        return f'{rep} {self.who}:\n {src}\n {" " * self.column}^'
class PosStack:
    """A stack containing the history of attached positional informations.

    The stack is expected to be non-empty: both ``repr`` and :meth:`show`
    index into it.
    """

    _stack: "List[Pos]"

    def __init__(self, stack):
        self._stack = stack

    def __repr__(self):
        # The head of the stack represents the whole history.
        return repr(self._stack[0])

    def show(self):
        """Return a multi-line description of the whole history.

        Entries are listed in reverse stack order, one per line. All but
        the last listed entry (i.e. the head of the stack) are rendered
        with ``repr``; the head is rendered with its own ``show()``.
        """
        ret = list(reversed(self._stack))
        return "".join([f"\n {repr(p)}" for p in ret[:-1]]) + f"\n {ret[-1].show()}"
def attach_pos(node: Dict, pos: "Pos", to_head=False) -> None:
    """Helper function to attach position information to a JSON object.

    The position is stored, in its ``pos.dump()`` form, in the list found
    at ``node[INFO][POS]``:

    * if that list exists and is non-empty, the entry is inserted at the
      front when *to_head* is true and appended otherwise;
    * if it is missing or empty, it is (re)created holding only this entry.

    For a non-dict *node* that has an ``INFO`` attribute, the same update
    is applied to that attribute. Any other node is left untouched and a
    debug message is logged.
    """
    entry = pos.dump()
    if isinstance(node, dict):
        if INFO not in node:
            node[INFO] = {POS: [entry]}
            return
        info = node[INFO]
        if POS in info and info[POS]:
            if to_head:
                info[POS].insert(0, entry)
            else:
                info[POS].append(entry)
        else:
            info[POS] = [entry]
    elif hasattr(node, INFO):
        # Reuse the dict logic on a temporary wrapper, then write it back.
        # `pos` and `to_head` must be forwarded: calling attach_pos(tmp)
        # alone raises TypeError for the missing positional argument.
        tmp = {INFO: getattr(node, INFO)}
        attach_pos(tmp, pos, to_head)
        setattr(node, INFO, tmp[INFO])
    else:
        log.debug(f"Could not attach position to object: {repr(node)}")
# def get_pos(node: Dict) -> Optional[Pos]: # """Helper function to retrieve position information from a node, if any.""" # try: # if isinstance(node, dict): # return Pos(*node[INFO][POS][0]) # else: # return Pos(*getattr(node, INFO)[POS][0]) # except Exception: # return None
def get_pos(node: Dict) -> "Optional[PosStack]":
    """Helper function to retrieve position information from a JSON node,
    if any.

    The position list is read from ``node[INFO][POS]`` for dictionaries and
    from the ``INFO`` attribute for any other object. ``None`` is returned
    when the information is missing, malformed or empty. An empty list is
    treated as "no position", the same way ``attach_pos`` treats it, since
    an empty ``PosStack`` cannot be displayed.
    """
    try:
        info = node[INFO] if isinstance(node, dict) else getattr(node, INFO)
        stack = [Pos(*p) for p in info[POS]]
    except Exception:
        # Deliberately best-effort: position info is optional metadata.
        return None
    return PosStack(stack) if stack else None
def get_all_pos(node: Dict) -> "Optional[List[Pos]]":
    """Return every position attached to *node* as a plain list.

    The raw entries are read from ``node[INFO][POS]`` for dictionaries and
    from the ``INFO`` attribute for other objects. ``None`` is returned if
    anything goes wrong while reading or rebuilding them.
    """
    try:
        info = node[INFO] if isinstance(node, dict) else getattr(node, INFO)
        return [Pos(*entry) for entry in info[POS]]
    except Exception:
        return None


##########
## !ref ##
##########

T = TypeVar('T')


class Ref(Generic[T]):
    """Reference to an element in a module. When resolved, this object
    contains the qualified name or path to a (unique) element. It may
    have the following keyword arguments:

    *module*: (optional, string)
      A module name or an alias. It resolves to an explicit name. If
      none is specified it is assumed to be the current module.

    *path*: (mutually exclusive with *name*, string)
      A path to an element in the tree (see
      :class:`zoti_yaml.core.TreePath`).

    *name*: (mutually exclusive with *path*, string)
      An identifier of the element in a certain module.

    The type of the reference depends on which tool in the downstream
    uses it. If the tool works with, e.g. JSON trees (similar to
    ZOTI-YAML) then specifying *path* is more reasonable as it is being
    constructed into a :class:`zoti_yaml.core.TreePath` handler.
    However, if the tool works with other type of element identifiers,
    *name* should be used, as it preserves the string for downstream
    manipulation.

    Example:

    .. code-block:: yaml

        module: Foo
        import: {module: Foo.Bar, as: Baz}

        ---

        root:
          - ref1: !ref {path: ../../ref2}
          - ref2: !ref {module: Baz, name: "egg"}

    resolves to:

    .. code-block:: yaml

        module: Foo
        import: {module: Foo.Bar, as: Baz}

        ---

        root:
          - ref1: {module: Foo, path: ../../ref2}
          - ref2: {module: Foo.Bar, name: "egg"}

    """

    module: Optional[str]
    path: T

    def __init__(self, module=None, path=None, name=None):
        # Exactly one of the two must be given (truthiness-wise).
        if bool(path) == bool(name):
            raise TypeError(
                "Either 'path' or 'name' needs to be provided, but not both."
            )
        self.module = module
        # A name is stored verbatim; a path is wrapped in a TreePath.
        self.path = TreePath(path) if path else name

    def __repr__(self):
        prefix = self.module if self.module else ''
        return f"{prefix}.{self.path}"

    def resolve(self, this=None, root=None):
        """Fill in a missing module with *this* and, if a *root* is given,
        resolve a ``TreePath`` target against it."""
        if self.module is None:
            self.module = this
        if root is not None:
            if isinstance(self.path, TreePath):
                self.path.resolve(root)
        log.info(" - reference resolved: %s", str(self))
class TreePath:
    """Simple structure used to reference subtrees/nodes using a syntax
    similar to::

        {/path/to/node}

    The ``{path/to/node}`` part is stored as a PurePosixPath, thus it
    can also be a relative path.
    """

    path: PurePosixPath
    is_resolved: bool

    def __init__(self, path: Union[str, PurePosixPath] = ""):
        if not isinstance(path, PurePosixPath):
            path = PurePosixPath(path)
        self.path = path
        self.is_resolved = False

    def __repr__(self):
        return self.path.as_posix()

    def is_relative(self):
        """Root paths always start with ``/``. If not, then it is
        relative.
        """
        return self.path.root == ""

    def relative_to(self, root: "TreePath") -> "TreePath":
        """Returns the path obtained by concatenating this one to a root.

        Every ``..`` component cancels the component preceding it. The
        components are processed in a single left-to-right pass, so the
        method also terminates when the result is empty (e.g. ``..``
        relative to ``a``).

        :raises AssertionError: if a ``..`` has no preceding component to
          cancel.
        """
        resolved = []
        for part in root.path.joinpath(self.path).parts:
            if part != "..":
                resolved.append(part)
                continue
            if not resolved:
                # Same exception type as the bare `assert` this replaces,
                # but raised explicitly so it survives `python -O`.
                raise AssertionError(
                    f"path '{self.path}' escapes above '{root.path}'"
                )
            resolved.pop()
        return TreePath(PurePosixPath(*resolved))

    def resolve(self, root) -> None:
        """Resolves this path (see :meth:`relative_to`) and marks it as
        resolved. Does nothing if the path is already marked.
        """
        if self.is_resolved:
            return
        if root is not None and self.is_relative():
            ref = self.relative_to(root)
            self.path = ref.path
        # NOTE(review): the path is marked even when nothing was rewritten
        # (absolute path, or no root given) -- confirm this is intended.
        self.is_resolved = True

    def with_key(self, key):
        """appends a key at the end of this path (see :class:`Module`)."""
        return TreePath(self.path.joinpath(key))

    def with_name(self, name):
        """appends a name or an index at the end of this path (see
        :class:`Module`), i.e. turns the last component ``x`` into
        ``x[name]``.
        """
        name = name if isinstance(name, str) else str(name)
        last = self.path.name
        return TreePath(self.path.with_name(f"{last}[{name}]"))
#############
## !attach ##
#############


class Attach:
    """Attaches a referenced node at the current location. This command
    can have any number of keyword arguments, of which one needs to be
    *ref*.

    *ref*: (object)
      Qualified :class:`zoti_yaml.core.TreePath`-based reference to
      another node (see ``!ref``). If the *path*

      - begins with character ``/`` , it is absolute (i.e. relative to
        the root of the module)
      - begins with a character other than ``/``, it is relative to this
        node.

    *...*: (keyword pairs)
      Arbitrary entries passed to the attached node in the following
      conditions:

      - if the attached node is not an object (i.e. dictionary), these
        entries are ignored;
      - if the attached node is an object but does not contain the said
        entry, it is ignored;
      - if the attached node is an object and contains the said entry,
        the argument entry replaces the original one. The original entry
        is stored at path ``_info/_prev_attrs`` underneath the attached
        node.

    *OBS1*: If both the parent and referenced nodes contain positional
    information, this will be captured in the ``_info/_pos`` entry,
    whose head will point to the original position of the referenced
    node.

    *OBS2*: ``!ref`` commands are resolved before ``!attach``, thus they
    can be used to construct references.

    *OBS3*: when exchanging arguments between the caller and callee it
    is recommended to use a dedicated field (e.g., check the *argfields*
    argument of :class:`zoti_yaml.handlers.Project`)

    Example:

    .. code-block:: yaml

        module: Foo

        ---

        root: !attach
          ref: {module: Bar, path: /root[bar]/refd}
          a: this replaces the original
          b: this is ignored

    .. code-block:: yaml

        module: Bar

        ---

        root:
          - name: bar
            refd:
              a: this is replaced
              c: this is carried from the original

    resolves to:

    .. code-block:: yaml

        module: Foo

        ---

        root:
          a: this replaces the original
          c: this is carried from the original

    Module ``Bar`` remains unchanged.
    """

    def __init__(self, ref, pos, _info=None, concat=None, **kwargs):
        """
        :param ref: the :class:`Ref` to the node to attach
        :param pos: position of this ``!attach``; its ``who`` field is
          suffixed with ``:!attach`` in place
        :param _info: accepted and ignored, so that it does not end up in
          the extra entries
        :param concat: list appended to the attached node when that node
          is a list (defaults to an empty list)
        :param kwargs: extra entries overriding those of the attached node
        :raises ValueError: if *ref* is not a :class:`Ref`
        """
        if not isinstance(ref, Ref):
            raise ValueError("value given is not of 'ref' type")
        self.ref = ref
        self.pos = pos
        # None instead of a shared mutable default list.
        self.concat = [] if concat is None else concat
        self.pos.who = f"{pos.who}:!attach"
        self.extra = kwargs

    def __repr__(self):
        return "!attach " + repr({"ref": self.ref, **self.extra})

    def resolve(self, modules):
        """Return a deep copy of the referenced node, adapted for this
        location.

        OBS: Does not add new entries to the attached node!

        * a list node is returned concatenated with ``concat``;
        * any other non-dict node is returned as copied;
        * a dict node gets this position attached, and each of its
          existing entries that is also given as an extra argument is
          overridden, the previous value being kept under
          ``_info/_prev_attrs``.

        :param modules: mapping from module name to a loaded module; the
          node is fetched with ``module.get(path, strict=False)``
        :raises KeyError: if the referenced module is not in *modules*
        """
        if self.ref.module not in modules:
            raise KeyError(f"Module '{self.ref.module}' not loaded.")
        refnode = deepcopy(
            modules[self.ref.module].get(self.ref.path, strict=False))

        if isinstance(refnode, list):
            log.info(" - attached node: %s", repr(self.ref))
            return refnode + self.concat
        if not isinstance(refnode, dict):
            log.info(" - attached node: %s", repr(self.ref))
            return refnode

        # update metadata of the retrieved node
        attach_pos(refnode, self.pos)
        refnode[INFO]["_prev_attrs"] = {}

        # update values to the ones specified in the "!attach" entry and
        # store the previous values just in case
        overridden = [k for k in refnode if k in self.extra and k != INFO]
        for key in overridden:
            refnode[INFO]["_prev_attrs"][key] = refnode[key]
            refnode[key] = self.extra[key]

        # replace node altogether
        log.info(" - attached node: %s", repr(self.ref))
        return refnode


##############
## !default ##
##############


@dataclass
class MergePolicy:
    """Policy dictating how the *defaults* object is to be recursively
    merged in *originals* (see ``!default``). The current policies are:

    ``union``
      performs the union between *originals* and *defaults* where
      *originals* have priority if the same key is found.

    ``union+replace``
      performs the union between *originals* and *defaults* where
      *defaults* have priority if the same key is found.

    ``intersect``
      ignores fields from *defaults* whose keys are not explicitly found
      in *originals*. *originals* have priority when the same key is
      found.

    ``replace+intersect``
      ignores fields from *defaults* whose keys are not explicitly found
      in *originals*. *defaults* have priority when the same key is
      found. (The keyword is the value of ``POLICY_RINTER``.)
    """

    obj: Optional[Any] = None
    union: bool = True
    replace: bool = False

    @classmethod
    def from_keyword(cls, keywd, obj):
        """Wrap *obj* in the policy designated by *keywd*.

        :raises AssertionError: if *keywd* is not one of the ``POLICY_*``
          keywords
        """
        if keywd == POLICY_UNION:
            return cls(obj, union=True, replace=False)
        elif keywd == POLICY_RUNION:
            return cls(obj, union=True, replace=True)
        elif keywd == POLICY_INTER:
            return cls(obj, union=False, replace=False)
        elif keywd == POLICY_RINTER:
            return cls(obj, union=False, replace=True)
        # Same exception type as the `assert False` this replaces, but an
        # explicit raise is not stripped by `python -O` (which would make
        # this method silently return None).
        raise AssertionError(f"unknown merge policy keyword: {keywd!r}")


class Default:
    """This keyword is followed by a list of exactly 2 YAML objects
    (i.e., trees), *defaults* and *original*. When the document tree is
    being resolved, it recursively fills in the contents of *original*
    according to the values in *defaults*, based on the active merge
    policy (see ``!policy:<merge_policy>``).

    The default merge policy is ``!policy:union``. A policy is a marked
    YAML node in the *defaults* tree, and is active from that node to
    all its children until the last leaf node or until a node with a
    policy changing marker. E.g.:

    .. code-block:: yaml

        !default
        - !policy:A
          root:             # policy A holds
            - foo: bar      # policy A holds
            - !policy:B
              biz:          # policy B holds
                - baz       # policy B holds
                - buzz      # policy B holds
            - bam: blep     # policy A holds
        - root: ...

    Example:

    .. code-block:: yaml

        !default
        - root:
            - !policy:intersect
              foo:
                a: this is superseded by original
                b: !policy:union+replace this supersedes the original
                c: this will be ignored
                d: !policy:union this is created
            - bar: this is ignored (only the first element in a list in !defaults matters)
        - root:
            - foo:
                a: this supersedes the default value
                b: this is superseded by the default value
            - foo:
                d: this supersedes the default value

    resolves to:

    .. code-block:: yaml

        root:
          - foo:
              a: this supersedes the default value
              b: this supersedes the original
              d: this is created
          - foo:
              b: this supersedes the original
              d: this supersedes the default value

    """

    def __init__(self, defaults, original):
        """
        :raises ValueError: if the two trees are not of the same type; a
          policy marker on top of *defaults* is looked through
        """
        tree = defaults.obj if isinstance(defaults, MergePolicy) else defaults
        if type(tree) is not type(original):
            raise ValueError("Aguments of !default of different type.")
        self.defaults = defaults
        self.original = original

    def __repr__(self):
        return "!default\n" + pformat([self.defaults, self.original])

    @staticmethod
    def _unwrap(node, policy):
        """Return ``(payload, active policy)`` for a possibly marked node."""
        if isinstance(node, MergePolicy):
            return node.obj, MergePolicy(union=node.union, replace=node.replace)
        return node, policy

    @staticmethod
    def _strip_policies(node):
        """Recursively replace policy markers in *node* by their payload."""
        if isinstance(node, MergePolicy):
            return Default._strip_policies(node.obj)
        if isinstance(node, list):
            return [Default._strip_policies(n) for n in node]
        if isinstance(node, dict):
            return {k: Default._strip_policies(v) for k, v in node.items()}
        return node

    @staticmethod
    def _merge(orig, default, policy):
        """Merge *default* into *orig* under *policy*; return the result.

        Dictionaries are updated in place; lists and scalars are returned
        as new values, so the caller must use the return value.

        :raises ValueError: if the two (unmarked) trees differ in type
        """
        # A marker may sit on any node: dict value, list element or root.
        default, policy = Default._unwrap(default, policy)
        if type(orig) is not type(default):
            msg = f"Cannot merge {type(default).__name__} with {type(orig).__name__}"
            msg += f"\n {pformat(default)}"
            msg += f"\n {pformat(orig)}"
            raise ValueError(msg)
        if isinstance(orig, dict):
            for key, val in default.items():
                # The marker on the value decides whether a missing key
                # is created, so it has to be read before descending.
                val, val_policy = Default._unwrap(val, policy)
                if key in orig:
                    orig[key] = Default._merge(orig[key], val, val_policy)
                elif val_policy.union:
                    # Copied subtrees must not leak markers into the result.
                    orig[key] = Default._strip_policies(deepcopy(val))
            return orig
        if isinstance(orig, list):
            # Only the first default element serves as the template; with
            # no template there is nothing to merge.
            if not default:
                return orig
            return [Default._merge(element, default[0], policy)
                    for element in orig]
        if policy.replace or not orig:
            return deepcopy(default)
        return orig

    def resolve(self):
        """Apply the defaults and return the completed *original* tree.

        Reserved bookkeeping entries are removed from *defaults* (with
        ``clean``) before merging.
        """
        # Keep the returned value: for a list or scalar root the merge
        # produces a new object instead of updating in place.
        self.original = self._merge(
            self.original, clean(self.defaults), MergePolicy())
        log.info(" - default values applied")
        return self.original