from itertools import chain
from zoti_graph.core import Rel, ATTR_REL
import zoti_graph.genny as ty
def _flatten(n, G):
entry = G.entry(n)
if isinstance(entry, ty.BasicNode):
return []
if not isinstance(entry, ty.CompositeNode):
return [n]
return list(chain.from_iterable([_flatten(c, G) for c in G.children(n)]))
def _non_composite_children(n, G):
return list(chain.from_iterable([_flatten(c, G) for c in G.children(n)]))
def _non_composite_parent(n, G):
parent = n.parent()
assert n != parent
if isinstance(G.entry(parent), ty.CompositeNode):
return _non_composite_parent(parent, G)
return parent
[docs]def edge_direction(G, u, v):
"""Edge direction should be consistent with the hierarchy of its connecting elements.
forall u,v in edges(G)
| u is basic and v is not basic ⇒ v.kind ∈ {IN}
| v is basic and u is not basic ⇒ u.kind ∈ {OUT}
| node(u) and node(v) are siblings ⇒ u.dir ∈ {OUT, SIDE} & v.dir ∈ {IN, SIDE}
| node(u) is parent for node(v) ⇒ u.dir = v.dir ∈ {IN, INOUT}
| node(u) is child for node(v) ⇒ u.dir = v.dir ∈ {OUT, INOUT}
"""
port_u, port_v = (G.entry(u), G.entry(v))
if isinstance(port_u, ty.BasicNode) and not isinstance(port_v, ty.BasicNode):
assert port_v.has_dir_in() # in
return
if isinstance(port_v, ty.BasicNode) and not isinstance(port_u, ty.BasicNode):
# assert port_u.kind == ty.Dir.OUT # out
assert port_u.has_dir_out() # out
return
node_u, node_v = (u.parent(), v.parent())
if node_u == node_v:
# same node, means that the ports are short-circuited
assert port_u.has_dir_in()
assert port_v.has_dir_out()
elif node_u.parent() == node_v.parent():
# same parent means different directions
assert port_u.has_dir_out() # out, side
assert port_v.has_dir_in() # in, side
elif node_u < node_v:
# different parent means same directions
assert port_u.has_dir_in()
assert port_v.has_dir_in()
elif node_u > node_v:
# different parent means same directions
assert port_u.has_dir_out() # out, inout
assert port_v.has_dir_out() # out, inout
[docs]def edge_hierarchy(G, u, v):
"""Edges can only connect sibling nodes or child nodes to their parents.
forall u,v in edges(G)
| node(u) and node(v) have different parents ⇒
node(u) = node(v).parent ⊻ node(v) = node(u).parent
"""
node_u, node_v = (u.parent(), v.parent())
if node_u.parent() != node_v.parent():
# if not the same parent, then they are parent-child
# same rule is true for intrinsics as well
assert (node_u == node_v.parent()) ^ (node_u.parent() == node_v)
[docs]def edge_sibling_kind(G, u, v):
"""Interconnected sibling nodes (except intrinsics and ignoring the
hierarchy of composites) are of the same kind.
forall u,v in edges(G) where node(u), node(v) are not intrinsic
| node(u) and node(v) have same parent ⇒
flatten(node(u)) ⋃ flatten(node(v)) are all of the same kind
"""
if isinstance(G.entry(u), ty.BasicNode):
return
if isinstance(G.entry(v), ty.BasicNode):
return
node_u, node_v = (u.parent(), v.parent())
if node_u.parent() == node_v.parent():
flattened = _flatten(node_u, G) + _flatten(node_v, G)
if flattened:
first_type = type(G.entry(flattened[0])).__name__
for n in flattened[1:]:
curr_type = type(G.entry(n)).__name__
assert first_type == curr_type
[docs]def node_consistent_tree(G, n):
"""All nodes except for the root node have only one parent.
forall n in nodes(G) - root(G):
| ⇒ len(n.parents) = 1
"""
if n == G.root:
return
parents = [
u for u, v in G.ir.in_edges(n)
if G.ir[u][v][ATTR_REL] & Rel.TREE
]
assert len(parents) == 1
[docs]def node_actor_hierarchy(G, n):
"""Ignoring the hierarchy of composites, all actor nodes belong to a
platform node and contain only kernel nodes.
forall n in nodes(G) where n is actor node:
| ⇒ the first parent which is not composite is platform node
| ⇒ all first children which are not composite are only intrinsic or kernel nodes
"""
entry = G.entry(n)
if not isinstance(entry, ty.ActorNode):
return
parent = G.entry(_non_composite_parent(n, G))
assert isinstance(parent, ty.PlatformNode)
children = _non_composite_children(n, G)
for child in children:
assert isinstance(G.entry(child), ty.KernelNode)
[docs]def node_kernel_hierarchy(G, n):
"""Ignoring the hierarchy of composites, all kernel nodes belong to an
actor node.
forall n in nodes(G) where n is kernel node:
| ⇒ the immediate parent which is not composite is actor node
| ⇒ no child nodes exist
"""
entry = G.entry(n)
if not isinstance(entry, ty.KernelNode):
return
parent = G.entry(_non_composite_parent(n, G))
assert isinstance(parent, ty.ActorNode)
children = G.children(n)
assert len(children) == 0
def node_actor_consistency(G, n):
# checks name consistency
# checks decoupled scenarios
entry = G.entry(n)
if not isinstance(entry, ty.ActorNode):
return
if entry.detector is None:
return
if entry.detector.preproc:
assert n.withNode(entry.detector.preproc) in G.children(n)
for port in entry.detector.inputs:
assert G.ir.has_node(n.withPath(ty.Uid(port)))
if not entry.detector.states and entry.detector.expr:
assert len(entry.detector.expr) == 1
if entry.detector.states and entry.detector.expr:
for state, expr in entry.detector.expr.items():
assert state in entry.detector.states
if "goto" in expr:
assert expr["goto"] in entry.detector.states
if entry.detector.scenarios is None:
return
for scen in entry.detector.scenarios.values():
assert n.withNode(scen) in G.children(n)
# TODO: scenarios are decoupled using projection
# def port_dangling(G, p):
# """There is no dangling port in the graph.
# forall p in ports(G):
# | node(p) is kernel node ⇒ exists e in edges(G) such that p = e.src or p = e.dst
# | p is inout port ⇒ exists e1, e2 in edges(G) such that p = e1 ∩ e2
# | otherwise ⇒ exists e1, e2 in edges(G) such that p = e1.src and p = e2.dst
# """
# node = G.entry(p.parent())
# port = G.entry(p)
# inedges = G.port_edges(p, which="in")
# outedges = G.port_edges(p, which="out")
# if isinstance(node, ty.KernelNode):
# pass # TODO, handle detector preproc
# # assert len(inedges) > 0 or len(outedges) > 0
# elif isinstance(port, ty.Port) and port.kind == ty.Dir.SIDE:
# assert len(inedges) + len(outedges) > 1
# else:
# assert len(inedges) > 0 and len(outedges) > 0
[docs]def port_dangling(G, p):
"""'Dangling port' means either an event port which is not connected
(is triggered by nothing or triggers nothing) or a side-effect
port which is not exposed at the actor level (that might be
ignored).
"""
if G.entry(p).kind == ty.Dir.SIDE:
assert any([
isinstance(G.entry(G.parent(q)), ty.ActorNode)
for q in G.connected_ports(p)
])
else:
assert all([
isinstance(G.entry(G.parent(q)), ty.KernelNode) or
isinstance(G.entry(q), ty.BasicNode)
for q in G.end_ports(p)
])