corelay.processor.flow

A module that contains basic flow operation processors, such as Shaper, Sequential and Parallel.

Classes

GroupProcessor

The abstract base class for groups of processors.

Parallel

A processor group that invokes its children in parallel.

Sequential

A processor group that invokes its children in sequence, feeding the input to the first child and then each child's output to the next child.

Shaper

Extracts and/or copies by indices.

class corelay.processor.flow.Shaper

Bases: Processor

Extracts and/or copies by indices.

Parameters:
  • is_output (bool) – A value indicating whether this Shaper processor is the output of a Pipeline. Defaults to False.

  • is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value. Defaults to False.

  • io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.

  • indices (tuple[int | tuple[int], ...]) – The indices to copy/extract. The output is a tuple with the same nested shape as indices. Each index may appear any number of times. The outer tuple may contain integers and tuples; inner tuples may only contain integers.

Examples

>>> Shaper(indices=(0, 1, (0, 1, 2)))(['a', 'b', 'c'])
('a', 'b', ('a', 'b', 'c'))

indices: Annotated[tuple[int | tuple[int], ...], Param]

The indices to copy/extract. The output is a tuple with the same nested shape as indices. Each index may appear any number of times. The outer tuple may contain integers and tuples; inner tuples may only contain integers.

Return type:

Plug

function(data: Any) -> Any

Extracts and/or copies indices of data.

Parameters:

data (Any) – The data from which the elements, identified by the indices, are to be extracted. This can be any object, but if it is not an iterable, index 0 corresponds to the object itself.

Raises:

TypeError – An invalid index was accessed in the data.

Returns:

Returns the extracted/copied elements of the data, identified by the indices. The output is a tuple with the same member shape as the indices.

Return type:

Any
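The extraction rules above can be illustrated with a small stand-alone function. This is a minimal sketch under stated assumptions, not corelay's implementation: `shape` is a hypothetical name, only lists and tuples are treated as indexable containers (any other object, including strings, is wrapped so that index 0 refers to the object itself), and an out-of-range index is reported as a TypeError to match the documented behavior.

```python
from typing import Any, Union

# An index is either a single integer or a flat tuple of integers.
Index = Union[int, tuple[int, ...]]


def shape(data: Any, indices: tuple[Index, ...]) -> tuple[Any, ...]:
    """Hypothetical stand-alone sketch of the Shaper semantics (not corelay's code)."""
    # Assumption: lists and tuples are indexed directly; any other object is
    # wrapped in a one-element tuple, so index 0 refers to the object itself.
    if not isinstance(data, (list, tuple)):
        data = (data,)
    try:
        # Mirror the nesting of `indices`: an integer picks one element,
        # an inner tuple picks several elements and keeps them grouped.
        return tuple(
            tuple(data[i] for i in index) if isinstance(index, tuple) else data[index]
            for index in indices
        )
    except IndexError as error:
        # Report invalid indices as TypeError, as documented for Shaper.function.
        raise TypeError(f'invalid index for data of length {len(data)}') from error


print(shape(['a', 'b', 'c'], (0, 1, (0, 1, 2))))  # ('a', 'b', ('a', 'b', 'c'))
print(shape(42, (0, (0, 0))))  # (42, (42, 42))
```

Wrapping non-container inputs keeps the indexing code uniform: the same tuple-building expression handles both a list of values and a single object.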

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes whose names are not enclosed in double underscores.
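The filtering rule described for __tracked__ can be sketched as follows. This only illustrates the documented rule and is not how corelay builds the attribute: `tracked_attributes` and `Example` are hypothetical names, and the sketch inspects only the class's own namespace, so whether inherited attributes are included in the real __tracked__ is not settled by it.

```python
from collections import OrderedDict
from typing import Any


def tracked_attributes(cls: type) -> OrderedDict[str, Any]:
    """Hypothetical helper mimicking the documented contents of __tracked__."""
    # vars(cls) preserves definition order (Python 3.7+); keep every entry whose
    # name is not enclosed in double underscores, e.g. skip __module__ and __doc__.
    return OrderedDict(
        (name, value)
        for name, value in vars(cls).items()
        if not (name.startswith('__') and name.endswith('__'))
    )


class Example:
    scale = 2
    offset = 1

    def function(self, data):
        return data * self.scale + self.offset


print(list(tracked_attributes(Example)))  # ['scale', 'offset', 'function']
```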

class corelay.processor.flow.GroupProcessor

Bases: Processor

The abstract base class for groups of processors.

Parameters:
  • is_output (bool) – A value indicating whether this group processor is the output of a Pipeline. Defaults to False.

  • is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value. Defaults to False.

  • io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.

  • children (Iterable[Processor]) – The children of the group, i.e., the processors that are called either in parallel or sequentially, depending on the concrete group class.

children: Annotated[Iterable[Processor], Param]

The children of the group, i.e., the processors that are called either in parallel or sequentially, depending on the concrete group class.

Return type:

Plug
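The idea of an abstract group that holds children and leaves the combination strategy to subclasses can be sketched with Python's abc module. This is a hypothetical illustration, not corelay's GroupProcessor: `GroupSketch` and `ApplyAll` are invented names, and plain callables stand in for Processor instances.

```python
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable
from typing import Any

# Hypothetical stand-in: any one-argument callable plays the role of a Processor.
ProcessorLike = Callable[[Any], Any]


class GroupSketch(ABC):
    """Illustrative abstract group of processors (not corelay's implementation)."""

    def __init__(self, children: Iterable[ProcessorLike]) -> None:
        # Materialize the iterable once, so a generator is not exhausted
        # after the first call and len() is available to subclasses.
        self.children = tuple(children)

    def __call__(self, data: Any) -> Any:
        return self.function(data)

    @abstractmethod
    def function(self, data: Any) -> Any:
        """Subclasses decide how the children are combined."""


class ApplyAll(GroupSketch):
    # Minimal concrete subclass: every child receives the same input.
    def function(self, data: Any) -> Any:
        return tuple(child(data) for child in self.children)


group = ApplyAll(child for child in (abs, str))
print(group(-3))  # (3, '-3')
print(group(-4))  # (4, '-4')
```

Storing the children as a tuple is a design choice of this sketch: since the parameter is typed as an Iterable, a one-shot generator would otherwise be empty on the second call.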

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes whose names are not enclosed in double underscores.

class corelay.processor.flow.Parallel

Bases: GroupProcessor

A processor group that invokes its children in parallel.

Note

Please note that the child processors are not executed in parallel in the sense of multiprocessing. Instead, "parallel" means that every child receives either the same input data or one element of the input data. This contrasts with the Sequential processor group, which executes the first child and then feeds its output to the next child.

Parameters:
  • is_output (bool) – A value indicating whether this Parallel group processor is the output of a Pipeline. Defaults to False.

  • is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value. Defaults to False.

  • io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.

  • children (Iterable[Processor]) – The children of the group. This is a list of processors that will be called in parallel.

  • broadcast (bool) – A value indicating whether the input data should be broadcast to all children. If True, the input data is replicated once per child, so that every child receives the same input. If False, the input must be an iterable with one element per child, and the i-th element is passed to the i-th child. Defaults to False.

Examples

>>> Parallel(children=[FunctionProcessor(processing_function=lambda x, n=n: x**n) for n in (1, 2, 3, 4)])((2, 2, 2, 2))
(2, 4, 8, 16)
>>> Parallel(children=[FunctionProcessor(processing_function=lambda x, n=n: x**n) for n in (1, 2, 3, 4)], broadcast=True)(2)
(2, 4, 8, 16)

broadcast: Annotated[bool, Param]

A value indicating whether the input data should be broadcast to all children. If True, the input data is replicated once per child, so that every child receives the same input. If False, the input must be an iterable with one element per child, and the i-th element is passed to the i-th child. Defaults to False.

Return type:

Plug

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes whose names are not enclosed in double underscores.

function(data: Any) -> Any

Invokes the children in parallel. If broadcast is True, the same input data is passed to every child. If broadcast is False, the input must be an iterable with one element per child, and the i-th element is passed to the i-th child.

Parameters:

data (Any) – The input data to pass to the children. If broadcast is True, this can be any object. If broadcast is False, this must be an iterable with exactly one element per child.

Raises:

TypeError – The broadcast parameter is set to False, and the number of children and the number of data elements do not match.

Returns:

Returns a tuple that has the same number of elements as there are children and contains the outputs of the child processors.

Return type:

Any
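The dispatch rules above can be sketched as a plain function. This is a minimal illustration under stated assumptions, not corelay's implementation: `run_parallel` is a hypothetical name, plain callables stand in for Processor instances, and the children are simply called one after another in the current process.

```python
from collections.abc import Callable, Sequence
from typing import Any


def run_parallel(
    children: Sequence[Callable[[Any], Any]], data: Any, broadcast: bool = False
) -> tuple[Any, ...]:
    """Hypothetical sketch of the Parallel semantics (not corelay's code)."""
    if broadcast:
        # Every child receives the same input object.
        inputs = [data] * len(children)
    else:
        # Child i receives element i; a non-iterable input fails here with TypeError.
        inputs = list(data)
        if len(inputs) != len(children):
            raise TypeError(f'expected {len(children)} data elements, got {len(inputs)}')
    # "Parallel" describes only how data is routed; the children run sequentially here.
    return tuple(child(value) for child, value in zip(children, inputs))


# Bind n at definition time via a default argument; a plain `lambda x: x**n`
# would see the final loop value (n == 4) in every child.
powers = [lambda x, n=n: x**n for n in (1, 2, 3, 4)]
print(run_parallel(powers, (2, 2, 2, 2)))  # (2, 4, 8, 16)
print(run_parallel(powers, 2, broadcast=True))  # (2, 4, 8, 16)
```

The default-argument idiom matters whenever children are built in a loop: Python closures look up loop variables when they are called, not when they are created.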

class corelay.processor.flow.Sequential

Bases: GroupProcessor

A processor group that invokes its children in sequence, feeding the input to the first child and then each child's output to the next child.

Parameters:
  • is_output (bool) – A value indicating whether this Sequential group processor is the output of a Pipeline. Defaults to False.

  • is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value. Defaults to False.

  • io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.

  • children (Iterable[Processor]) – The children of the group, i.e., the processors that are called sequentially.

Examples

>>> Sequential(children=[FunctionProcessor(processing_function=lambda x, c=c: c + x) for c in 'abcd'])('=')
'dcba='

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes whose names are not enclosed in double underscores.

function(data: Any) -> Any

Invokes the child processors of the processor group in sequence. The input data is fed to the first child, whose output is then fed into the second child, and so on.

Parameters:

data (Any) – The input data to pass to the first child.

Returns:

Returns the output of the last child processor.

Return type:

Any
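Chaining children in sequence is a left fold over the children, which functools.reduce expresses directly. This is a hypothetical sketch, not corelay's implementation: `run_sequential` is an invented name, plain callables stand in for Processor instances, and returning the input unchanged when there are no children is a property of this sketch only.

```python
from collections.abc import Callable, Iterable
from functools import reduce
from typing import Any


def run_sequential(children: Iterable[Callable[[Any], Any]], data: Any) -> Any:
    """Hypothetical sketch of the Sequential semantics (not corelay's code)."""
    # reduce() feeds `data` to the first child, then each result to the next child.
    return reduce(lambda value, child: child(value), children, data)


# `c=c` captures each character at definition time instead of the last loop value.
prepend = [lambda x, c=c: c + x for c in 'abcd']
print(run_sequential(prepend, '='))  # dcba=
print(run_sequential([], '='))  # = (in this sketch, no children leaves the input unchanged)
```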