corelay.pipeline.base
A module that contains the base classes for pipelines, Pipeline, and tasks of pipelines,
Task, which are used to perform a specific set of operations on data.
Classes
The abstract base class for all pipelines. |
|
Represents a single task in a |
|
A task plug, which ensures that all contained objects are instances of |
- class corelay.pipeline.base.TaskPlug[source]
Bases:
PlugA task plug, which ensures that all contained objects are instances of
Processor.- __init__(slot: Slot, obj: Processor | Callable[[...], Any] | None = None, default: Processor | Callable[[...], Any] | None = None, **kwargs: Any) None[source]
Initializes a new
TaskPluginstance.- Parameters:
slot (Slot) – Slot instance to associate with this
TaskPlug.obj (Processor | Callable[..., Any] | None) – A
Processorheld in theTaskPlugcontainer. If not set,defaultis returned as its value. Defaults toNone.default (Processor | Callable[..., Any] | None) – A plug-dependent lower-priority
Processorheld in theTaskPlugcontainer. If not set,fallbackis returned. Defaults toNone.**kwargs (Any) – Keyword arguments passed down to the base class constructor, for cooperativity’s sake, which is the next class in the inheritance hierarchy.
- Return type:
None
- class corelay.pipeline.base.Task[source]
Bases:
SlotRepresents a single task in a
Pipeline. Tasks are slots that ensure all contained objects are plugs and own default values that are instances ofProcessor.- __init__(proc_type: type[Processor] = Processor, default: Processor | Callable[[...], Any] | None = lambda data: ..., **kwargs: Any) None[source]
Initializes a new
Taskinstance.- Parameters:
proc_type (type[Processor]) – The type of
Processorallowed for thisTask. Defaults toProcessor.default (Processor | Callable[..., Any] | None) – The default
Processorfor theTask, which must either be aProcessoror a function. Defaults to the identity function.**kwargs (Any) – Keyword arguments that are passed to the constructor of the class one step up in the class hierarchy, i.e.,
Slot.
- Raises:
TypeError – The allowed
Processortype for theTask,proc_type, is not of typeProcessoror a sub-class ofProcessor.- Return type:
None
- __call__(obj: Processor | Callable[[...], Any] | None = None, default: Processor | Callable[[...], Any] | None = None) TaskPlug[source]
Creates a new corresponding
TaskPlugcontainer.- Parameters:
- Returns:
Returns the newly created
TaskPlugcontainer instance, obeying the type and optionality constraints.- Return type:
- class corelay.pipeline.base.Pipeline[source]
Bases:
ProcessorThe abstract base class for all pipelines.
- __tracked__: collections.OrderedDict[str, Any]
An
collections.OrderedDictwith all public class attributes, i.e., all class attributes not enclosed with double underscores.
- checkpoint_processes() OrderedDict[str, Processor][source]
Finds the
Processorthat is a checkpoint and is closest to the output. The final checkpointProcessorand all following instances ofProcessorare retrieved and returned in ancollections.OrderedDict.- Raises:
RuntimeError – No checkpoints were defined.
- Returns:
Returns an
collections.OrderedDictthat contains theProcessorthat is closest to the output and a checkpoint, as well as all following instances ofProcessor. The instances ofProcessorin thcollections.OrderedDictare ordered in the same way as they were in the instance ofPipeline, i.e., from the checkpointProcessorto the outputProcessor.- Return type:
- from_checkpoint() Any[source]
Re-evaluates the
Pipelinefrom the last check-pointedProcessorusing the output from the checkpoint as input.- Raises:
RuntimeError – If the check-pointed
Processorclosest to output does not have anycheckpoint_datastored, i.e., theProcessorhas not been called since being declared a checkpoint.- Returns:
Returns the output of
Pipeline, starting from check-pointedProcessorclosest to output.- Return type:
- function(data: Any) Any[source]
Propagate data through the whole
Pipelinefrom front to back, calling all Processors in series.- Parameters:
data (Any) – The input data to the
Pipeline. This is the input data for the firstProcessorin thePipeline. The type of the input data depends on the firstProcessorin thePipeline.- Returns:
Returns the output of the
Pipeline, which is the output of the of all instances ofProcessorin thePipelinethat are flagged as outputs. If no instances ofProcessorare flagged as outputs, the output of the lastProcessoris returned.- Return type:
- __repr__() str[source]
Generates a
strrepresentation of thePipeline, which contains all instances ofProcessorin thePipelineand their output types.Example
>>> MyPipeline() MyPipeline( FunctionProcessor(processing_function=lambda x: x.mean(1),) -> numpy.ndarray SciPyPDist(metric=sqeuclidean) -> numpy.ndarray RadialBasisFunction(sigma=0.1) -> numpy.ndarray MyProcess(stuff=3, func=Param(FunctionType, lambda x: x**2)) -> numpy.ndarray )