corelay.processor.base

A module that contains the abstract base class for all processors, Processor, as well as a basic processor, FunctionProcessor, which invokes a specified function. The module also contains a function, ensure_processor, which ensures that a specified argument is of type Processor and, if it is callable but not a Processor, wraps it in a FunctionProcessor.

Functions

ensure_processor

Ensures that the specified processor or callable argument processor_or_callable is of type Processor and, if it is not, but callable, makes it a FunctionProcessor.

Classes

FunctionProcessor

A Processor that executes a user-defined function.

Processor

The abstract base class of processors, which perform specific tasks in a corelay.pipeline.base.Pipeline instance.

class corelay.processor.base.Processor

Bases: ABC, Plugboard

The abstract base class of processors, which perform specific tasks in a corelay.pipeline.base.Pipeline instance.

is_output: Annotated[bool, Param]

Contains a value indicating whether this Processor is the output of a Pipeline.

Return type:

Plug

is_checkpoint: Annotated[bool, Param]

Contains a value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value.

Return type:

Plug

io: Annotated[Storable, Param]

Contains an IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline.

Return type:

Plug

__init__(*args: Any, is_output: bool | None = None, is_checkpoint: bool | None = None, io: Storable | None = None, **kwargs: Any) -> None

Initializes a new Processor instance. All defined Param class attributes are initialized either to their respective default values or, if supplied as keyword argument, to the value supplied.

Parameters:
  • *args (Any) – A list of the positional arguments, which will be used to initialize the parameters of the Processor that were marked as positional.

  • is_output (bool | None) – A value indicating whether this Processor is the output of a Pipeline. If None is specified, the corresponding Param will default to its defined default value, which is False.

  • is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value. If None is specified, the corresponding Param will default to its defined default value, which is False.

  • io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline. If None is specified, the corresponding Param will default to its defined default value, which is an instance of corelay.io.NoStorage.

  • **kwargs (Any) – A dict of keyword arguments, which will be used to initialize the parameters of the Processor that were marked as keyword arguments. The keys of the dict are the names of the parameters, and the values are the values to be assigned to those parameters.

Raises:

TypeError – The number of positional arguments supplied is greater than the number of parameters that were marked as positional or a parameter was defined as both positional and a keyword argument.

Return type:

None

checkpoint_data: Any

If this Processor is a checkpoint, and if the processor was called at least once, stores the output of this processor.

abstractmethod function(data: Any) -> Any

Applies a function to the input data. This function should be implemented by subclasses of Processor.

Parameters:

data (Any) – The input data to this Processor.

Raises:

NotImplementedError – Always raised by this base implementation, because this is an abstract method that must be implemented by subclasses of Processor.

Returns:

Returns the output of the function applied to the input data.

Return type:

Any

__call__(data: Any) -> Any

Applies function() on the input data and saves the output if the is_checkpoint Param was set to True.

Parameters:

data (Any) – The input data to this Processor.

Returns:

Returns the output of the function applied to the input data.

Return type:

Any
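The checkpoint behaviour described for __call__() can be pictured with a small stand-in class. This is only a sketch under assumptions: CheckpointSketch is a hypothetical class that does not import or reproduce CoRelAy's Processor, and it models only the documented rule that the output is stored in checkpoint_data when is_checkpoint is True.

```python
from typing import Any


class CheckpointSketch:
    """Hypothetical stand-in for a Processor; not CoRelAy's implementation."""

    def __init__(self, is_checkpoint: bool = False) -> None:
        self.is_checkpoint = is_checkpoint
        self.checkpoint_data: Any = None

    def function(self, data: Any) -> Any:
        # Placeholder task: double every element of the input list.
        return [2 * value for value in data]

    def __call__(self, data: Any) -> Any:
        # Apply the task, then keep the output only if this is a checkpoint.
        output = self.function(data)
        if self.is_checkpoint:
            self.checkpoint_data = output
        return output


plain = CheckpointSketch()
checkpoint = CheckpointSketch(is_checkpoint=True)
plain_output = plain([1, 2, 3])
checkpoint_output = checkpoint([1, 2, 3])
```

In this sketch both calls return the same output, but only the instance created with is_checkpoint=True retains it afterwards.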

param_values() -> dict[str, Any]

Get values for all parameters defined through Param attributes.

Returns:

Returns a dict containing the names of the parameters as keys and their values as values.

Return type:

dict[str, Any]

identifiers() -> OrderedDict[str, Any]

Returns a dict containing the class qualifier name, as well as all parameters marked as identifiers with their values.

Returns:

Returns a collections.OrderedDict containing the class qualifier name and all parameters marked as identifiers with their values.

Return type:

collections.OrderedDict[str, Any]

copy() -> Processor

Copies this processor by creating a new Processor instance with the same values for the parameters defined as Param class attributes and the same checkpoint data.

Returns:

Returns a copy of this Processor instance.

Return type:

Processor

__repr__() -> str

Generates a str representation of the Processor instance, including the class name, the parameters and their values, and the output representation, e.g., ProcessorName(metric=sqeuclidean, function=lambda x: x.mean(1)) -> numpy.ndarray.

Returns:

Returns a str representation of the Processor instance.

Return type:

str

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes not enclosed with double underscores.

class corelay.processor.base.FunctionProcessor

Bases: Processor

A Processor that executes a user-defined function.

Parameters:
  • processing_function (FunctionType) – The function around which to create the FunctionProcessor. This function will be invoked when the function() method is invoked or the FunctionProcessor object is called like a function. Depending on whether bind_method is True or False, it will be bound as a method to the FunctionProcessor object.

  • is_output (bool) – A value indicating whether this FunctionProcessor is the output of a Pipeline. Defaults to False.

  • is_checkpoint (bool | None) – A value indicating whether check-pointed pipeline computations should start at this point, if there exists a previously computed checkpoint value. Defaults to False.

  • io (Storable | None) – An IO object that is used to cache intermediate results of the Pipeline, which can then be re-used in this run or in subsequent runs of the Pipeline. Defaults to an instance of NoStorage.

  • bind_method (bool) – A value indicating whether the processing_function will be bound to this class, enabling it to access self. Defaults to False.

__tracked__: collections.OrderedDict[str, Any]

A collections.OrderedDict with all public class attributes, i.e., all class attributes not enclosed with double underscores.

processing_function: Annotated[LambdaType, Param]

The function around which to create the FunctionProcessor. This function will be invoked when the function() method is invoked or the FunctionProcessor object is called like a function. Depending on whether bind_method is True or False, it will be bound as a method to the FunctionProcessor object.

Return type:

Plug

bind_method: Annotated[bool, Param]

A value indicating whether the processing_function will be bound to this class, enabling it to access self.

Return type:

Plug

function(data: Any) -> Any

Invokes the function bound to this class with the input data.

Note

In a previous version of CoRelAy, the processing_function was actually bound to the class in the __call__() method, but this caused typing issues: static type checkers like MyPy believed that the FunctionProcessor class was still abstract, because it did not explicitly override the function() method. The processing_function used to be called just function(), which meant that at runtime the function() method was effectively overridden, as its slot was taken by the function() parameter. Statically, however, this was not the case. Overriding the function() method and still binding the processing_function to the class in the __call__() method causes further typing issues, as the static type checker does not allow writing to a method slot. For this reason, the function() method is now overridden and internally calls the processing_function with self as the first argument. Functionally, this should be equivalent to the previous version, but this is not guaranteed in every use case. This might have to be rethought and changed in the future.

Parameters:

data (Any) – The input data to this Processor.

Returns:

Returns the output of the function applied to the input data.

Return type:

Any
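The effect of bind_method can be illustrated with a simplified stand-in. The sketch below makes several assumptions: FunctionProcessorSketch and its scale attribute are hypothetical, the code does not import CoRelAy, and it assumes that the function receives self as its first argument only when bind_method is True and receives just the data otherwise.

```python
from typing import Any, Callable


class FunctionProcessorSketch:
    """Hypothetical stand-in; it only mimics the documented dispatch."""

    def __init__(
        self,
        processing_function: Callable[..., Any],
        bind_method: bool = False,
    ) -> None:
        self.processing_function = processing_function
        self.bind_method = bind_method
        self.scale = 10  # illustrative attribute reachable through self

    def function(self, data: Any) -> Any:
        if self.bind_method:
            # Bound case: the function receives the processor as first argument.
            return self.processing_function(self, data)
        # Unbound case (assumed): the function only receives the input data.
        return self.processing_function(data)

    def __call__(self, data: Any) -> Any:
        return self.function(data)


unbound = FunctionProcessorSketch(lambda data: data + 1)
bound = FunctionProcessorSketch(
    lambda self, data: data * self.scale,
    bind_method=True,
)
unbound_result = unbound(2)  # 3
bound_result = bound(2)  # 20
```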

corelay.processor.base.ensure_processor(processor_or_callable: Processor | Callable[..., Any], **kwargs: Any) -> Processor

Ensures that the specified processor or callable argument processor_or_callable is of type Processor and, if it is not, but callable, makes it a FunctionProcessor. Sets the attributes of the resulting processor as stated in **kwargs.

Parameters:
  • processor_or_callable (Processor | Callable[..., Any]) – The processor or callable for which to ensure that it is a Processor.

  • **kwargs (Any) – The keyword arguments to be passed to the Processor. These keyword arguments are used to set the values of the parameters of the Processor.

Raises:

TypeError – The supplied processor or callable processor_or_callable is neither a Processor nor callable.

Returns:

Returns the original processor_or_callable if it was a Processor, or a new FunctionProcessor, which calls it if it was a callable. The attributes of the resulting processor are set as stated in **kwargs.

Return type:

Processor
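The three documented outcomes of ensure_processor (pass a Processor through, wrap a callable, reject anything else) can be sketched without CoRelAy itself. Everything below is an assumption made for illustration: ProcessorSketch, FunctionProcessorSketch, and ensure_processor_sketch are hypothetical stand-ins, and setting the keyword arguments with setattr is one possible reading of "sets the attributes as stated in **kwargs", not the library's actual code.

```python
from typing import Any, Callable


class ProcessorSketch:
    """Hypothetical stand-in for Processor."""

    def __init__(self) -> None:
        self.is_output = False


class FunctionProcessorSketch(ProcessorSketch):
    """Hypothetical stand-in for FunctionProcessor."""

    def __init__(self, processing_function: Callable[..., Any]) -> None:
        super().__init__()
        self.processing_function = processing_function

    def __call__(self, data: Any) -> Any:
        return self.processing_function(data)


def ensure_processor_sketch(processor_or_callable: Any, **kwargs: Any) -> ProcessorSketch:
    if isinstance(processor_or_callable, ProcessorSketch):
        # Already a processor: return the very same object.
        processor = processor_or_callable
    elif callable(processor_or_callable):
        # A plain callable: wrap it so that it behaves like a processor.
        processor = FunctionProcessorSketch(processor_or_callable)
    else:
        raise TypeError("Expected a processor or a callable.")
    # Apply the requested attribute values to the resulting processor.
    for name, value in kwargs.items():
        setattr(processor, name, value)
    return processor


wrapped = ensure_processor_sketch(lambda data: data + 1, is_output=True)
existing = ProcessorSketch()
same = ensure_processor_sketch(existing, is_output=True)
```

Here wrapped is a new FunctionProcessorSketch around the lambda, while same is the original existing object with is_output updated in place.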