corelay.processor.flow
A module that contains basic flow operation processors, such as Shaper,
Sequential and Parallel.
Classes
| GroupProcessor | The abstract base class for groups of processors. |
| Parallel | A processor group that invokes its children in parallel. |
| Sequential | A processor group that invokes its children in sequence, feeding the input to the first child and then each output to the next child. |
| Shaper | Extracts and/or copies by indices. |
- class corelay.processor.flow.Shaper
Bases: Processor
Extracts and/or copies by indices.
- Parameters:
is_output (bool) – A value indicating whether this Shaper processor is the output of a Pipeline. Defaults to False.
is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if a previously computed checkpoint value exists. Defaults to False.
io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be reused in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.
indices (tuple[int | tuple[int], ...]) – The indices to copy/extract. The resulting output will be a tuple with the same member shape. Each index may be passed any number of times. Outer tuples allow integers and tuples; inner tuples only allow integers.
Examples
>>> from corelay.processor.flow import Shaper
>>> Shaper(indices=(0, 1, (0, 1, 2)))(['a', 'b', 'c'])
('a', 'b', ('a', 'b', 'c'))
- indices: Annotated[tuple[int | tuple[int], ...], Param]
The indices to copy/extract. The resulting output will be a tuple with the same member shape. Each index may be passed any number of times. Outer tuples allow integers and tuples; inner tuples only allow integers.
- function(data: Any) -> Any
Extracts and/or copies indices of data.
- Parameters:
data (Any) – The data from which the elements, identified by the indices, are to be extracted. This can be any object, but if it is not an iterable, index 0 corresponds to the object itself.
- Raises:
TypeError – An invalid index was accessed in the data.
- Returns:
Returns the extracted/copied elements of the data, identified by the indices. The output is a tuple with the same member shape as the indices.
- Return type:
Any
- __tracked__: collections.OrderedDict[str, Any]
A collections.OrderedDict with all public class attributes, i.e., all class attributes not enclosed in double underscores.
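To make the extraction rule concrete, here is a plain-Python sketch of the behavior described for Shaper.function. It does not use corelay; the helper name shape and the Index alias are hypothetical, and converting an out-of-range IndexError into the documented TypeError is an assumption about how invalid indices are reported.

```python
from collections.abc import Iterable, Sequence
from typing import Any, Union

# Hypothetical alias mirroring the documented shape of `indices`.
Index = Union[int, tuple[int, ...]]


def shape(data: Any, indices: tuple[Index, ...]) -> tuple[Any, ...]:
    """Extract/copy elements of `data` by `indices` (sketch, not corelay's code)."""
    # A non-iterable input is treated as a one-element sequence,
    # so index 0 refers to the object itself.
    if not isinstance(data, Iterable):
        data = (data,)
    items: Sequence[Any] = data if isinstance(data, Sequence) else tuple(data)

    def pick(index: int) -> Any:
        try:
            return items[index]
        except (IndexError, TypeError) as error:
            # Assumption: report every invalid index as a TypeError,
            # matching the exception listed in the documentation.
            raise TypeError(f"invalid index {index!r} for the given data") from error

    # Outer entries are ints or tuples of ints; the output mirrors that shape.
    return tuple(
        tuple(pick(i) for i in index) if isinstance(index, tuple) else pick(index)
        for index in indices
    )


print(shape(['a', 'b', 'c'], (0, 1, (0, 1, 2))))  # ('a', 'b', ('a', 'b', 'c'))
print(shape(42, (0, 0)))                           # (42, 42)
```

Wrapping a non-iterable input in a one-element tuple is what lets index 0 refer to the object itself, as stated in the description of the data parameter.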
- class corelay.processor.flow.GroupProcessor
Bases: Processor
The abstract base class for groups of processors.
- Parameters:
is_output (bool) – A value indicating whether this GroupProcessor is the output of a Pipeline. Defaults to False.
is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if a previously computed checkpoint value exists. Defaults to False.
io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be reused in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.
children (Iterable[Processor]) – The children of the group. This is a list of processors that will be called in parallel or sequentially, depending on the concrete group type.
- children: Annotated[Iterable[Processor], Param]
The children of the group. This is a list of processors that will be called in parallel or sequentially.
- __tracked__: collections.OrderedDict[str, Any]
A collections.OrderedDict with all public class attributes, i.e., all class attributes not enclosed in double underscores.
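The role of GroupProcessor, a base class that stores its children and leaves the data flow to subclasses, can be sketched with Python's abc module. GroupSketch, FirstChildOnly, and the Child alias are hypothetical names; the real class derives from corelay's Processor and declares children as a Param, which this sketch does not reproduce.

```python
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable
from typing import Any

# Hypothetical stand-in for a processor: any callable mapping data to data.
Child = Callable[[Any], Any]


class GroupSketch(ABC):
    """Minimal abstract group: stores children, leaves the flow to subclasses."""

    def __init__(self, children: Iterable[Child]) -> None:
        # Materialize the iterable once so it can be traversed on every call.
        self.children: tuple[Child, ...] = tuple(children)

    def __call__(self, data: Any) -> Any:
        return self.function(data)

    @abstractmethod
    def function(self, data: Any) -> Any:
        """Subclasses decide how data flows through the children."""


class FirstChildOnly(GroupSketch):
    # Trivial concrete subclass, used only to show the mechanics.
    def function(self, data: Any) -> Any:
        return self.children[0](data)


print(FirstChildOnly([abs])(-3))  # 3
```

Materializing children into a tuple is a choice of this sketch: the documented type is Iterable[Processor], and a one-shot iterable such as a generator would otherwise be exhausted after the first call. Whether corelay does the same is not stated here.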
- class corelay.processor.flow.Parallel
Bases: GroupProcessor
A processor group that invokes its children in parallel.
Note
The child processors are not executed in parallel in the sense of multiprocessing. Rather, all children receive either the same input data or one element of the input data each, in contrast to the Sequential processor group, which first executes the first child and then feeds its output to the next child.
- Parameters:
is_output (bool) – A value indicating whether this Parallel group processor is the output of a Pipeline. Defaults to False.
is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if a previously computed checkpoint value exists. Defaults to False.
io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be reused in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.
children (Iterable[Processor]) – The children of the group. This is a list of processors that will be called in parallel.
broadcast (bool) – A value indicating whether the input data should be broadcast to all children. If True, the input data will be copied as many times as there are children, so every child receives the same input. If False and the input is an iterable, the elements of the iterable will be passed to the children one by one, i.e., the i-th element goes to the i-th child. Defaults to False.
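Examples
>>> from corelay.processor.base import FunctionProcessor
>>> from corelay.processor.flow import Parallel
>>> Parallel(children=[FunctionProcessor(processing_function=lambda x, n=n: x**n) for n in (1, 2, 3, 4)])((2, 2, 2, 2))
(2, 4, 8, 16)
>>> Parallel(children=[FunctionProcessor(processing_function=lambda x, n=n: x**n) for n in (1, 2, 3, 4)], broadcast=True)(2)
(2, 4, 8, 16)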
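The processing functions in these examples are lambdas created inside a comprehension. Python closures look up the loop variable when the lambda is called, not when it is created, so unless the current value is bound, every child computes the same power. This plain-Python snippet, which needs no corelay, shows the pitfall and the default-argument fix:

```python
# Each lambda closes over the loop variable itself, not its value at creation.
late = [lambda x: x**n for n in (1, 2, 3, 4)]
print([f(2) for f in late])   # [16, 16, 16, 16]

# Binding the current value as a default argument fixes this.
bound = [lambda x, n=n: x**n for n in (1, 2, 3, 4)]
print([f(2) for f in bound])  # [2, 4, 8, 16]
```

functools.partial is an equivalent alternative when a default argument would clutter the signature.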
- broadcast: Annotated[bool, Param]
A value indicating whether the input data should be broadcast to all children. If True, the input data will be copied as many times as there are children. If False and the input is an iterable, the elements of the iterable will be passed to the children one by one. Defaults to False.
- __tracked__: collections.OrderedDict[str, Any]
A collections.OrderedDict with all public class attributes, i.e., all class attributes not enclosed in double underscores.
- function(data: Any) -> Any
Invokes the children in parallel, passing the input data to each child. If broadcast is True, the input data will be copied as many times as there are children. If broadcast is False and the input is an iterable, the elements of the iterable will be passed to the children one by one.
- Parameters:
data (Any) – The input data to pass to the children. If broadcast is True, this can be any object. If broadcast is False and the input is an iterable, the elements of the iterable will be passed to the children one by one.
- Raises:
TypeError – The broadcast parameter is set to False, and the number of children does not match the number of data elements.
- Returns:
Returns a tuple that has the same number of elements as there are children and contains the outputs of the child processors.
- Return type:
Any
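A plain-Python sketch of the distribution rule described for Parallel.function is shown below. run_parallel is a hypothetical helper, not part of corelay; raising TypeError for a non-iterable input when broadcast is False is an assumption, while the TypeError on a length mismatch mirrors the documented Raises entry.

```python
from collections.abc import Callable, Iterable
from typing import Any


def run_parallel(children: list[Callable[[Any], Any]], data: Any,
                 broadcast: bool = False) -> tuple[Any, ...]:
    """Sketch of the documented Parallel semantics (hypothetical helper)."""
    if broadcast:
        # Every child receives the same input object.
        inputs = [data] * len(children)
    else:
        # Assumption: a non-iterable input is rejected when not broadcasting.
        if not isinstance(data, Iterable):
            raise TypeError("data must be iterable when broadcast is False")
        # Element i goes to child i, so the counts must match.
        inputs = list(data)
        if len(inputs) != len(children):
            raise TypeError(
                f"{len(children)} children but {len(inputs)} data elements")
    return tuple(child(item) for child, item in zip(children, inputs))


powers = [lambda x, n=n: x**n for n in (1, 2, 3, 4)]
print(run_parallel(powers, (2, 2, 2, 2)))       # (2, 4, 8, 16)
print(run_parallel(powers, 2, broadcast=True))  # (2, 4, 8, 16)
print(run_parallel(powers, (1, 2, 3, 4)))       # (1, 4, 27, 256)
```

The explicit length check matters because the built-in zip silently stops at the shorter input; on Python 3.10 and later, zip(children, inputs, strict=True) would raise a ValueError instead.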
- class corelay.processor.flow.Sequential
Bases: GroupProcessor
A processor group that invokes its children in sequence, feeding the input to the first child and then each output to the next child.
- Parameters:
is_output (bool) – A value indicating whether this Sequential group processor is the output of a Pipeline. Defaults to False.
is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if a previously computed checkpoint value exists. Defaults to False.
io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be reused in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.
children (Iterable[Processor]) – The children of the group. This is a list of processors that will be called sequentially.
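Examples
>>> from corelay.processor.base import FunctionProcessor
>>> from corelay.processor.flow import Sequential
>>> Sequential(children=[FunctionProcessor(processing_function=lambda x, c=c: c + x) for c in 'abcd'])('=')
'dcba='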
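The chaining performed by Sequential amounts to a left fold over the children. The sketch below expresses it with functools.reduce; run_sequential is a hypothetical helper rather than corelay's implementation, and returning the input unchanged when there are no children is a choice of this sketch.

```python
from collections.abc import Callable, Iterable
from functools import reduce
from typing import Any


def run_sequential(children: Iterable[Callable[[Any], Any]], data: Any) -> Any:
    """Feed `data` to the first child, then each output to the next child."""
    # reduce starts from `data` and applies each child to the running value;
    # with no children, `data` is returned unchanged.
    return reduce(lambda value, child: child(value), children, data)


prepend = [lambda x, c=c: c + x for c in 'abcd']
print(run_sequential(prepend, '='))  # dcba=
```

Each child prepends its letter to the previous output, so '=' becomes 'a=', then 'ba=', 'cba=', and finally 'dcba='.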
- __tracked__: collections.OrderedDict[str, Any]
A collections.OrderedDict with all public class attributes, i.e., all class attributes not enclosed in double underscores.