corelay.pipeline.base

Contains the base classes for pipelines (Pipeline) and for the tasks that make up a pipeline (Task). Both are used to perform a specific set of operations on data.

Classes

Pipeline

The abstract base class for all pipelines.

Task

Represents a single task in a Pipeline.

TaskPlug

A task plug, which ensures that all contained objects are instances of Processor.

class corelay.pipeline.base.TaskPlug

Bases: Plug

A task plug, which ensures that all contained objects are instances of Processor.

__init__(slot: Slot, obj: Processor | Callable[..., Any] | None = None, default: Processor | Callable[..., Any] | None = None, **kwargs: Any) → None

Initializes a new TaskPlug instance.

Parameters:
  • slot (Slot) – Slot instance to associate with this TaskPlug.

  • obj (Processor | Callable[..., Any] | None) – A Processor held in the TaskPlug container. If not set, default is returned as its value. Defaults to None.

  • default (Processor | Callable[..., Any] | None) – A plug-dependent lower-priority Processor held in the TaskPlug container. If not set, fallback is returned. Defaults to None.

  • **kwargs (Any) – Keyword arguments passed on to the constructor of the next class in the inheritance hierarchy, to support cooperative inheritance.

Return type:

None

property obj: Processor | None

Gets or sets the Processor contained in the TaskPlug. If the TaskPlug does not contain a Processor, default is retrieved instead.

Returns:

Returns the Processor contained in the TaskPlug. If not set, default is returned.

Return type:

Processor | None

property default: Processor | None

Gets or sets the default Processor of the TaskPlug. If the default Processor is not set, then the fallback Processor is retrieved instead.

Returns:

Returns the default Processor of the TaskPlug. If not set, fallback is returned.

Return type:

Processor | None
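The lookup order described above (obj first, then default, then fallback) can be illustrated with a small stand-in class. This is only a sketch and not corelay's implementation: MiniPlug and the helper functions identity, double, and negate are hypothetical names, and the fallback is assumed to be supplied by the owning slot.

```python
from typing import Any, Callable, Optional

Func = Callable[..., Any]


class MiniPlug:
    """Hypothetical stand-in for a plug; not corelay's implementation."""

    def __init__(
        self,
        fallback: Optional[Func] = None,
        obj: Optional[Func] = None,
        default: Optional[Func] = None,
    ) -> None:
        self._fallback = fallback  # lowest priority, assumed to come from the slot
        self._default = default    # plug-dependent, lower priority than obj
        self._obj = obj            # highest priority, set explicitly on the plug

    @property
    def default(self) -> Optional[Func]:
        # If no plug-level default is set, the fallback is retrieved instead.
        return self._default if self._default is not None else self._fallback

    @property
    def obj(self) -> Optional[Func]:
        # If no object is set, the default is retrieved instead.
        return self._obj if self._obj is not None else self.default


def identity(data: Any) -> Any:
    return data


def double(data: Any) -> Any:
    return data * 2


def negate(data: Any) -> Any:
    return -data


plug_fallback_only = MiniPlug(fallback=identity)
plug_with_default = MiniPlug(fallback=identity, default=double)
plug_with_obj = MiniPlug(fallback=identity, default=double, obj=negate)
```

In this sketch, plug_fallback_only.obj resolves to identity, plug_with_default.obj resolves to double, and plug_with_obj.obj resolves to negate, while plug_with_obj.default still resolves to double.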

class corelay.pipeline.base.Task

Bases: Slot

Represents a single task in a Pipeline. A Task is a Slot that ensures that the objects contained in its plugs, as well as its default values, are instances of Processor.

__init__(proc_type: type[Processor] = Processor, default: Processor | Callable[..., Any] | None = lambda data: ..., **kwargs: Any) → None

Initializes a new Task instance.

Parameters:
  • proc_type (type[Processor]) – The type of Processor allowed for this Task. Defaults to Processor.

  • default (Processor | Callable[..., Any] | None) – The default Processor for the Task, which must either be a Processor or a function. Defaults to the identity function.

  • **kwargs (Any) – Keyword arguments that are passed to the constructor of the class one step up in the class hierarchy, i.e., Slot.

Raises:

TypeError – The allowed Processor type for the Task, proc_type, is neither Processor nor a subclass of Processor.

Return type:

None

__repr__() → str

Returns a str representation of the Task instance.

Returns:

Returns a str representation of the Task instance.

Return type:

str

property default: Processor | None

Gets or sets the default Processor of the Task.

Returns:

Returns the task’s default Processor. If not set, None is returned.

Return type:

Processor | None

__call__(obj: Processor | Callable[..., Any] | None = None, default: Processor | Callable[..., Any] | None = None) → TaskPlug

Creates a new corresponding TaskPlug container.

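Parameters:
  • obj (Processor | Callable[..., Any] | None) – The Processor to be held in the new TaskPlug. Defaults to None.

  • default (Processor | Callable[..., Any] | None) – A plug-dependent, lower-priority Processor for the new TaskPlug. Defaults to None.
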
Returns:

Returns the newly created TaskPlug container instance, obeying the type and optionality constraints.

Return type:

TaskPlug
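The interplay between a task's type constraint, its default, and the plugs it creates can be sketched without corelay itself. Everything below is hypothetical: MiniProcessor, MiniTaskPlug, and MiniTask are illustrative stand-ins, and wrapping plain functions in MiniProcessor is an assumption made for this example, not a statement about how Task converts callables.

```python
from typing import Any, Callable, Optional, Union


class MiniProcessor:
    """Hypothetical stand-in for Processor that simply wraps a function."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __call__(self, data: Any) -> Any:
        return self.func(data)


ProcessorLike = Union[MiniProcessor, Callable[[Any], Any], None]


class MiniTaskPlug:
    """Hypothetical stand-in for TaskPlug; holds already validated values."""

    def __init__(self, obj, default, fallback) -> None:
        self._obj, self._default, self._fallback = obj, default, fallback

    @property
    def default(self) -> Optional[MiniProcessor]:
        return self._default if self._default is not None else self._fallback

    @property
    def obj(self) -> Optional[MiniProcessor]:
        return self._obj if self._obj is not None else self.default


class MiniTask:
    """Hypothetical stand-in for Task; not corelay's implementation."""

    def __init__(
        self,
        proc_type: type = MiniProcessor,
        default: ProcessorLike = lambda data: data,
    ) -> None:
        # The allowed type must be the processor class or a subclass of it.
        if not (isinstance(proc_type, type) and issubclass(proc_type, MiniProcessor)):
            raise TypeError("proc_type must be MiniProcessor or a subclass of it")
        self.proc_type = proc_type
        self.default = self._validate(default)

    def _validate(self, value: ProcessorLike) -> Optional[MiniProcessor]:
        if value is None:
            return None
        if not isinstance(value, MiniProcessor):
            if not callable(value):
                raise TypeError("expected a processor, a function, or None")
            value = MiniProcessor(value)  # assumption: functions get wrapped
        if not isinstance(value, self.proc_type):
            raise TypeError("value does not match the allowed processor type")
        return value

    def __call__(self, obj: ProcessorLike = None, default: ProcessorLike = None) -> MiniTaskPlug:
        # The task's own default acts as the fallback of the new plug.
        return MiniTaskPlug(self._validate(obj), self._validate(default), self.default)


task = MiniTask()
plain_plug = task()                            # falls back to the identity default
custom_plug = task(obj=lambda data: data + 1)  # explicit object takes priority
```

Here plain_plug.obj behaves like the identity function, whereas custom_plug.obj applies the supplied function; constructing MiniTask(proc_type=int) raises a TypeError because int is not a processor type.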

class corelay.pipeline.base.Pipeline

Bases: Processor

The abstract base class for all pipelines.

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes whose names are not enclosed in double underscores.

checkpoint_processes() → OrderedDict[str, Processor]

Finds the Processor that is a checkpoint and is closest to the output. This final checkpoint Processor and all instances of Processor that follow it are returned in a collections.OrderedDict.

Raises:

RuntimeError – No checkpoints were defined.

Returns:

Returns a collections.OrderedDict that contains the checkpoint Processor closest to the output, as well as all instances of Processor that follow it. The instances of Processor in the collections.OrderedDict are ordered in the same way as in the Pipeline, i.e., from the checkpoint Processor to the output Processor.

Return type:

collections.OrderedDict[str, Processor]

from_checkpoint() → Any

Re-evaluates the Pipeline from the last check-pointed Processor using the output from the checkpoint as input.

Raises:

RuntimeError – The check-pointed Processor closest to the output does not have any checkpoint_data stored, i.e., the Processor has not been called since being declared a checkpoint.

Returns:

Returns the output of the Pipeline, starting from the check-pointed Processor closest to the output.

Return type:

Any
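The checkpoint behaviour described for checkpoint_processes and from_checkpoint can be sketched with plain Python objects. This is a minimal illustration under stated assumptions, not corelay's code: Stage and the two module-level functions are hypothetical, and only the is_checkpoint flag and checkpoint_data attribute mirror names mentioned above.

```python
from collections import OrderedDict
from typing import Any, Callable


class Stage:
    """Hypothetical processing stage that can act as a checkpoint."""

    def __init__(self, func: Callable[[Any], Any], is_checkpoint: bool = False) -> None:
        self.func = func
        self.is_checkpoint = is_checkpoint
        self.checkpoint_data = None  # filled on call if this stage is a checkpoint

    def __call__(self, data: Any) -> Any:
        output = self.func(data)
        if self.is_checkpoint:
            self.checkpoint_data = output  # remember the output for later re-runs
        return output


def checkpoint_stages(stages: "OrderedDict[str, Stage]") -> "OrderedDict[str, Stage]":
    """Return the checkpoint closest to the output and every stage after it."""
    names = list(stages)
    for index in range(len(names) - 1, -1, -1):  # search from the output backwards
        if stages[names[index]].is_checkpoint:
            return OrderedDict((name, stages[name]) for name in names[index:])
    raise RuntimeError("No checkpoints were defined.")


def from_checkpoint(stages: "OrderedDict[str, Stage]") -> Any:
    """Re-run only the stages that follow the last checkpoint."""
    checkpoint, *following = checkpoint_stages(stages).values()
    if checkpoint.checkpoint_data is None:
        raise RuntimeError("The checkpoint has not been called yet.")
    data = checkpoint.checkpoint_data  # the checkpoint's output is the new input
    for stage in following:
        data = stage(data)
    return data


stages = OrderedDict(
    load=Stage(lambda x: x + 1, is_checkpoint=True),
    square=Stage(lambda x: x * x),
)

# Full run: every stage is called in order and the checkpoint stores its output.
data = 2
for stage in stages.values():
    data = stage(data)
first_result = data

# Change a stage after the checkpoint and re-run only from the checkpoint.
stages["square"].func = lambda x: x * 10
resumed_result = from_checkpoint(stages)
```

In this sketch the stage named load is not executed a second time; only the modified square stage is re-evaluated on the stored checkpoint output.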

function(data: Any) → Any

Propagates data through the whole Pipeline from front to back, calling all instances of Processor in series.

Parameters:

data (Any) – The input data to the Pipeline. This is the input data for the first Processor in the Pipeline. The type of the input data depends on the first Processor in the Pipeline.

Returns:

Returns the output of the Pipeline, which consists of the outputs of all instances of Processor in the Pipeline that are flagged as outputs. If no instances of Processor are flagged as outputs, the output of the last Processor is returned.

Return type:

Any
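The propagation and output selection described above can be sketched as a simple loop. The names Step and run_pipeline are hypothetical, and returning a tuple when several steps are flagged as outputs is an assumption made for this illustration rather than a documented guarantee.

```python
from collections import OrderedDict
from typing import Any, Callable, NamedTuple


class Step(NamedTuple):
    """Hypothetical pipeline step: a function plus an output flag."""

    func: Callable[[Any], Any]
    is_output: bool = False


def run_pipeline(steps: "OrderedDict[str, Step]", data: Any) -> Any:
    """Call all steps in series and collect the flagged outputs."""
    outputs = []
    for step in steps.values():
        data = step.func(data)  # the output of one step is the input of the next
        if step.is_output:
            outputs.append(data)
    if not outputs:
        return data  # nothing flagged: fall back to the output of the last step
    # Assumption: a single flagged output is returned as-is, several as a tuple.
    return outputs[0] if len(outputs) == 1 else tuple(outputs)


unflagged = OrderedDict(
    scale=Step(lambda x: x * 2),
    shift=Step(lambda x: x + 1),
)
flagged = OrderedDict(
    scale=Step(lambda x: x * 2, is_output=True),
    shift=Step(lambda x: x + 1, is_output=True),
)

last_only = run_pipeline(unflagged, 3)
all_flagged = run_pipeline(flagged, 3)
```

With the input 3, last_only is the result of the final step, while all_flagged contains the intermediate result of scale as well as the result of shift.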

__repr__() → str

Generates a str representation of the Pipeline, which contains all instances of Processor in the Pipeline and their output types.

Example

>>> MyPipeline()
MyPipeline(
    FunctionProcessor(processing_function=lambda x: x.mean(1),) -> numpy.ndarray
    SciPyPDist(metric=sqeuclidean) -> numpy.ndarray
    RadialBasisFunction(sigma=0.1) -> numpy.ndarray
    MyProcess(stuff=3, func=Param(FunctionType, lambda x: x**2)) -> numpy.ndarray
)

Returns:

Returns a str representation of the Pipeline, which contains all instances of Processor in the Pipeline and their output types.

Return type:

str