Source code for zoti_gen.builder

import importlib
import types
from collections import OrderedDict
from copy import deepcopy
from typing import Dict, List, Set
import logging as log

import marshmallow as mm
from zoti_yaml import Module, get_pos

import zoti_gen.core as ty
import zoti_gen.util as util
from zoti_gen.core import Block, Label, Requirement, Template
from zoti_gen.exceptions import ModelError, ParseError, ValidationError


[docs]class Builder: """This handler takes care of loading input specifications, templates, building and dumping target code artifacts. :arg main: the qualified name of the main module (containing a ``top`` entry in preamble which points to the project's top block) :arg srcs: a list of initialized `zoti_yaml.Module <../zoti-yaml>`_ containing all the raw input specifications. Used for their qualified name queries. :arg annotations: `formatted string <https://www.w3schools.com/python/ref_string_format.asp>`_ for printing :class:`Block` information (before, after) the template expansion. In the formatting, the variable ``comp`` represents the current :class:`Block` object. """ _mods: Dict _blks: Dict main: ty.Ref """ Constructed path to the top (i.e., main) block """ requs: Requirement """Resolved dependencies. Available only after calling :meth:`resolve()`.""" decls: List """ List of component declared at top level. Available only after calling :meth:`resolve()`.""" def __init__(self, main: str, srcs: List[Module], annotation=(None, None)): # self.main = main self._annot_begin, self._annot_end = annotation self._annot_begin = f"\n{self._annot_begin}\n" if self._annot_begin else "\n" self._annot_end = f"\n{self._annot_end}\n" if self._annot_end else "\n" if not srcs: raise ImportError("No input sources provided.") modules = {mod.name: mod for mod in srcs} self._mods = {} for nm, mod in modules.items(): self._mods[nm] = {bl[ty.ATTR_NAME]: bl for bl in mod.doc[ty.ATTR_BLOCK]} # print(list(self._mods[nm].keys())) # print(list([b["name"] for b in mod.doc[ty.ATTR_BLOCK]])) if len(self._mods[nm].keys()) != len(mod.doc[ty.ATTR_BLOCK]): raise ImportError(f"Module '{nm}' contains duplicate names.") self._blks = {} assert "top" in modules[main].preamble self.main = ty.Ref(module=main, name=modules[main].preamble["top"]) self.requs = Requirement({}) self.decls = []
[docs] def get(self, ref=None, caller=None) -> Block: """Gets a :class:`Block` object using its qualified name. If the the block has been parsed before it returns the previously-constructed block, otherwise it follows the decision flow: | it searches the specifications | if it refers to a library template | | it imports the base constructor using `importlib <https://docs.python.org/3/library/importlib.html>`_ | else | | uses :class:`Block` base constructor | parses the specifications and constructs the block """ def _get_spec(module, name): spec = importlib.util.find_spec(module) if spec is None: raise ImportError(f"Cannot find module '{module}'") # m = spec.loader.load_module(module) # DEPRECATED! m = types.ModuleType(spec.loader.name) spec.loader.exec_module(m) if m is None: raise ImportError(f"Cannot load module '{module}'") if name not in vars(m): raise ImportError( f"Spec for '{name}' not found in module '{module}'") return vars(m)[name] if ref is None: return Block("_DUMMY", code="", prototype=None) if ref in self._blks: return self._blks[ref] if ref.module in self._mods: if ref.name not in self._mods[ref.module]: msg = f"Block '{ref.name}' not found in module '{ref.module}'" raise ParseError(msg, caller) comp_src = self._mods[ref.module][ref.name] needs_spec = ty.ATTR_TYPE in comp_src if needs_spec: spec_ref = ty.RefSchema().load(comp_src[ty.ATTR_TYPE]) info = comp_src if get_pos(comp_src) else caller else: comp_src = {"name": ref.name} needs_spec = True spec_ref, info = ref, caller try: if needs_spec: spec = _get_spec(spec_ref.module, spec_ref.name) comp = spec.Schema(unknown=mm.EXCLUDE).load(comp_src) # if storing it might overwrite wrongly self._blks[ref] = comp else: comp = Block.Schema().load(comp_src) self._blks[ref] = comp except mm.ValidationError as err: raise ValidationError(err.messages, info) if comp._info is None: comp._info = get_pos(caller) return comp
[docs] def parse(self): """Recursively parses a loaded project (i.e., containing input specifications) and creates the (hidden) internal representation starting from the main block inwards. """ log.info(f"*** Loading all components related to '{self.main}' ***") def _recursive(ref, caller): comp = self.get(ref, caller) log.info(f" - Loaded '{ref}'") if not comp.instance: return for inst in comp.instance: _recursive(inst.block, caller=inst) _recursive(self.main, caller=None)
[docs] def resolve(self): """Resolves names/bindings and renders code. OBS: alters the internal structure of each respective block entry.""" log.info(f"*** Resolving bindings and expanding templates ***") def _check_attr(obj, *attrlist): missing = [attr for attr in attrlist if getattr(obj, attr) is None] if missing: msg = f"{type(obj).__name__} is missing attribute(s): {missing}" name = getattr(obj, "name") if hasattr( obj, "name") else util.qualname(obj) raise ModelError(msg, name, get_pos(obj)) def _self_check(comp, context): if callable(getattr(comp, ty.FUN_CHECK, None)): try: getattr(comp, ty.FUN_CHECK)() except Exception as e: msg = "Self-validation failed" msg += f" with:\n{e}" if e else "" raise ModelError(msg, comp.name, context) def _map_bindings(instance, labels, params): newlabelb = {} newparamb = {} def label_to_label(parent, child, usage, info=None): b_label = deepcopy(labels[parent]) b_label.name = usage.render( # CONTEXT-BEGIN: bind/label_to_label p=parent, # ID of parent label=labels, # entire namespace of resolved labels param=params, # entire namespace of parameters # CONTEXT-END: bind/label_to_label info=info ) newlabelb[child] = b_label def param_to_label(parent, child, info=None): b_label = Label(name=params[parent], usage=params[parent], glue={}) b_label.name = params[parent] newlabelb[child] = b_label def usage_to_label(child, usage, info=None): b_label = Label(name=usage.render( # CONTEXT-BEGIN: bind/usage_to_label label=labels, # entire namespace of resolved labels param=params, # entire namespace of parameters # CONTEXT-END: bind/usage_to_label ), usage=usage, glue={}) newlabelb[child] = b_label def param_to_param(parent, child, **kwargs): newparamb[child] = params[parent] def value_to_param(child, value, **kwargs): newparamb[child] = value for bind in instance.bind: try: locals()[bind.func](**bind.args, info=get_pos(bind)) except Exception as e: raise ModelError(e, obj=bind) return newlabelb, newparamb def _recursive_inst(inst, comp, b_labels, b_params, namespace): if ty.PRAGMA_EXP in inst.directive: log.info(f" - Expanding instance {inst.placeholder}...") _check_attr(inst, ty.ATTR_PH) _recursive_blks(comp, b_labels, b_params, namespace) _check_attr(comp, ty.ATTR_CODE) if inst.usage: log.info(f" - Creating call code for {inst.placeholder}") comp.code = inst.usage.render( # CONTEXT-BEGIN: instance/usage-expand name=comp.name, # component's name label=comp.label, # resolved labels param=comp.param, # parameters placeholder={ # only one placeholder "code": comp.code }, # CONTEXT-END: instance/usage-expand info=get_pos(comp), ) return comp.code elif inst.block not in self.decls or ty.PRAGMA_NEW in inst.directive: # print("=============================") # if self.decls: # print(type(self.decls[0])) # print(comp.name, inst.block, self.decls) # print("=============================") log.info(f" - Making new block for instance ...") labels = b_labels if ty.PRAGMA_PASS in inst.directive else {} _recursive_blks(comp, labels, b_params, set(self.decls)) self.decls.append(inst.block) _check_attr(comp, ty.ATTR_CODE, ty.ATTR_PROTO) comp.code = comp.prototype.render( # CONTEXT-BEGIN: prototype name=comp.name, # component's name label=comp.label, # resolved labels param=comp.param, # parameters placeholder={ # only one placeholder "code": comp.code }, # CONTEXT-END: prototype info=get_pos(comp), ) if inst.placeholder: log.info(f" - Created call code for {inst.placeholder}") _check_attr(inst, ty.ATTR_USAGE) return inst.usage.render( # CONTEXT-BEGIN: instance/usage-noexpand name=comp.name, # component's name param=comp.param, # parameters label=b_labels, # bounded (parent) labels # CONTEXT-END: instance/usage-noexpand info=get_pos(comp), ) else: return None else: log.info(f" - Creating call code for {inst.placeholder}") return inst.usage.render( name=comp.name, # component's name label=b_labels, # bounded (parent) labels param=comp.param, # parameters info=get_pos(comp), ) def _recursive_blks(comp, b_labels, b_params, namespace: Set[str]): name = util.uniqueName(comp.name, namespace, update=True) log.info(f" ** Resolving component '{name}'") comp.name = name for key, label in comp.label.items(): label.name = util.uniqueName(key, namespace, update=True) log.info(f" - Updated label names for {list(comp.label.keys())}") params = {**comp.param, **b_params} log.info(f" - Updated params {list(params.keys())}") labels = OrderedDict({**comp.label, **b_labels}) for key, label in labels.items(): if key not in b_labels: _check_attr(label, ty.ATTR_USAGE) label.name = label.usage.render( # CONTEXT-BEGIN: label/usage p=key, # ID of the current label label=labels, # complete namespace of resolved labels param=params, # complete namespace of parameters # CONTEXT-END: label/usage info=get_pos(label), ) log.info( f" - Updated parent label bindings {list(labels.keys())}") comp.param = params comp.label = labels rinst = {} if comp.instance: for inst in comp.instance: rcomp = self.get(inst.block, caller=comp) blabels, bparams = _map_bindings( inst, comp.label, comp.param) rinst[inst.placeholder] = _recursive_inst( inst, rcomp, blabels, bparams, namespace ) _self_check(comp, get_pos(comp)) if comp.requirement is not None: self.requs.update(comp.requirement) log.info(f" - Updated global requirements") if isinstance(comp.code, Template): code = self._annot_begin.format(comp=comp) code += comp.code.render( # CONTEXT-BEGIN: code label=comp.label, # resolved labels param=comp.param, # parameters placeholder=rinst, # recursivly-expanded block instances # CONTEXT-END: code info=get_pos(comp), ) code += self._annot_end.format(comp=comp) comp.code = code log.info(f" - rendered code") main = self.get(self.main) _recursive_blks(main, {}, {}, set()) try: main.code = main.prototype.render( # CONTEXT-BEGIN: code name=main.name, label=main.label, # resolved labels param=main.param, # parameters placeholder={ "code": main.code }, # CONTEXT-END: code info=get_pos(main), ) except Exception as e: raise ModelError(e, "main", obj=main) self.decls.append(self.main)