C.pipeline
- class clibas.pipelines.Pipeline(*args)
Bases: Handler
Operation queue and execution manager for data processing workflows.
Provides infrastructure for building, executing, and monitoring data processing pipelines. Operations are enqueued and executed sequentially with automatic timing, logging, and progress tracking.
Example
>>> import clibas as C
>>> C.initialize_from_config('config.yaml')
>>>
>>> # build pipeline
>>> C.pipeline.enque([
...     C.fastq_parser.translate(),
...     C.fastq_parser.len_filter(where='pep'),
...     C.fastq_parser.save(where='pep', fmt='fasta')
... ])
>>>
>>> # execute
>>> loader = C.data_loader.fetch_gz_from_dir(data_dir='./sequencing_data/')
>>> data = C.pipeline.load_and_run(loader=loader)
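The queue-and-execute behavior described above can be pictured with a minimal, self-contained sketch. `PipelineSketch` is a hypothetical stand-in, not the clibas source: it stores callables in order, applies them one after another, and logs progress, elapsed time and a size hint for each step using the standard `logging` module. Plain Python lists stand in for clibas `Data` objects.

```python
import logging
import time
from typing import Any, Callable, List

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("pipeline_sketch")


class PipelineSketch:
    """Hypothetical queue-and-execute manager; not the clibas implementation."""

    def __init__(self) -> None:
        self.queue: List[Callable[[Any], Any]] = []

    def enque(self, ops: List[Callable[[Any], Any]]) -> None:
        # Operations are only stored here; they run later, in enqueue order.
        self.queue.extend(ops)

    def run(self, data: Any) -> Any:
        total = len(self.queue)
        for i, op in enumerate(self.queue, start=1):
            start = time.perf_counter()
            data = op(data)
            elapsed = time.perf_counter() - start
            size = len(data) if hasattr(data, "__len__") else "n/a"
            # One log line per step: progress, elapsed time and a size hint.
            log.info("[%d/%d] %s: %.4f s, size=%s",
                     i, total, getattr(op, "__name__", repr(op)), elapsed, size)
        return data


pipe = PipelineSketch()
pipe.enque([sorted, lambda xs: xs[:2]])
print(pipe.run([3, 1, 2]))  # [1, 2]
```

Because each step only has to map data to data, any plain function (here the built-in `sorted`) can be enqueued in this sketch.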
Methods
enque
- Pipeline.enque(ops)
Add operations to the pipeline queue.
- Parameters:
ops (list) – List of callable operations. Each operation should accept a Data object and return a transformed Data object.
Example
>>> parser = C.fastq_parser
>>> C.pipeline.enque([
...     parser.translate(force_at_frame=0),
...     parser.len_filter(where='pep'),
...     parser.save(where='pep', fmt='fasta')
... ])
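Calls such as `parser.len_filter(where='pep')` in the example are evaluated when the list is built, so each one presumably returns a configured operation rather than processing data immediately. A minimal sketch of that factory pattern, under stated assumptions: `Data` is modeled as a hypothetical dict of equal-length sequence lists, and `len_filter` and `upper` here are illustrative functions whose names and parameters (`min_len`, `max_len`) are not taken from clibas.

```python
from typing import Callable, Dict, List

# Hypothetical stand-in for a Data object: named columns of sequences.
Data = Dict[str, List[str]]
Op = Callable[[Data], Data]


def len_filter(where: str, min_len: int = 1, max_len: int = 100) -> Op:
    """Hypothetical factory: configure now, return an op that runs later."""
    def op(data: Data) -> Data:
        keep = [i for i, s in enumerate(data[where]) if min_len <= len(s) <= max_len]
        # Filter every column with the same indices so rows stay aligned.
        return {k: [v[i] for i in keep] for k, v in data.items()}

    op.__name__ = f"len_filter[{where}]"
    return op


def upper(where: str) -> Op:
    """Hypothetical factory for a simple per-sequence transform."""
    def op(data: Data) -> Data:
        out = dict(data)  # shallow copy; the input Data is not modified
        out[where] = [s.upper() for s in data[where]]
        return out

    return op


queue = [upper(where="pep"), len_filter(where="pep", min_len=3)]
data = {"pep": ["ac", "acde", "wyk"], "dna": ["x", "y", "z"]}
for step in queue:
    data = step(data)
print(data)  # {'pep': ['ACDE', 'WYK'], 'dna': ['y', 'z']}
```

Returning closures keeps configuration (`where`, length bounds) separate from execution, which is what allows the whole list to be built before any data is loaded.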
run
- Pipeline.run(data=None, save_summary=True)
Execute the enqueued pipeline.
Processes operations sequentially, transforming data through each step. Logs timing and data size information at each stage.
- Parameters:
data (Data, optional) – Input data. If None, the first operation must load the data (e.g., a loader function).
save_summary (bool) – If True, saves a CSV summary of the pipeline execution to the logs directory. Default is True.
- Returns:
The transformed Data object, after all operations have been applied.
- Return type:
Data
Example
>>> data = C.pipeline.run()  # execute the full pipeline
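The loop inside `run` might look roughly like the hypothetical `run_queue` function below. It is a sketch under three assumptions that the documentation does not confirm: a loader enqueued first takes no arguments, `len()` is a reasonable size measure, and the summary is written to an explicit `summary_path` instead of the clibas logs directory. The CSV column names are illustrative only.

```python
import csv
import tempfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional


def run_queue(queue: List[Callable[..., Any]], data: Optional[Any] = None,
              summary_path: Optional[Path] = None) -> Any:
    """Hypothetical sketch of run(): sequential ops, timing, optional CSV summary."""
    if data is None and not queue:
        raise ValueError("no input data and no operation to load it")
    rows: List[Dict[str, Any]] = []
    for i, op in enumerate(queue):
        start = time.perf_counter()
        if i == 0 and data is None:
            data = op()  # assumption: a leading loader takes no arguments
        else:
            data = op(data)
        elapsed = time.perf_counter() - start
        rows.append({
            "step": i,
            "op": getattr(op, "__name__", repr(op)),
            "seconds": round(elapsed, 6),
            # Size hint; None (an empty CSV cell) if the data has no len().
            "size": len(data) if hasattr(data, "__len__") else None,
        })
    if summary_path is not None:
        with open(summary_path, "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=["step", "op", "seconds", "size"])
            writer.writeheader()
            writer.writerows(rows)
    return data


# Usage: load a list, keep even numbers, write the summary to a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    summary = Path(tmp) / "summary.csv"
    result = run_queue([lambda: [1, 2, 3, 4], lambda xs: [x for x in xs if x % 2 == 0]],
                       summary_path=summary)
    print(result)  # [2, 4]
    print(summary.read_text().splitlines()[0])  # step,op,seconds,size
```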
load_and_run
- Pipeline.load_and_run(loader, save_summary=True)
Execute the pipeline with data loading as the first step.
Convenience method that prepends a loader function to the queue and executes the full pipeline.
- Parameters:
loader (callable) – Function that returns a Data object.
save_summary (bool) – If True, saves an execution summary. Default is True.
- Returns:
The transformed Data object, after all operations have been applied.
- Return type:
Data
Example
>>> data = C.pipeline.load_and_run(C.data_loader.fetch_gz_from_dir())
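A minimal sketch of how a `load_and_run` convenience method can be layered on top of `run`. `QueueSketch` is hypothetical: it wraps the zero-argument loader so it matches the data-to-data contract and prepends it to a copy of the queue, so the enqueued operations are unchanged afterwards. Whether clibas keeps the loader in its queue after the call is not stated in this documentation.

```python
from typing import Any, Callable, List


class QueueSketch:
    """Hypothetical minimal pipeline used only to illustrate load_and_run()."""

    def __init__(self) -> None:
        self.queue: List[Callable[[Any], Any]] = []

    def enque(self, ops: List[Callable[[Any], Any]]) -> None:
        self.queue.extend(ops)

    def _run_ops(self, ops: List[Callable[[Any], Any]], data: Any) -> Any:
        for op in ops:
            data = op(data)
        return data

    def run(self, data: Any = None) -> Any:
        return self._run_ops(self.queue, data)

    def load_and_run(self, loader: Callable[[], Any]) -> Any:
        # Adapt the zero-argument loader to the data -> data contract.
        def load_step(_unused: Any) -> Any:
            return loader()

        # Prepend to a copy so self.queue is unchanged after the call.
        return self._run_ops([load_step, *self.queue], None)


pipe = QueueSketch()
pipe.enque([lambda xs: [x + 1 for x in xs], sum])
print(pipe.load_and_run(lambda: [1, 2, 3]))  # 9
```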
stream
- Pipeline.stream(streamer, save_summary=True)
Execute the pipeline on streamed data for memory efficiency.
Processes samples one at a time from a generator, which is useful when the total dataset exceeds available memory. Each sample is processed independently through the full pipeline.
- Parameters:
streamer (generator) – Generator yielding Sample objects.
save_summary (bool) – If True, saves execution summary for each sample. Default is True.
Example
>>> # process a large dataset sample by sample
>>> C.pipeline.stream(C.data_loader.stream_from_gz_dir())
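Streaming can be sketched with a generator that pulls one sample at a time and pushes it through every enqueued operation before requesting the next. `stream_queue` and `fake_streamer` are hypothetical: samples are modeled as `(name, data)` tuples, and the sketch yields each result back to the caller, whereas the documented `stream` method lists no return value and may instead rely on save operations in the queue.

```python
from typing import Any, Callable, Iterable, Iterator, List, Tuple

Sample = Tuple[str, Any]  # hypothetical: (sample name, sample data)


def stream_queue(queue: List[Callable[[Any], Any]],
                 streamer: Iterable[Sample]) -> Iterator[Sample]:
    """Hypothetical sketch of stream(): run every op on one sample at a time."""
    for name, data in streamer:
        for op in queue:
            data = op(data)
        # Hand the finished sample back before the next one is pulled
        # from the streamer; nothing from earlier samples is kept here.
        yield name, data


def fake_streamer() -> Iterator[Sample]:
    # Stand-in for a loader that would lazily read one file per sample.
    for name, n in [("s1", 3), ("s2", 5)]:
        yield name, list(range(n))


for name, result in stream_queue([lambda xs: [x * x for x in xs], sum], fake_streamer()):
    print(name, result)
# s1 5
# s2 30
```

Because both functions are generators, the next sample is not read until the caller asks for it, which is what bounds memory use to roughly one sample at a time.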