Getting started

CoRelAy is a tool to compose small-scale (single-machine) analysis pipelines. It was created to swiftly implement pipelines to generate analysis data which can then be visualized using ViRelAy.

Install

CoRelAy can be installed directly from PyPI:

$ pip install virelay

To install optional HDBSCAN and UMAP support, use

$ pip install corelay[umap,hdbscan]

For the current development version, or to try out examples, clone and install with:

$ git clone https://github.com/virelay/virelay.git
$ pip install ./virelay

Basic Usage

The main objective of ViRelAy is the quick and hassle-free composition of analysis Pipelines. Pipelines are designed with a number of steps, called Task, which each have their own default function, defined using a Processor. Any step of the pipeline may be individually changed by assigning a new Processor. Processors have Params which configure their functional hyperparameters.

Processor and Param

A Processor can be defined in the following way:

import numpy as np

from corelay.processor.base import Processor, Param
from types import FunctionType


class MyProcess(Processor):
    # Parameters are registered by defining a class attribute of type Param,
    # and will be set in __init__ automatically, which expects keyword
    # arguments with the same name the first value is a type specification,
    # the second a default value
    stuff = Param(dtype=int, default=2)
    # as class methods have to be bound explicitly, func here acts like a
    # static function of MyProcess. For more information see
    # corelay.processor.base.FunctionProcessor
    func = Param(FunctionType, lambda x: x**2)

    # Parameters can be accessed as self.<parameter-name>
    def function(self, data):
        return self.stuff * self.func(data) + 3

Every Processor is a subclass of Processor, and must implement function(), which typically only uses a single positional argument. Parameters for the Processor can be specified by assigning an instance of Param as a class attribute. The name of the attribute can be used as a keyword argument to specify the value when creating an instance of the Processor, and accessed under the same name. Each Param has a datatype dtype and a default value default. Processor instances can be used like functions, or assigned to a Task of a Pipeline.

Pipeline and Task

Pipelines consist of multiple, sequential, pre-determined steps, called Tasks, and can be defined in the following way:

from corelay.pipeline.base import Pipeline, Task
from corelay.processor.base import FunctionProcessor
from corelay.processor.affinity import Affinity, RadialBasisFunction
from corelay.processor.distance import Distance, SciPyPDist


class MyPipeline(Pipeline):
    # Task are registered in order by creating a class attribute of type
    # Task() and, like params, are expected to be supplied with the same name
    # in __init__ as a keyword argument. The first value is an optional
    # expected Process type, second is a default value, which has to be an
    # instance of that type. If the default argument is not a Process, it will
    # be converted to a FunctionProcessor by default, functions fed to
    # FunctionProcessors are by default not bound to the class. To bind them,
    # we can supply `bind_method=True` to the FunctionProcessor. Supplying it
    # to the task changes the default value of the Processor before creation:
    prepreprocess = Task(
        proc_type=FunctionProcessor,
        default=(lambda self, x: x * 2),
        bind_method=True
    )
    # Otherwise, we do not need to supply `self` for the default function:
    preprocess = Task(proc_type=FunctionProcessor, default=(lambda x: x**2))
    pdistance = Task(Distance, SciPyPDist(metric='sqeuclidean'))
    affinity = Task(Affinity, RadialBasisFunction(sigma=1.0))
    # empty task, does nothing (except return input) by default
    postprocess = Task()

Every Pipeline is a subclass of Pipeline. Tasks of a pipeline are created by assigning an instance of Task as a class attribute, similar to Params in Processors. Each Task has, each optional, a Processor-type proc_type, a default Processor for the Task default. Additional keyword arguments can be specified as default parameter values that should be assigned to any Processor that is used for the Task. The keyword argument bind_method is specific to FunctionProcessor, and describes, whether the function is static (default, bind_method=False), or whether it should have access to the Processor instance. Functions can be passed instead of Processors, which will be implicitly converted to a FunctionProcessor. Tasks can be assigned by passing Processors with their respective keyword argument during instantiation of the Pipeline, or by directly assigning them to the respective attribute.

Pipelines and Processors can be instantiated and used in the following way:

import numpy as np

from corelay.processor.base import FunctionProcessor
from corelay.processor.affinity import RadialBasisFunction
from types import FunctionType

# Use Pipeline 'as is'
pipeline = MyPipeline()
output1 = pipeline(np.random.rand(5, 3))
print('Pipeline output:', output1)

# Tasks are filled with Processes during initialization of the Pipeline
# class keyword arguments do not have to be in order, and if not supplied,
# the default value will be used
custom_pipeline = MyPipeline(
    # The pipeline's Task sets the `bind_method` Parameter's default to
    # True. Supplying a value here avoids falling back to the default
    # value, and thus we do not need a `self` argument for our function:
    prepreprocess=FunctionProcessor(
        function=(lambda x: x + 1), bind_method=False
    ),
    preprocess=(lambda x: x.mean(1)),
    postprocess = MyProcess(stuff=3)
)
custom_pipeline.affinity = RadialBasisFunction(sigma=.1),
output2 = custom_pipeline(np.ones((5, 3, 5)))
print('Custom pipeline output:', output2)

Like Processors, executing a Pipeline can be done by simply calling it like a function.

Examples

More examples to highlight some features of CoRelAy can be found in example/. The following demonstrates, how to create a functional pipeline based on corelay.pipeline.spectral.SpectralClustering. A similar version of the following code may be found in example/memoize_spectral_pipeline.py.

import time

import h5py
import numpy as np

from corelay.base import Param
from corelay.processor.base import Processor
from corelay.processor.flow import Sequential, Parallel
from corelay.pipeline.spectral import SpectralClustering
from corelay.processor.clustering import KMeans
from corelay.processor.embedding import TSNEEmbedding, EigenDecomposition
from corelay.io.storage import HashedHDF5


# custom processors can be implemented by defining a function attribute
class Flatten(Processor):
    def function(self, data):
        return data.reshape(data.shape[0], np.prod(data.shape[1:]))


class SumChannel(Processor):
    # parameters can be assigned by defining a class-owned Param instance
    axis = Param(int, 1)
    def function(self, data):
        return data.sum(1)


class Normalize(Processor):
    def function(self, data):
        data = data / data.sum((1, 2), keepdims=True)
        return data


np.random.seed(0xDEADBEEF)
fpath = 'test.analysis.h5'
with h5py.File(fpath, 'a') as fd:
    # HashedHDF5 is an io-object that stores outputs of Processors based on
    # hashes in hdf5
    iobj = HashedHDF5(fd.require_group('proc_data'))

    # generate some exemplary data
    data = np.random.normal(size=(64, 3, 32, 32))
    n_clusters = range(2, 20)

    # SpectralClustering is an Example for a pre-defined Pipeline
    pipeline = SpectralClustering(
        # processors, such as EigenDecomposition, can be assigned to
        # pre-defined tasks
        embedding=EigenDecomposition(n_eigval=8, io=iobj),
        # flow-based Processors, such as Parallel, can combine multiple
        # Processors broadcast=True copies the input as many times as there
        # are Processors broadcast=False instead attempts to match each
        # input to a Processor
        clustering=Parallel([
            Parallel([
                KMeans(n_clusters=k, io=iobj) for k in n_clusters
            ], broadcast=True),
            # io-objects will be used during computation when supplied to
            # Processors if a corresponding output value (here identified by
            # hashes) already exists, the value is not computed again but
            # instead loaded from the io object
            TSNEEmbedding(io=iobj)
        ], broadcast=True, is_output=True)
    )
    # Processors (and Params) can be updated by simply assigning
    # corresponding attributes
    pipeline.preprocessing = Sequential([
        SumChannel(),
        Normalize(),
        Flatten()
    ])

    start_time = time.perf_counter()

    # Processors flagged with "is_output=True" will be accumulated in the
    # output the output will be a tree of tuples, with the same hierarchy as
    # the pipeline (i.e. clusterings here contains a tuple of the k-means
    # outputs)
    clusterings, tsne = pipeline(data)

    # since we store our results in a hdf5 file, subsequent calls will not
    # compute the values (for the same inputs), but rather load them from the
    # hdf5 file try running the script multiple times
    duration = time.perf_counter() - start_time
    print(f'Pipeline execution time: {duration:.4f} seconds')