"""
Data preprocessing utilities for machine learning workflows.
Provides DataPreprocessor onject for preparing sequencing data for machine learning,
including filtering, featurization, train/test splitting, and data augmentation.
"""
import copy
import inspect
import os
from collections import defaultdict
import numpy as np
import clibas.featurization as featurize
from clibas.baseclasses import Handler
from clibas.datatypes import AnalysisSample, Data
[docs]
class DataPreprocessor(Handler):
"""
Preprocessing tools for machine learning dataset preparation.
Provides methods for filtering, featurization, sampling, and data augmentation
of sequence datasets. Operations transform Data objects for downstream machine
learning workflows. Typically accessed through the clibas facade after initialization.
Example:
>>> import clibas as C
>>> C.initialize_from_config('config.yaml')
>>> # Preprocessor is now ready to use
>>> # as C.preprocessor
Note:
This class is not typically instantiated directly. Use the clibas
initialization system to access preprocessing functionality.
"""
def __init__(self, *args):
super(DataPreprocessor, self).__init__(*args)
return
def __repr__(self):
return "<DataPreprocessor object>"
[docs]
def token_filter(self, tokens_to_filter_by=None):
"""
Filter sequences containing specific tokens.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Removes all sequences (X array entries) that contain any of the specified
tokens. Useful for filtering out sequences with ambiguous or unwanted
characters.
Args:
tokens_to_filter_by (list, tuple, or ndarray): Single-letter encoded
tokens to filter out (e.g., amino acids, bases).
Returns:
callable: Operation that accepts a Data object, filters sequences containing specified tokens, and returns the modified Data object.
Example:
>>> #remove sequences containing 'X' or 'Z' amino acids
>>> token_filt = C.preprocessor.token_filter(tokens_to_filter_by=['X', 'Z'])
>>> data = token_filt(data)
"""
if tokens_to_filter_by is None:
msg = "<filter_by_token> op expected aas_to_filter_by argument."
self.logger.error(msg)
raise ValueError(msg)
if not isinstance(tokens_to_filter_by, (tuple, list, np.ndarray)):
msg = "<filter_by_token> op expected param tokens_to_filter_by as type=(list, tuple, np.ndarray); received: {type(tokens_to_filter_by)}"
self.logger.error(msg)
raise ValueError(msg)
def filter_by_token(data):
for sample in data:
arr = self._cast(sample.X, "2d")
self._empty_array_check(arr, inspect.stack()[0][3])
t = np.asarray(tokens_to_filter_by).astype(arr.dtype)
ind = ~np.isin(arr, t).reshape(arr.shape)
ind = np.sum(ind, axis=1) == arr.shape[1]
sample.ind_filter(ind)
return data
return filter_by_token
[docs]
def intrasample_unique(self):
"""
Remove duplicate sequences within each sample.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Removes duplicate entries within each sample's X dataset. Equivalent to
calling np.unique(X, axis=0) on each sample. Entries are resorted during
the process.
Returns:
callable: Operation that accepts a Data object, removes intra-sample duplicates, and returns the modified Data object.
Example:
>>> unique_op = C.preprocessor.intrasample_unique()
>>> data = unique_op(data)
"""
def get_intrasample_unique(data):
for sample in data:
self._empty_array_check(sample.X, inspect.stack()[0][3])
ind = np.unique(sample.X, axis=0, return_index=True)[1]
sample.ind_filter(ind)
return data
return get_intrasample_unique
[docs]
def intersample_unique(self):
"""
Remove sequences found in multiple samples.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Compares X arrays across all samples and removes entries that appear in
more than one sample. Only sequences unique to a single sample are retained.
Returns:
callable: Operation that accepts a Data object, removes inter-sample duplicates, and returns the modified Data object.
Note:
If X arrays have different widths, they will be padded to the maximum
width before comparison.
Example:
>>> intersample_op = C.preprocessor.intersample_unique()
>>> data = intersample_op(data)
"""
def _pad_to_new_dim(arr, new_dim):
to_pad = new_dim - arr.shape[1]
if to_pad == 0:
return arr
new_arr = np.zeros((arr.shape[0], new_dim), dtype=arr.dtype)
new_arr[:, : arr.shape[1]] = arr
return new_arr
def _repad_X(data):
# unify the size of dimension -1 for all X arrays in the dataset
W = [self._cast(sample.X, "2d").shape[-1] for sample in data]
if len(set(W)) != 1:
msg = "<intersample_unique>: the dataset contains X arrays of different width; they will be padded to unify. . ."
self.logger.warning(msg)
new_W = max(W)
for sample in data:
sample.X = _pad_to_new_dim(self._cast(sample.X, "2d"), new_W)
return data
def to_void(arr):
return arr.view(np.dtype((np.void, arr.dtype.itemsize * arr.shape[1])))
def get_intersample_unique(data):
# this ensures that X are cast to 2d
data = _repad_X(data)
V = []
idx = []
# turn to void type for quick comparison
for i, sample in enumerate(data):
self._empty_array_check(sample.X, inspect.stack()[0][3])
V.append(to_void(sample.X))
idx.append(np.full(sample.size, i, dtype=np.int8))
V = np.concatenate(V)
idx = np.concatenate(idx)
_, inverse, counts = np.unique(V, return_inverse=True, return_counts=True)
sample_tracker_dict = defaultdict(set)
for i, row_idx in enumerate(inverse):
sample_tracker_dict[row_idx.item()].add(idx[i])
mask = np.array(
[len(sample_tracker_dict[row.item()]) == 1 for row in inverse]
)
offset = 0
for sample in data:
n = sample.size
sample.X = sample.X[mask[offset : offset + n]]
offset += n
return data
return get_intersample_unique
[docs]
def filter_external(self, external=None, max_hd=None):
"""
Filter sequences similar to an external dataset.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Removes sequences that are within a specified Hamming distance of any
sequence in an external dataset. Useful for removing validation/test
sequences or known contaminants from training data.
Args:
external: External dataset to compare against. Should be castable
to np.ndarray or compatible with AnalysisSample.
max_hd (int): Maximum Hamming distance threshold. Sequences with
distance ≤ max_hd from any external sequence are removed.
Returns:
callable: Operation that accepts a Data object, filters sequences
similar to external dataset, and returns the modified Data object.
Example:
>>> #remove sequences similar to validation set
>>> external_filt = C.preprocessor.filter_external(
... external=validation_sequences, max_hd=2
... )
>>> data = external_filt(data)
"""
from clibas.misc import hamming_distance
if not isinstance(max_hd, int):
msg = "<filter_external> op expected min_hd as type=int; received: {type(min_hd)}"
self.logger.error(msg)
raise ValueError(msg)
# cast w/e is passed as an external dataset to AnalysisSample
external = AnalysisSample(X=external).X
def filter_external_X(data):
for sample in data:
arr = self._cast(sample.X, "2d")
self._empty_array_check(sample.X, inspect.stack()[0][3])
dtype = arr.dtype
ext = self._cast(external, "2d").astype(dtype)
if ext.shape[-1] != arr.shape[-1]:
msg = f"<filter_external>: the shape of external dataset along dimension 2 ({ext.shape[-1]}) does not match the data ({arr.shape[-1]})!"
self.logger.error(msg)
raise ValueError(msg)
for pep in ext:
to_pop = hamming_distance(
arr, pep, max_hd, cum=True, return_index=True
)
ind = np.ones(sample.size, dtype=bool)
ind[to_pop] = False
sample.ind_filter(ind)
return data
return filter_external_X
[docs]
def merge(self):
"""
Merge all samples into a single dataset.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Combines all samples in the Data object into a single merged sample.
The merged sample is named 'merged_data'.
Returns:
callable: Operation that accepts a Data object, merges all samples, and returns the modified Data object containing a single sample.
Example:
>>> merge_op = C.preprocessor.merge()
>>> data = merge_op(data)
>>> # data now contains a single merged sample
"""
def merge_datasets(data):
data.stack()
data.samples[0].name = "merged_data"
return data
return merge_datasets
[docs]
def sample(self, sample_size=None):
"""
Randomly sample from each dataset.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Randomly samples a specified number or fraction of sequences from each
sample in the dataset. Applied independently to each sample.
Args:
sample_size (int or float): Number or fraction of sequences to sample.
If ≤ 1, interpreted as fraction of dataset to keep.
If > 1, interpreted as absolute number of sequences to sample.
Returns:
callable: Operation that accepts a Data object, samples from each dataset, and returns the modified Data object.
Example:
>>> #sample 50% of each dataset
>>> sample_op = C.preprocessor.sample(sample_size=0.5)
>>> data = sample_op(data)
>>>
>>> #sample exactly 1000 sequences from each dataset
>>> sample_op = C.preprocessor.sample(sample_size=1000)
>>> data = sample_op(data)
"""
if not (isinstance(sample_size, int) or isinstance(sample_size, float)):
msg = "<sample_from_datasets> op expected sample_size as type=int or float; received: {type(sample_size)}"
self.logger.error(msg)
raise ValueError(msg)
def take_a_sample(data):
for sample in data:
if sample_size <= 1:
size = int(sample_size * sample.size)
else:
size = int(sample_size)
if sample.size < size:
msg = f"Cannot take a sample that is bigger than the dataset. Sampling is ignored for {sample} sample."
self.logger.warning(msg)
continue
ind = np.random.choice(sample.size, size=size, replace=False)
sample.ind_filter(ind)
return data
return take_a_sample
[docs]
def shuffle(self):
"""
Randomly shuffle sequences within each sample.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Randomly reorders sequences within each sample's X dataset. Applied
independently to each sample.
Returns:
callable: Operation that accepts a Data object, shuffles sequences within each sample, and returns the modified Data object.
Example:
>>> shuffle_op = C.preprocessor.shuffle()
>>> data = shuffle_op(data)
"""
def shuffle_intraset(data):
for sample in data:
ind = np.arange(sample.X.shape[0])
np.random.shuffle(ind)
sample.ind_filter(ind)
return data
return shuffle_intraset
# TODO: not very happy with having to deepcopy the sample in memory,
# even if briefly; but it isn't a major issue at the moment.
[docs]
def tt_split(self, test_fraction=None):
"""
Perform train/test split on dataset.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Splits a single sample into separate train and test datasets. The input
Data object must contain exactly one sample.
Args:
test_fraction (float): Fraction of data to allocate to test set
(between 0 and 1). Remaining data goes to training set.
Returns:
callable: Operation that accepts a Data object with one sample, splits it, and returns a Data object with two samples named 'train_data' and 'test_data'.
Raises:
ValueError: If input Data contains more than one sample.
Example:
>>> #split into 80% train, 20% test
>>> split_op = C.preprocessor.tt_split(test_fraction=0.2)
>>> data = split_op(data)
>>> #data now contains train_data and test_data samples
"""
if not (isinstance(test_fraction, int) or isinstance(test_fraction, float)):
msg = "<test_train_split> op expected test_fraction as type=int or float; received: {type(test_fraction)}"
self.logger.error(msg)
raise ValueError(msg)
def test_train_split(data):
if data.size != 1:
msg = "A single sample must be passed to the <test_train_split> op; received: {data.size} samples. . ."
self.logger.error(msg)
raise ValueError(msg)
sample = data.samples[0]
full_set_size = sample.size
test_set_size = int(test_fraction * full_set_size)
test_set_ind = np.random.choice(
full_set_size, size=test_set_size, replace=False
)
mask = np.ones(full_set_size, dtype=bool)
mask[test_set_ind] = False
train_sample = sample
test_sample = copy.deepcopy(sample)
train_sample.name = "train_data"
train_sample.ind_filter(mask)
test_sample.name = "test_data"
test_sample.ind_filter(~mask)
data = Data(samples=[train_sample, test_sample])
return data
return test_train_split
[docs]
def to_h5(
self, F=None, alphabet=None, reshape=False, chunks=None, return_data=False
):
"""
Featurize sequences and save to HDF5 files.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Converts sequence datasets to numerical feature representations and saves
them as HDF5 files. Useful when featurized datasets are too large to fit
in memory. Processes data in chunks for memory efficiency.
Args:
F (str, ndarray, or None): Feature matrix specification. If ``None``,
uses one-hot encoding. String options include 'varimax',
'pep_ECFP3', 'pep_ECFP4', 'pep_SMILES'. Can provide custom
2D array with ``F.shape[0] == len(alphabet)``.
alphabet (tuple, list, or ndarray, optional): Token alphabet. If ``None``,
uses peptide alphabet from configuration.
reshape (bool): If True, represents sequences as multidimensional
tensors. If False, unrolls to vectors. Default is False.
chunks (int): Process data in chunks of this size for memory efficiency.
return_data (bool): If True, returns unmodified Data object. If False,
returns None. Default is False.
Returns:
callable: Operation that accepts a Data object, featurizes sequences to HDF5 files, and returns Data object or ``None`` based on return_data.
Note:
HDF5 files are saved in the ml_data directory specified in the config file.
Example:
>>> to_h5_op = C.preprocessor.to_h5(F='pep_ECFP4', chunks=20, reshape=True)
>>> to_h5_op(data)
"""
if not isinstance(reshape, bool):
msg = "<featurize_to_h5> op expected param reshape as type=bool; received: {type(reshape)}"
self.logger.error(msg)
raise ValueError(msg)
if not isinstance(chunks, int):
msg = "<featurize_to_h5> op expected param chunks as type=int; received: {type(chunks)}"
self.logger.error(msg)
raise ValueError(msg)
if not isinstance(return_data, bool):
msg = "<featurize_to_h5> op expected param return_data as type=bool; received: {type(return_data)}"
self.logger.error(msg)
raise ValueError(msg)
alphabet = self._infer_alphabet(alphabet=alphabet)
# no need to pre-validate F -> FeatureMatrix will take care of it
from clibas.featurization import FeatureMatrix
F = FeatureMatrix.make(descr=F, constants=self.constants).F
def featurize_to_h5(data):
for sample in data:
arr = self._joint_alphabet_X_check(sample.X, alphabet)
self._empty_array_check(arr, inspect.stack()[0][3])
self._prepare_destinations(root=self.dirs.ml_data)
path = os.path.join(self.dirs.ml_data, f"{sample.name}.hdf5")
featurize.into_h5(
arr,
y=sample.y,
alphabet=alphabet,
path=path,
F=F,
reshape=reshape,
chunks=chunks,
)
if return_data:
return data
return
return featurize_to_h5
[docs]
def featurize_X(self, F=None, alphabet=None, reshape=False):
"""
Featurize sequence datasets in memory.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Converts sequence datasets to numerical feature representations. Use when
featurized datasets fit in memory. For large datasets, use to_h5() instead.
Args:
F (str, ndarray, or None): Feature matrix specification. If ``None``,
uses one-hot encoding. String options include 'varimax',
'pep_ECFP3', 'pep_ECFP4', 'pep_SMILES'. Can provide custom
2D array with ``F.shape[0] == len(alphabet)``.
alphabet (tuple, list, or ndarray, optional): Token alphabet. If ``None``,
uses peptide alphabet from the config file.
reshape (bool): If True, represents sequences as multidimensional
tensors. If False, unrolls to vectors. Default is False.
Returns:
callable: Operation that accepts a Data object, featurizes X datasets, and returns the modified Data object.
Example:
>>> featurize_op = C.preprocessor.featurize_X(F='pep_ECFP4', reshape=True)
>>> data = featurize_op(data)
"""
if not isinstance(reshape, bool):
msg = "<featurize_X_datasets> op expected param reshape as type=bool; received: {type(reshape)}"
self.logger.error(msg)
raise ValueError(msg)
alphabet = self._infer_alphabet(alphabet=alphabet)
# no need to pre-validate F -> FeatureMatrix will take care of it
from clibas.featurization import FeatureMatrix
F = FeatureMatrix.make(descr=F, constants=self.constants).F
def featurize_X_datasets(data):
for sample in data:
arr = self._joint_alphabet_X_check(sample.X, alphabet)
self._empty_array_check(arr, inspect.stack()[0][3])
sample.X = featurize.from_matrix_v3(
arr, F=F, alphabet=alphabet, reshape=reshape
)
return data
return featurize_X_datasets
[docs]
def featurize_for_RFA(self, alphabet=None, order=None):
"""
Featurize sequences for Reference-Free Analysis (RFA) models as implemented
in DOMEK workflows. See https://www.cell.com/chem/abstract/S2451-9294(25)00328-6
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Converts peptide sequences to feature representations compatible with
RFA models. See https://www.nature.com/articles/s41467-024-51895-5
for methodology details.
Args:
alphabet (list, tuple, or ndarray): Amino acid alphabet comprising
the sequences. Alphabet size determines feature dimensions.
order (str): RFA model order. Must be 'first' or 'second'. Higher
orders not yet implemented.
Returns:
callable: Operation that accepts a Data object, featurizes sequences for RFA, and returns the modified Data object with flattened 2D feature arrays.
Example:
>>> rfa_op = C.preprocessor.featurize_for_RFA(alphabet='aa', order='second')
>>> data = rfa_op(data)
"""
if order != "first" and order != "second":
msg = '<featurize_for_RFA>: the "order" keyword not understood: only "first" or "second" are allowed values!'
self.logger.error(msg)
raise ValueError(msg)
alphabet = self._infer_alphabet(alphabet=alphabet)
def RFA_featurization(data):
for sample in data:
arr = self._joint_alphabet_X_check(sample.X, alphabet)
self._empty_array_check(arr, inspect.stack()[0][3])
sample.X = featurize.RFA_featurization(
arr, alphabet=alphabet, order=order
)
return data
return RFA_featurization
[docs]
def drop(self, sample_to_drop=None):
"""
Remove a sample from the dataset by name.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Removes the specified sample from the Data object. If sample name is not
found, logs a warning and returns data unchanged.
Args:
sample_to_drop (str): Name of the sample to remove.
Returns:
callable: Operation that accepts a Data object, removes the specified sample, and returns the modified Data object.
Example:
>>> drop_op = C.preprocessor.drop(sample_to_drop='validation_data')
>>> data = drop_op(data)
"""
if not isinstance(sample_to_drop, str):
msg = "<drop_dataset> op expected param dataset_to_drop as type=str; received: {type(dataset_to_drop)}"
self.logger.error(msg)
raise ValueError(msg)
def drop_dataset(data):
to_drop = []
for i, sample in enumerate(data):
if sample.name == sample_to_drop:
to_drop.append(i)
if not to_drop:
msg = f"<drop_dataset>: no {sample_to_drop} in the dataset. . ."
self.logger.warning(msg)
for i in to_drop:
del data.samples[i]
return data
return drop_dataset
[docs]
def pad_and_random_shift(self, new_x_dim=None):
"""
Expand and randomly shift sequences for data augmentation.
.. note::
**Pipeline Operation** - Returns a callable for use in processing pipelines.
Pads each sequence to a fixed width and applies a random circular shift.
The shift is constrained so that non-pad values remain within bounds,
effectively redistributing padding without truncating sequence data.
Useful for data augmentation and positional invariance.
Args:
new_x_dim (int): Target width for padded sequences. Must be ≥ current
sequence width.
Returns:
callable: Operation that accepts a Data object, pads and shifts sequences, and returns the modified Data object.
Example:
If new_x_dim=6, a sequence array::
[['A', 'B', 'C', 'D'],
['E', 'F', 'G', 'H'],
['I', 'J', 'K', 'L'],
['M', 'N', 'O', 'P']]
might become:
[['', '', 'A', 'B', 'C', 'D'],
['', '', 'E', 'F', 'G', 'H'],
['I', 'J', 'K', 'L', '', ''],
['', 'M', 'N', 'O', 'P', '']]
with padding randomly distributed on either side.
>>> augment_op = C.preprocessor.pad_and_random_shift(new_x_dim=20)
>>> data = augment_op(data)
"""
if not isinstance(new_x_dim, int):
msg = f'<x_expand_and_shift> op expended "new_x_dim" param as dtype=int; received: {type(new_x_dim)}'
self.logger.error(msg)
raise ValueError(msg)
def expand_and_random_shift(data):
for sample in data:
arr = self._cast(sample.X, "2d")
self._empty_array_check(arr, inspect.stack()[0][3])
dtype = arr.dtype
pad = dtype.type()
expanded_arr = np.zeros((arr.shape[0], new_x_dim), dtype=dtype)
expanded_arr[:, : arr.shape[1]] = arr
L = np.sum(arr != pad, axis=1)
max_shift = new_x_dim - L + 1
# this isn't easy to understand, but it works. essentially,
# we are doing a circular permutation by a random amount but
# not larger than the number of pads on the right side of a given
# row. what this does is reshuffle some of the pads to the left
shift_ind = np.random.randint(0, high=max_shift, size=arr.shape[0])
idx = np.mod(np.arange(new_x_dim) - shift_ind[:, None], new_x_dim)
sample.X = expanded_arr[np.arange(expanded_arr.shape[0])[:, None], idx]
return data
return expand_and_random_shift