# Copyright 2017 The Lambda-blocks developers. See AUTHORS for details.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module provides facilities to build and run a topology graph out
of a YAML file.
"""
from collections import deque
import collections
import inspect
import os.path
import typing
import yaml
import lb.types
from lb.exceptions import NotBoundError, YAMLError, ExecutionError
from lb.log import get_logger
from lb.registry import Registry
from lb.plugins_manager import HOOKS
logger = get_logger(__name__)
def type_or_any(type_):
    """
    Returns the type if it is declared, typing.Any otherwise.

    `type_` is an annotation as reported by the inspect module. When
    it is the "no annotation" sentinel (inspect.Parameter.empty, the
    public name of inspect._empty), typing.Any is returned; any other
    value is returned unchanged.
    """
    # Identity test rather than ==: an annotation can be any object,
    # and comparing with == would call its own __eq__, which may raise
    # or return something that is not a boolean.
    if type_ is inspect.Parameter.empty:
        return typing.Any
    return type_
[docs]class _Section(object):
"""
Container for a section of the YAML file, containing its fields.
"""
[docs] def __init__(self, fields):
self.fields = fields
class Vertice(_Section):
    """
    A vertice of the graph, i.e. a block as defined in YAML, together
    with its directed edges (connectors) to and from other blocks.
    """

    def __init__(self, fields):
        super().__init__(fields)
        # 'inputs' and 'args' are optional in the YAML section: give
        # each of them an empty dict when it is missing. This writes
        # into the mapping received as `fields`.
        for optional_key in ('inputs', 'args'):
            self.fields.setdefault(optional_key, {})
        # connectors arriving at this block / leaving this block
        self.prev_vertices = []
        self.next_vertices = []

    def add_prev(self, connector):
        """
        Records `connector` as an incoming edge of this vertice.
        """
        self.prev_vertices.append(connector)

    def add_next(self, connector):
        """
        Records `connector` as an outgoing edge of this vertice.
        """
        self.next_vertices.append(connector)
class Topology(_Section):
    """
    Container for a topology, as defined in YAML. Mainly used to
    encapsulate a Graph.
    """

    def __init__(self, fields, registry):
        """
        Stores the section fields and builds the graph of the
        sub-topology.

        `fields` is the YAML section; its 'topology' entry is passed
        to Graph as the filename to load. 'bind_in' and 'bind_out' are
        optional and default to empty dicts. `registry` is handed over
        to the subgraph unchanged.
        """
        super().__init__(fields)
        self.fields.setdefault('bind_in', {})
        self.fields.setdefault('bind_out', {})
        # we create the subgraph; its checks are skipped
        # (skip_check=True) when it is built here
        self.graph = Graph(filename=self.fields['topology'], skip_check=True, registry=registry)

    def get_outbound(self, value):
        """
        Given a bind_out value, returns the associated block along
        with its wanted result.

        Returns a (vertice, result_name) pair, the vertice being
        looked up in the encapsulated graph. Raises NotBoundError when
        `value` is not a key of bind_out.
        """
        bindings = self.fields['bind_out']
        if value not in bindings:
            raise NotBoundError('The value {} is not bound for topology {}.'.format(
                value, self.fields['name']))
        # a binding is written "block_name.result_name"
        producer_name, result_name = bindings[value].split('.')
        return self.graph.vertices[producer_name], result_name

    def vertices(self):
        """
        Returns the vertices of the encapsulated graph.
        """
        return self.graph.vertices

    def topology_inputs(self):
        """
        Returns the topology inputs of the encapsulated graph, i.e.
        the entries it recorded for inputs bound to '$inputs'.

        Graph._build_dag calls this method on every sub-topology when
        merging subgraphs into the parent graph.
        """
        return self.graph.topology_inputs
class Connector:
    """
    A directed connection between two blocks.

    It ties one return value of the producing block (a field name of
    its result) to one input value of the consuming block (a
    parameter name).
    """

    def __init__(self, block_from, value_from, block_dest, value_dest):
        # producing side
        self.block_from, self.value_from = block_from, value_from
        # consuming side
        self.block_dest, self.value_dest = block_dest, value_dest

    def __repr__(self):
        # rendered as "producer.value -> consumer.value", with the
        # blocks shown by their 'name' field
        source_name = self.block_from.fields['name']
        target_name = self.block_dest.fields['name']
        template = '{}.{} -> {}.{}'
        return template.format(source_name, self.value_from, target_name, self.value_dest)
class Graph(object):
    """
    Builds, stores, checks and executes a DAG from a YAML topology
    file.
    """

    def __init__(self, filename=None, filecontent=None, registry=None, skip_check=False):
        """
        Initializes a DAG for execution, provided a YAML file
        containing it, and a blocks registry.

        Exactly one of `filename` (path of the YAML file) and
        `filecontent` (the YAML data itself) must be provided.

        skip_check allows to skip the checks: it is usually not
        recommended, but can come handy when e.g. working with
        subgraphs and avoiding to check partial graphs.
        """
        assert filename is None or filecontent is None, \
            'Either filename or filecontent must be provided, not both.'
        assert filename is not None or filecontent is not None, \
            'You must provide at least a filename or filecontent when initializing a graph.'
        self.filename = filename
        self.filecontent = filecontent
        self.registry = registry
        logger.debug('Parsing YAML data')
        self._parse_file()
        if self.registry:
            logger.debug('Adding modules in the registry')
            self._register_modules()
        logger.debug('Checking YAML data')
        self._check_yaml()
        logger.debug('Building graph')
        self._build_dag()
        if not skip_check:
            logger.debug('Checking graph edges and types')
            # NOTE(review): _check_dag_inputs is not defined in the
            # part of this class visible here -- confirm where it
            # comes from.
            self._check_dag_inputs()
            logger.debug('Checking graph has no loops')
            self._check_dag_no_loops()

    def _parse_file(self):
        """
        Parses the YAML data defining a DAG, read from self.filename
        when it is set and from self.filecontent otherwise.

        The data must hold exactly two YAML documents. The first one
        is stored in self.dag_metadata ({} when it is empty), the
        second one (the sections) in self.dag_as_yaml.
        """
        def dict_constructor(loader, node):
            return collections.OrderedDict(loader.construct_pairs(node))

        def parse(content):
            # Make YAML parser use OrderedDict instead of dict, to
            # keep e.g. the order of inputs correct
            mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
            yaml.add_constructor(mapping_tag, dict_constructor)
            # The loader is named explicitly: recent PyYAML releases
            # refuse to load without one. FullLoader is used when the
            # installed PyYAML has it, Loader otherwise.
            # NOTE(review): neither is the safe loader; only load
            # topology files you trust.
            loader_class = getattr(yaml, 'FullLoader', yaml.Loader)
            documents = list(yaml.load_all(content, Loader=loader_class))
            assert len(documents) == 2, \
                'YAML file must contain 2 documents: metadata, and DAG description.'
            self.dag_metadata = documents[0] or {}
            assert documents[1] is not None, \
                "The topology doesn't define any block."
            # every argument beginning with 'lambda' is considered a
            # function and transformed as such
            for section in documents[1]:
                # happens at least when a file contains only "foo: bar":
                assert not isinstance(section, str), \
                    'Malformed section: {}'.format(section)
                for key, value in section.get('args', {}).items():
                    if isinstance(value, str) and value.startswith('lambda'):
                        # NOTE(review): eval runs arbitrary code coming
                        # from the YAML data; never parse untrusted
                        # topologies.
                        section['args'][key] = eval(value)
            self.dag_as_yaml = documents[1]

        if self.filename is not None:
            with open(self.filename) as f:
                parse(f)
        else:
            parse(self.filecontent)

    def _register_modules(self):
        """
        Initiates the registry with the modules defined in the yaml
        file, i.e. the 'modules' entry of the metadata document.
        """
        if 'modules' in self.dag_metadata:
            for module in self.dag_metadata['modules']:
                self.registry.add_module(module)

    def _build_dag(self):
        """
        Creates the DAG associated to the YAML file, recursively
        merging the subgraphs it encounters.

        Sets self.vertices (name -> vertice, subgraph vertices
        included), self.entry_points (vertices declared without any
        input) and self.topology_inputs (inputs bound to '$inputs').
        Raises YAMLError on a malformed or unknown input.
        """
        def create_edge(block_from, value_from, block_dest, value_dest):
            c = Connector(block_from, value_from, block_dest, value_dest)
            block_from.add_next(c)
            block_dest.add_prev(c)

        # assuming all names are unique, even in subgraphs, because
        # the primary key is a block name at the moment
        vertices = {}  # holds the vertices of the graph
        subgraphs = {}  # holds the subgraphs
        topology_inputs = []  # holds the graph global inputs ($inputs)
        entry_points = []  # DAG entry points, i.e. vertices without any input

        # we create all the vertices and subgraphs
        for section in self.dag_as_yaml:
            assert 'topology' in section.keys() or 'block' in section.keys(), \
                'Malformed section, must be a block or a topology:\n{}' \
                .format(section)
            if 'topology' in section:  # composition with a sub-topology
                subgraphs[section['name']] = Topology(section, self.registry)
            else:  # normal block
                vertices[section['name']] = Vertice(section)

        # we create the edges
        for block_dest in vertices.values():
            if not block_dest.fields['inputs']:
                entry_points.append(block_dest)
                continue
            for value_dest, pair in block_dest.fields['inputs'].items():
                try:
                    block_from, value_from = pair.split('.')
                except ValueError:
                    raise YAMLError(
                        'An input must contain a block/topology name and a value, '
                        'such as my_block.foo. Got {} in block {}'.format(pair, block_dest.fields['name']))
                if block_from == '$inputs':
                    # topology input
                    topology_inputs.append({'block_dest': block_dest,
                                            'value_dest': value_dest,
                                            'value_from': value_from})
                    continue
                if block_from not in vertices and block_from not in subgraphs:
                    raise YAMLError('Block {} has an unknown input: {}={} (no block {})'
                                    .format(block_dest.fields['name'], value_dest, pair, block_from))
                if block_from in subgraphs:
                    # result of a subgraph
                    block_from, value_from = subgraphs[block_from].get_outbound(value_from)
                else:
                    block_from = vertices[block_from]
                create_edge(block_from, value_from, block_dest, value_dest)

        # we now merge all the subgraphs
        for subgraph in subgraphs.values():
            # The topology inputs are read straight from the
            # encapsulated graph, so this does not depend on Topology
            # providing an accessor for them.
            for ti in subgraph.graph.topology_inputs:
                block_from, value_from = subgraph.fields['bind_in'][ti['value_from']].split('.')
                block_from = vertices[block_from]
                create_edge(block_from, value_from, ti['block_dest'], ti['value_dest'])
            vertices.update(subgraph.vertices())

        # we're done
        self.topology_inputs = topology_inputs
        self.vertices = vertices
        self.entry_points = entry_points

    def _check_yaml(self):
        """
        Checks that the YAML file is correctly typed.

        Every section must be a dict with a unique name, and be either
        a block known to the registry or a topology whose file exists.
        Arguments are checked against the annotations of the block
        parameters. Failures are reported as AssertionError.
        """
        def assert_dict(section, key):
            if key in section:
                assert isinstance(section[key], dict), \
                    "Block {}'s {} is not a dict".format(section['name'], key)

        section_names = set()  # names met so far, to detect duplicates
        for section in self.dag_as_yaml:
            # be sure every block has a name (unique) and is
            # associated to a registered block
            assert isinstance(section, dict), \
                'Section is malformed: {}'.format(str(section))
            assert 'name' in section.keys(), \
                "Section doesn't have a name: {}".format(str(section))
            assert section['name'] not in section_names, \
                'Section name is duplicated: {}'.format(section['name'])
            section_names.add(section['name'])
            assert 'block' in section.keys() or 'topology' in section.keys(), \
                'Section is not a block nor a topology: {}'.format(section)
            if 'block' in section:  # regular block
                assert section['block'] in self.registry.keys(), \
                    "Block doesn't exist: {}".format(section['block'])
                assert_dict(section, 'inputs')
            else:  # topology
                assert os.path.isfile(section['topology']), \
                    "Topology file doesn't exist: {}".format(section['topology'])
                assert_dict(section, 'bind_in')
                assert_dict(section, 'bind_out')
            # check arguments types
            if 'args' in section:
                for name, value in section['args'].items():
                    expected_type = self.registry[section['block']]['_parameters'][name].annotation
                    assert lb.types.is_instance(value, type_or_any(expected_type)), \
                        'Arg {} for block {} is of type {}, expected {}'.format(
                            name, section['name'], type(value), expected_type)

    def _check_dag_no_loops(self):
        """
        Checks that the DAG doesn't contain loops.

        Walks every path starting from the entry points and raises
        AssertionError as soon as a path goes twice through a block
        with the same name.
        """
        def loop_in_path(path):
            names = [x.fields['name'] for x in path]
            assert len(set(names)) == len(names),\
                'There is a loop in your DAG, occuring with the block {}' \
                .format(names[-1])

        self.loop_vertices(loop_in_path)

    def loop_vertices(self, fun, depth=False):
        """
        Implements a breadth-first search on the DAG, and applies
        function `fun` to every vertice.

        `fun` must accept one argument, a list of vertices
        representing the path taken. The last element of this list is
        the current vertice. A vertice reachable through several paths
        is visited once per path.

        If `depth` is True, do a depth-first search instead.
        """
        # initial paths are [[entrypoint1],[entrypoint2],…]
        queue_to_visit = deque([x] for x in self.entry_points)
        while queue_to_visit:
            current_path = queue_to_visit.popleft()
            fun(current_path)
            # add linked vertices to the queue: at its front for a
            # depth-first search, at its back for a breadth-first one
            for dest in current_path[-1].next_vertices:
                next_path = current_path + [dest.block_dest]
                if depth:
                    queue_to_visit.appendleft(next_path)
                else:
                    queue_to_visit.append(next_path)

    def execute(self):
        """
        Executes a DAG, beginning with all its entry points and giving
        their outputs to their consumers, iteratively.

        Returns a dict mapping every executed vertice to its result.
        Raises ExecutionError when a result lacks a value wanted by a
        consumer, or when the queued blocks all wait for inputs that
        are never computed.
        """
        self.before_graph_execution()
        results = {}
        fun_queue = deque(self.entry_points)
        stalled = 0  # blocks put back in the queue since the last execution
        while fun_queue:
            block = fun_queue.popleft()
            # have all the block inputs been computed yet?
            if any(input_.block_from not in results for input_ in block.prev_vertices):
                fun_queue.append(block)
                stalled += 1
                # Every queued block has been tried since the last
                # execution and none could run: the results cannot
                # change anymore, so looping further would never end.
                if stalled >= len(fun_queue):
                    raise ExecutionError(
                        'Execution cannot progress, these blocks wait for inputs '
                        'that are never computed: {}'.format(
                            ', '.join(str(b.fields['name']) for b in fun_queue)))
                continue
            stalled = 0
            # ok, all inputs have been computed, we proceed
            comp_fun = self.registry[block.fields['block']]['_func']
            comp_args = block.fields['args']
            # we prepare the block's input values
            comp_inputs = {}
            for input_ in block.prev_vertices:
                try:
                    this_res = getattr(results[input_.block_from], input_.value_from)
                except AttributeError:
                    raise ExecutionError('{} was scheduled for execution, but lacks some inputs: {}'
                                         .format(block.fields['name'], input_.value_from))
                comp_inputs[input_.value_dest] = this_res
            self.before_block_execution(block, results)
            if block not in results:  # a plugin could have computed the results
                results[block] = comp_fun(**comp_args)(**comp_inputs)
            self.after_block_execution(block, results)
            # we add this block's destinations to the queue,
            # if they are not there already
            for destination in block.next_vertices:
                if destination.block_dest not in fun_queue:
                    fun_queue.append(destination.block_dest)
        self.after_graph_execution(results)
        return results

    def before_graph_execution(self):
        """
        Calls every 'before_graph_execution' hook with the vertices
        and the entry points of the graph.
        """
        for f in HOOKS['before_graph_execution']:
            f(self.vertices, self.entry_points)

    def after_graph_execution(self, results):
        """
        Calls every 'after_graph_execution' hook with the results.
        """
        for f in HOOKS['after_graph_execution']:
            f(results)

    def before_block_execution(self, block, results):
        """
        Calls every 'before_block_execution' hook with the block about
        to be executed and the results computed so far.
        """
        for f in HOOKS['before_block_execution']:
            f(block, results)

    def after_block_execution(self, block, results):
        """
        Calls every 'after_block_execution' hook with the block just
        executed and the results computed so far.
        """
        for f in HOOKS['after_block_execution']:
            f(block, results)