Internal API

lb.cache

Defines an abstract class for cache providers.

class lb.cache.CacheProvider[source]
__abstractmethods__ = frozenset({'get', 'set'})
__module__ = 'lb.cache'
_abc_cache = <_weakrefset.WeakSet object>
_abc_negative_cache = <_weakrefset.WeakSet object>
_abc_negative_cache_version = 43
_abc_registry = <_weakrefset.WeakSet object>
get(key)[source]

Gets the value associated with key in the cache. Raises KeyError if the key is not found.

set(key, value)[source]

Adds the pair (key, value) to the cache.
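
As an illustration, a minimal in-memory provider could look like the sketch below. The abstract class is re-declared locally so the example runs on its own, and DictCacheProvider is a hypothetical name that is not part of lb.

```python
from abc import ABC, abstractmethod


class CacheProvider(ABC):
    """Local stand-in mirroring the documented get/set interface."""

    @abstractmethod
    def get(self, key):
        """Return the value stored under key, or raise KeyError."""

    @abstractmethod
    def set(self, key, value):
        """Store the pair (key, value)."""


class DictCacheProvider(CacheProvider):
    """Hypothetical provider backed by a plain dict."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        # A dict lookup raises KeyError for missing keys, as the interface requires.
        return self._store[key]

    def set(self, key, value):
        self._store[key] = value


cache = DictCacheProvider()
cache.set("answer", 42)
```

Because both abstract methods are implemented, the subclass can be instantiated; leaving either one out would make instantiation fail with TypeError.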

lb.graph

This module provides facilities to build and run a topology graph from a YAML file.

class lb.graph.Connector(block_from, value_from, block_dest, value_dest)[source]

Represents a connection in a block.

A connection links an output block and one of its return values (a field name of the returned dict) with an input block and one of its input values (a parameter name).

__init__(block_from, value_from, block_dest, value_dest)[source]
__module__ = 'lb.graph'
__repr__()[source]
__weakref__

list of weak references to the object (if defined)

class lb.graph.Graph(filename=None, filecontent=None, registry=None, skip_check=False)[source]

Builds, stores, checks and executes a DAG from a YAML topology file.

__init__(filename=None, filecontent=None, registry=None, skip_check=False)[source]

Initializes a DAG for execution, provided a YAML file containing it, and a blocks registry.

skip_check skips the checks. This is usually not recommended, but can come in handy, e.g. when working with subgraphs, to avoid checking partial graphs.

__module__ = 'lb.graph'
__weakref__

list of weak references to the object (if defined)

_build_dag()[source]

Creates the DAG associated with the YAML file, recursively merging the subgraphs it encounters.

_check_dag_inputs()[source]

Checks that every output has the same name and type as the inputs where it’s consumed.

_check_dag_no_loops()[source]

Checks that the DAG doesn’t contain loops.
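
One common way to perform such a check is a depth-first search that looks for back edges. The sketch below shows that technique on a plain adjacency dict; has_cycle and the graph representation are assumptions made for illustration, not the actual implementation of this method.

```python
def has_cycle(edges):
    """Return True if the directed graph {vertex: [successors]} contains a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, fully explored
    vertices = set(edges)
    for successors in edges.values():
        vertices.update(successors)
    color = {v: WHITE for v in vertices}

    def visit(v):
        color[v] = GREY
        for nxt in edges.get(v, ()):
            if color[nxt] == GREY:  # back edge: nxt is an ancestor of v
                return True
            if color[nxt] == WHITE and visit(nxt):
                return True
        color[v] = BLACK
        return False

    return any(color[v] == WHITE and visit(v) for v in vertices)


acyclic = {"load": ["clean"], "clean": ["train"], "train": []}
cyclic = {"a": ["b"], "b": ["c"], "c": ["a"]}
```

Here has_cycle(acyclic) is False and has_cycle(cyclic) is True.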

_check_yaml()[source]

Checks that the YAML file is correctly typed.

_parse_file()[source]

Parses a YAML file defining a DAG.

_register_modules()[source]

Initializes the registry with the modules defined in the YAML file.

after_block_execution(block, results)[source]
after_graph_execution(results)[source]
before_block_execution(block, results)[source]
before_graph_execution()[source]
execute()[source]

Executes a DAG, beginning with all its entry points and giving their outputs to their consumers, iteratively.
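
The idea of starting from the entry points and passing outputs on to consumers can be sketched with a ready queue, as in Kahn's topological ordering. Everything in this example (the standalone execute function, blocks as plain callables, connectors as 4-tuples) is a simplified assumption and not the real Graph.execute.

```python
from collections import deque


def execute(blocks, connectors):
    """Run every block once all of its inputs are available.

    blocks:     {block_name: callable returning a dict of named results}
    connectors: [(block_from, value_from, block_dest, value_dest), ...]
    """
    pending = {name: 0 for name in blocks}  # number of inputs still missing
    inputs = {name: {} for name in blocks}  # keyword arguments collected so far
    for _, _, dest, _ in connectors:
        pending[dest] += 1

    # Entry points are the blocks that wait for no input at all.
    ready = deque(name for name, count in pending.items() if count == 0)
    results = {}
    while ready:
        name = ready.popleft()
        results[name] = blocks[name](**inputs[name])
        # Hand each produced value to the blocks that consume it.
        for src, value_from, dest, value_dest in connectors:
            if src == name:
                inputs[dest][value_dest] = results[name][value_from]
                pending[dest] -= 1
                if pending[dest] == 0:
                    ready.append(dest)
    return results


blocks = {
    "two": lambda: {"result": 2},
    "three": lambda: {"result": 3},
    "add": lambda left, right: {"result": left + right},
}
connectors = [
    ("two", "result", "add", "left"),
    ("three", "result", "add", "right"),
]
results = execute(blocks, connectors)
```

The "add" block only runs after both of its producers have finished, so results["add"] is {"result": 5}.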

loop_vertices(fun, depth=False)[source]

Implements a breadth-first search on the DAG, and applies function fun to every vertex.

fun must accept one argument, a list of vertices representing the path taken. The last element of this list is the current vertex.

If depth is True, do a depth-first search instead.
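
The following sketch shows how a single frontier can serve both traversal orders: treated as a queue it gives breadth-first order, treated as a stack it gives depth-first order. The standalone function, its entry_points and successors arguments, and the choice to visit a vertex once per distinct path are assumptions for illustration.

```python
from collections import deque


def loop_vertices(entry_points, successors, fun, depth=False):
    """Apply fun(path) to every path reachable from the entry points.

    successors maps a vertex to the list of vertices it feeds.
    """
    frontier = deque([v] for v in entry_points)
    while frontier:
        # A queue (popleft) gives breadth-first order, a stack (pop) depth-first.
        path = frontier.pop() if depth else frontier.popleft()
        fun(path)
        for nxt in successors.get(path[-1], ()):
            frontier.append(path + [nxt])


successors = {"a": ["b", "c"], "b": ["d"], "c": [], "d": []}
bfs_order, dfs_order = [], []
loop_vertices(["a"], successors, lambda path: bfs_order.append(path[-1]))
loop_vertices(["a"], successors, lambda path: dfs_order.append(path[-1]), depth=True)
```

With this graph, bfs_order is ["a", "b", "c", "d"] and dfs_order is ["a", "c", "b", "d"].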

class lb.graph.Topology(fields, registry)[source]

Container for a topology, as defined in YAML. Mainly used to encapsulate a Graph.

__init__(fields, registry)[source]
__module__ = 'lb.graph'
get_outbound(value)[source]

Given a bind_out value, returns the associated block along with its requested result.

topology_inputs()[source]

Returns the topology_inputs ($inputs.*) of the encapsulated graph.

vertices()[source]

Returns the vertices of the encapsulated graph.

class lb.graph.Vertice(fields)[source]

Container for a vertex, a block as defined in YAML. Also holds its directed edges with other blocks.

__init__(fields)[source]
__module__ = 'lb.graph'
add_next(connector)[source]
add_prev(connector)[source]
class lb.graph._Section(fields)[source]

Container for a section of the YAML file, containing its fields.

__init__(fields)[source]
__module__ = 'lb.graph'
__weakref__

list of weak references to the object (if defined)

lb.graph.type_or_any(type_)[source]

Returns the type if it is not inspect._empty (undeclared type), typing.Any otherwise.
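
A minimal sketch of this behaviour, using inspect.Parameter.empty (the public alias of inspect._empty). The example function and the annotations dict are hypothetical.

```python
import inspect
import typing


def type_or_any(type_):
    """Map an undeclared annotation to typing.Any, leave declared ones untouched."""
    return typing.Any if type_ is inspect.Parameter.empty else type_


def example(a: int, b):
    return a, b


params = inspect.signature(example).parameters
annotations = {name: type_or_any(p.annotation) for name, p in params.items()}
```

The annotated parameter a keeps its int type, while the unannotated b is mapped to typing.Any.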

lb.exceptions

Defines some exceptions used throughout the framework.

exception lb.exceptions.BlockError[source]

Error in a block definition.

__module__ = 'lb.exceptions'
__weakref__

list of weak references to the object (if defined)

exception lb.exceptions.ExecutionError[source]

Error thrown while executing a topology.

__module__ = 'lb.exceptions'
__weakref__

list of weak references to the object (if defined)

exception lb.exceptions.NotBoundError[source]

Raised when a subtopology value is accessed, but has not been defined (bound).

__module__ = 'lb.exceptions'
__weakref__

list of weak references to the object (if defined)

exception lb.exceptions.UnfoundModuleError[source]

A blocks module was added to the import list but doesn’t exist.

__module__ = 'lb.exceptions'
__weakref__

list of weak references to the object (if defined)

exception lb.exceptions.YAMLError[source]

An error in a user-defined YAML topology.

__module__ = 'lb.exceptions'
__weakref__

list of weak references to the object (if defined)

lb.log

lb.log.get_logger(name)[source]

lb.plugins_manager

lb.plugins_manager._register_hook(f, hookname)[source]
lb.plugins_manager.after_block_execution(f)[source]
lb.plugins_manager.after_graph_execution(f)[source]
lb.plugins_manager.available_plugins()[source]

List plugins available from lb/plugins/.

lb.plugins_manager.before_block_execution(f)[source]
lb.plugins_manager.before_graph_execution(f)[source]
lb.plugins_manager.import_plugins(plugins)[source]

Imports the activated plugins, which will trigger their hooks registration.

lb.registry

This module manages the blocks registry. It provides a decorator for blocks to register themselves, and a Registry class to manage registered blocks.

class lb.registry.Registry[source]

A registry contains a list of registered blocks, along with their inferred properties, such as their parameters, metadata, inputs and output.

__getitem__(block_name)[source]

Returns the registered block block_name.

__init__()[source]

Inits a registry.

__module__ = 'lb.registry'
__weakref__

list of weak references to the object (if defined)

_register_block(func)[source]

Registers a Python function as a block. It must follow block requirements.

add_module(module)[source]

Adds all blocks of a module to the registry.

items()[source]

Iterates through the registered blocks, as tuples (block_name, block_properties).

keys()[source]

Returns the list of registered block names.

lb.registry.block(**kwargs)[source]

Decorator to define a block, so it can get added to the registry. Use it as follows:

    @block(engine='foo', description='bar', my_other_metadata='foobar')
    def my_block()…

lb.signature

The signature algorithm uniquely identifies a particular block (instance) within a particular graph.

For that purpose, we hash the block name, the block parameters, and the signatures of the block inputs (recursively). This is similar to a Merkle tree, in that a change in one of the block's predecessors will produce a different block signature.

Let H be the signature algorithm, and h a cryptographically secure hash function.

B is a block, whose inputs are:
  • right: bar.result
  • left: foo.result
and args are:
  • abc: 123
  • cba: ['x', 'y']
H(block) = h([
  block.blockname,                 <--- the *block* name, not the *instance* name
  [
    ('abc', 123),                  <--+ ordered by arg name
    ('cba', ['x', 'y'])            <--|
  ],
  [
    ('left', H(foo), 'result'),    <--+ ordered by input name
    ('right', H(bar), 'result')    <--|
  ]
])
lb.signature.sign(block)[source]

Returns a unique signature for a block instance within a graph, and caches it to speed up the recursion.
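
The recursive scheme described above can be sketched as follows. The choice of SHA-256 for h, the JSON serialisation, the dict-based block representation and the per-call cache keyed by id() are all assumptions made for this example, not details of lb.signature.

```python
import hashlib
import json


def sign(block, cache=None):
    """Return a hex digest identifying block and everything upstream of it.

    block is a dict with keys 'blockname', 'args' (name -> value) and
    'inputs' (input name -> (source block, result field)).
    """
    cache = {} if cache is None else cache
    if id(block) in cache:  # reuse signatures of shared predecessors
        return cache[id(block)]
    payload = [
        block["blockname"],
        sorted(block["args"].items()),  # ordered by arg name
        [
            [name, sign(source, cache), field]
            for name, (source, field) in sorted(block["inputs"].items())
        ],  # ordered by input name
    ]
    digest = hashlib.sha256(json.dumps(payload).encode("utf-8")).hexdigest()
    cache[id(block)] = digest
    return digest


foo = {"blockname": "load", "args": {"path": "a.csv"}, "inputs": {}}
bar = {"blockname": "load", "args": {"path": "b.csv"}, "inputs": {}}
b = {
    "blockname": "join",
    "args": {"abc": 123, "cba": ["x", "y"]},
    "inputs": {"right": (bar, "result"), "left": (foo, "result")},
}
before = sign(b)
foo["args"]["path"] = "c.csv"  # change a predecessor...
after = sign(b)  # ...and the downstream signature changes too
```

As in a Merkle tree, changing the arguments of foo changes the signature of b even though b itself is untouched.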

lb.types

Type manipulation. This module defines functions to check for type compatibility.

lb.types.is_instance(var, type_)[source]

Checks if the type of a variable is compatible with another type.

Currently it is only a wrapper around is_subtype, but we might add different type-checking later, e.g. with the types returned by the yaml parser.

lb.types.is_sig_compatible(left, right)[source]

Checks if signature left is compatible with signature right, i.e. if the types in signature left can be fed to a function accepting the types in signature right.

left and right are tuples of types (tuple as in Python built-in, not as in typing.Tuple).
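
A position-by-position comparison of two such tuples might look like the sketch below. The is_subtype used here is a deliberately simplified stand-in that only handles plain classes and typing.Any (treating Any as compatible on either side is an assumption); the documented function also covers Tuple, List and Callable.

```python
import typing


def is_subtype(left, right):
    """Simplified check: handles plain classes and typing.Any only."""
    if left is typing.Any or right is typing.Any:
        return True
    return issubclass(left, right)


def is_sig_compatible(left, right):
    """Compare two tuples of types position by position."""
    return len(left) == len(right) and all(
        is_subtype(a, b) for a, b in zip(left, right)
    )
```

For instance, (bool, str) can be fed to a function accepting (int, str) because bool is a subclass of int, whereas (int,) is not compatible with (str,).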

lb.types.is_subtype(left, right)[source]

Checks if left is a subtype of right, i.e. if they are compatible. Currently implemented: basic types, Tuple, List, Callable.

lb.types.type_of_mapping_values(type_)[source]

Given a Mapping type, returns the type of its values.
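
With Python 3.8 or later, typing.get_args gives one way to extract the value type. This is a sketch rather than the module's implementation, and falling back to typing.Any for an unparameterised mapping is an assumption.

```python
import typing


def type_of_mapping_values(type_):
    """Return V for a parameterised Mapping[K, V]; typing.Any if unparameterised."""
    args = typing.get_args(type_)
    return args[1] if len(args) == 2 else typing.Any


value_type = type_of_mapping_values(typing.Mapping[str, int])
```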

lb.utils

Misc. utilities.

lb.utils.ReturnEntry(**kwargs)[source]
lb.utils.default_function(n: int, value: typing.Any = None)[source]

Creates a dummy default function to provide as default value when a func parameter is expected. n is the number of parameters expected. value is the default value returned by the function.
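
A possible shape for such a helper is sketched below. Enforcing the argument count with a TypeError is an assumption; the real function may build the dummy differently.

```python
import typing


def default_function(n: int, value: typing.Any = None):
    """Build a function taking exactly n positional arguments and returning value."""
    def dummy(*args):
        if len(args) != n:
            raise TypeError("expected {} arguments, got {}".format(n, len(args)))
        return value
    return dummy


two_args = default_function(2, value="unused")
```

Calling two_args(1, 2) returns "unused" whatever the arguments are, which makes it usable as a placeholder wherever a two-parameter function is expected.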