adc_toolkit.processing.steps

Prebuilt transformation functions for data processing pipelines.

This module provides a library of ready-to-use transformation functions designed to work seamlessly with ~adc_toolkit.processing.ProcessingPipeline. These functions handle common data processing tasks such as cleaning, filtering, transforming, and combining data.

Available Submodules

pandas Transformation functions for pandas DataFrames. Includes functions for removing duplicates, filling missing values, scaling data, encoding categorical variables, and more.

All step functions follow the standard contract: they accept a Data object (e.g., pandas DataFrame) as the first positional argument and return a Data object. Additional parameters are passed as keyword arguments.

Examples

Using prebuilt steps with a pipeline:

>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     scale_data,
... )
>>>
>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates, subset=["id"])
...     .add(fill_missing_values, method="mean", columns=["value"])
...     .add(scale_data, columns=["value"], method="minmax")
... )
>>> result = pipeline.run(raw_data)

Importing functions directly:

>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>> clean_df = remove_duplicates(df, subset=["customer_id"])

Using the convenience re-exports:

>>> from adc_toolkit.processing import steps
>>> clean_df = steps.remove_duplicates(df, subset=["id"])
See Also

adc_toolkit.processing.ProcessingPipeline: Pipeline for chaining steps.
adc_toolkit.processing.steps.pandas: Pandas-specific step functions.

Notes

The steps module is designed for extensibility. Future versions may include additional submodules for other data types (e.g., PySpark DataFrames, Polars).

Each step function is a standalone, pure function that can be used independently of the pipeline framework. This makes them easy to test, compose, and reuse.

 1"""
 2Prebuilt transformation functions for data processing pipelines.
 3
 4This module provides a library of ready-to-use transformation functions
 5designed to work seamlessly with :class:`~adc_toolkit.processing.ProcessingPipeline`.
 6These functions handle common data processing tasks such as cleaning, filtering,
 7transforming, and combining data.
 8
 9Available Submodules
10--------------------
11pandas
12    Transformation functions for pandas DataFrames. Includes functions for
13    removing duplicates, filling missing values, scaling data, encoding
14    categorical variables, and more.
15
16All step functions follow the standard contract: they accept a ``Data`` object
17(e.g., pandas DataFrame) as the first positional argument and return a ``Data``
18object. Additional parameters are passed as keyword arguments.
19
20Examples
21--------
22**Using prebuilt steps with a pipeline:**
23
24>>> from adc_toolkit.processing import ProcessingPipeline
25>>> from adc_toolkit.processing.steps.pandas import (
26...     remove_duplicates,
27...     fill_missing_values,
28...     scale_data,
29... )
30>>>
31>>> pipeline = (
32...     ProcessingPipeline()
33...     .add(remove_duplicates, subset=["id"])
34...     .add(fill_missing_values, method="mean", columns=["value"])
35...     .add(scale_data, columns=["value"], method="minmax")
36... )
37>>> result = pipeline.run(raw_data)
38
39**Importing functions directly:**
40
41>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
42>>> clean_df = remove_duplicates(df, subset=["customer_id"])
43
44**Using the convenience re-exports:**
45
46>>> from adc_toolkit.processing import steps
47>>> clean_df = steps.remove_duplicates(df, subset=["id"])
48
49See Also
50--------
51adc_toolkit.processing.ProcessingPipeline : Pipeline for chaining steps.
52adc_toolkit.processing.steps.pandas : Pandas-specific step functions.
53
54Notes
55-----
56The steps module is designed for extensibility. Future versions may include
57additional submodules for other data types (e.g., PySpark DataFrames, Polars).
58
59Each step function is a standalone, pure function that can be used independently
60of the pipeline framework. This makes them easy to test, compose, and reuse.
61"""
62
63# Re-export commonly used functions for convenience
64from .pandas import (
65    divide_one_column_by_another,
66    encode_categorical,
67    fill_missing_values,
68    filter_rows,
69    group_and_aggregate,
70    make_columns_snake_case,
71    remove_duplicates,
72    scale_data,
73    select_columns,
74    validate_is_dataframe,
75)
76
77
78__all__ = [
79    "divide_one_column_by_another",
80    "encode_categorical",
81    "fill_missing_values",
82    "filter_rows",
83    "group_and_aggregate",
84    "make_columns_snake_case",
85    "pandas",
86    "remove_duplicates",
87    "scale_data",
88    "select_columns",
89    "validate_is_dataframe",
90]
def divide_one_column_by_another( data: pandas.core.frame.DataFrame, numerator: str, denominator: str, new_column_name: str) -> pandas.core.frame.DataFrame:
107def divide_one_column_by_another(
108    data: pd.DataFrame, numerator: str, denominator: str, new_column_name: str
109) -> pd.DataFrame:
110    """
111
112    Parameters
113
114    ----------
115    data : pd.DataFrame
116        The input DataFrame containing the data to be transformed.
117    numerator : str
118        The name of the column to be used as the numerator.
119    denominator : str
120        The name of the column to be used as the denominator.
121    new_column_name : str
122        The name of the new column to be created.
123    Returns
124    -------
125    pd.DataFrame
126        The transformed DataFrame with the new column added.
127    """
128    data[new_column_name] = data[numerator] / data[denominator]
129    return data

Parameters


data : pd.DataFrame The input DataFrame containing the data to be transformed. numerator : str The name of the column to be used as the numerator. denominator : str The name of the column to be used as the denominator. new_column_name : str The name of the new column to be created.

Returns
  • pd.DataFrame: The transformed DataFrame with the new column added.
def encode_categorical( data: pandas.core.frame.DataFrame, columns: list[str], method: str = 'onehot') -> pandas.core.frame.DataFrame:
 72def encode_categorical(data: pd.DataFrame, columns: list[str], method: str = "onehot") -> pd.DataFrame:
 73    """
 74    Encode categorical features using the specified encoding method.
 75
 76    Parameters
 77    ----------
 78    data : pd.DataFrame
 79        The input DataFrame.
 80    columns : List[str]
 81        Columns to encode.
 82    method : str
 83        Encoding method ('onehot' or 'label').
 84
 85    Returns
 86    -------
 87    pd.DataFrame
 88        DataFrame with encoded columns.
 89
 90    Raises
 91    ------
 92    ImportError
 93        If scikit-learn is not installed and method is 'label'.
 94    """
 95    if method == "onehot":
 96        return pd.get_dummies(data, columns=columns)
 97    elif method == "label":
 98        _check_sklearn_available()
 99        encoder = LabelEncoder()
100        for col in columns:
101            data[col] = encoder.fit_transform(data[col])
102        return data
103    else:
104        raise ValueError(f"Invalid encoding method: {method}")

Encode categorical features using the specified encoding method.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • columns (List[str]): Columns to encode.
  • method (str): Encoding method ('onehot' or 'label').
Returns
  • pd.DataFrame: DataFrame with encoded columns.
Raises
  • ImportError: If scikit-learn is not installed and method is 'label'.
def fill_missing_values( data: pandas.core.frame.DataFrame, method: str = 'mean', value: Any = None, columns: list[str] | None = None) -> pandas.core.frame.DataFrame:
44def fill_missing_values(
45    data: pd.DataFrame,
46    method: str = FillMethod.MEAN.value,
47    value: Any = None,
48    columns: list[str] | None = None,
49) -> pd.DataFrame:
50    """
51    Fill or interpolate missing values in the DataFrame.
52
53    Parameters
54    ----------
55    data : pd.DataFrame
56        The input DataFrame.
57    method : str
58        The method to fill missing values ("mean", "median", "mode", "constant" or "interpolate").
59    value : Any
60        Specific value to use for filling if `method="constant"`.
61    columns : Optional[List[str]]
62        List of columns to apply the filling method to. If None, applies to all columns.
63
64    Returns
65    -------
66    pd.DataFrame
67        DataFrame with missing values filled.
68    """
69    try:
70        fill_method = FillMethod(method)
71    except ValueError as e:
72        raise ValueError(f"Invalid method: {method}") from e
73
74    if columns is None:
75        columns = list(data.columns)
76
77    missing_columns = [col for col in columns if col not in data.columns]
78    if missing_columns:
79        raise ValueError(
80            f"The following columns are not in the DataFrame: {missing_columns}. Available columns: {data.columns}"
81        )
82
83    data = data.copy()
84    if fill_method == FillMethod.MEAN:
85        data[columns] = data[columns].fillna(data[columns].mean())
86    elif fill_method == FillMethod.MEDIAN:
87        data[columns] = data[columns].fillna(data[columns].median())
88    elif fill_method == FillMethod.MODE:
89        data[columns] = data[columns].apply(lambda col: col.fillna(col.mode().iloc[0]))
90    elif fill_method == FillMethod.CONSTANT and value is not None:
91        data[columns] = data[columns].fillna(value)
92    elif fill_method == FillMethod.INTERPOLATE:
93        data[columns] = data[columns].interpolate()
94
95    return data

Fill or interpolate missing values in the DataFrame.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • method (str): The method to fill missing values ("mean", "median", "mode", "constant" or "interpolate").
  • value (Any): Specific value to use for filling if method="constant".
  • columns (Optional[List[str]]): List of columns to apply the filling method to. If None, applies to all columns.
Returns
  • pd.DataFrame: DataFrame with missing values filled.
def filter_rows( data: pandas.core.frame.DataFrame, condition: Callable[[pandas.core.frame.DataFrame], pandas.core.series.Series]) -> pandas.core.frame.DataFrame:
 9def filter_rows(data: pd.DataFrame, condition: Callable[[pd.DataFrame], pd.Series]) -> pd.DataFrame:
10    """
11    Filter rows based on a condition.
12
13    Parameters
14    ----------
15    data : pd.DataFrame
16        The input DataFrame.
17    condition : Callable[[pd.DataFrame], pd.Series]
18        A function that returns a boolean Series indicating rows to keep.
19
20    Returns
21    -------
22    pd.DataFrame
23        Filtered DataFrame.
24
25    Example
26    --------
27    >>> data = pd.DataFrame({"A": [1, 2, 3, 4], "B": ["a", "b", "c", "d"]})
28    >>> condition = lambda df: df["A"] > 2
29    >>> filter_rows(data, condition)
30       A  B
31    2  3  c
32    3  4  d
33    """
34    return data[condition(data)]

Filter rows based on a condition.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • condition (Callable[[pd.DataFrame], pd.Series]): A function that returns a boolean Series indicating rows to keep.
Returns
  • pd.DataFrame: Filtered DataFrame.
Example
>>> data = pd.DataFrame({"A": [1, 2, 3, 4], "B": ["a", "b", "c", "d"]})
>>> condition = lambda df: df["A"] > 2
>>> filter_rows(data, condition)
   A  B
2  3  c
3  4  d
def group_and_aggregate( data: pandas.core.frame.DataFrame, group_by_columns: list[str], agg_funcs: dict[str, Callable]) -> pandas.core.frame.DataFrame:
 9def group_and_aggregate(
10    data: pd.DataFrame, group_by_columns: list[str], agg_funcs: dict[str, Callable]
11) -> pd.DataFrame:
12    """
13    Group data by specified columns and apply aggregation functions.
14
15    Parameters
16    ----------
17    data : pd.DataFrame
18        The input DataFrame.
19    group_by_columns : List[str]
20        Columns to group by.
21    agg_funcs : Dict[str, Callable]
22        Dictionary mapping column names to aggregation functions.
23
24    Returns
25    -------
26    pd.DataFrame
27        Aggregated DataFrame.
28    """
29    return data.groupby(group_by_columns).agg(agg_funcs).reset_index()

Group data by specified columns and apply aggregation functions.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • group_by_columns (List[str]): Columns to group by.
  • agg_funcs (Dict[str, Callable]): Dictionary mapping column names to aggregation functions.
Returns
  • pd.DataFrame: Aggregated DataFrame.
def make_columns_snake_case(data: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
102def make_columns_snake_case(data: pd.DataFrame) -> pd.DataFrame:
103    """
104    Standardize column names to snake case.
105
106    Parameters
107    ----------
108    data : pd.DataFrame
109        The input DataFrame.
110
111    Returns
112    -------
113    pd.DataFrame
114        DataFrame with standardized column names.
115    """
116    data.columns = [_convert_camel_case_to_snake_case(col) for col in data.columns]
117    return data

Standardize column names to snake case.

Parameters
  • data (pd.DataFrame): The input DataFrame.
Returns
  • pd.DataFrame: DataFrame with standardized column names.
def remove_duplicates( data: pandas.core.frame.DataFrame, subset: list[str] | None = None, keep: Literal['first', 'last', False] = 'first') -> pandas.core.frame.DataFrame:
11def remove_duplicates(
12    data: pd.DataFrame,
13    subset: list[str] | None = None,
14    keep: Literal["first", "last", False] = "first",
15) -> pd.DataFrame:
16    """
17    Remove duplicate rows from the DataFrame.
18
19    Parameters
20    ----------
21    data : pd.DataFrame
22        The input DataFrame.
23    subset : Optional[List[str]]
24        Columns to consider for identifying duplicates. By default, considers all columns.
25    keep : str
26        Which duplicates to keep ('first', 'last', or False for dropping all).
27
28    Returns
29    -------
30    pd.DataFrame
31        DataFrame without duplicate rows.
32    """
33    return data.drop_duplicates(subset=subset, keep=keep)

Remove duplicate rows from the DataFrame.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • subset (Optional[List[str]]): Columns to consider for identifying duplicates. By default, considers all columns.
  • keep (str): Which duplicates to keep ('first', 'last', or False for dropping all).
Returns
  • pd.DataFrame: DataFrame without duplicate rows.
def scale_data( data: pandas.core.frame.DataFrame, columns: list[str], method: str | Callable[[pandas.core.frame.DataFrame, list[str]], pandas.core.frame.DataFrame] = 'minmax') -> pandas.core.frame.DataFrame:
28def scale_data(
29    data: pd.DataFrame,
30    columns: list[str],
31    method: str | Callable[[pd.DataFrame, list[str]], pd.DataFrame] = "minmax",
32) -> pd.DataFrame:
33    """
34    Scale numerical features using specified scaling method.
35
36    Parameters
37    ----------
38    data : pd.DataFrame
39        The input DataFrame.
40    columns : List[str]
41        Columns to scale.
42    method : Union[str, Callable[[pd.DataFrame, List[str]], pd.DataFrame]]
43        Scaling method ('minmax', 'zscore', or custom scaler function).
44
45    Returns
46    -------
47    pd.DataFrame
48        DataFrame with scaled columns.
49
50    Raises
51    ------
52    ImportError
53        If scikit-learn is not installed (required for built-in scalers).
54    """
55    _check_sklearn_available()
56    if isinstance(method, str):
57        if method == "minmax":
58            scaler = MinMaxScaler()
59        elif method == "zscore":
60            scaler = StandardScaler()
61        else:
62            raise ValueError(f"Invalid scaling method: {method}")
63    elif callable(method):
64        scaler = method
65    else:
66        raise TypeError("Invalid method type. Method must be a string or a callable function.")
67
68    data[columns] = scaler.fit_transform(data[columns])
69    return data

Scale numerical features using specified scaling method.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • columns (List[str]): Columns to scale.
  • method (Union[str, Callable[[pd.DataFrame, List[str]], pd.DataFrame]]): Scaling method ('minmax', 'zscore', or custom scaler function).
Returns
  • pd.DataFrame: DataFrame with scaled columns.
Raises
  • ImportError: If scikit-learn is not installed (required for built-in scalers).
def select_columns( data: pandas.core.frame.DataFrame, columns: list[str]) -> pandas.core.frame.DataFrame:
37def select_columns(data: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
38    """
39    Retain only specified columns in the DataFrame.
40
41    Parameters
42    ----------
43    data : pd.DataFrame
44        The input DataFrame.
45    columns : List[str]
46        Columns to retain.
47
48    Returns
49    -------
50    pd.DataFrame
51        DataFrame with only the selected columns.
52    """
53    return data[columns]

Retain only specified columns in the DataFrame.

Parameters
  • data (pd.DataFrame): The input DataFrame.
  • columns (List[str]): Columns to retain.
Returns
  • pd.DataFrame: DataFrame with only the selected columns.
def validate_is_dataframe(data: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
 5def validate_is_dataframe(
 6    data: pd.DataFrame,
 7) -> pd.DataFrame:
 8    """
 9    Validates if the input is a pandas DataFrame.
10
11    Args:
12        data (pd.DataFrame): The input data to validate.
13
14    Returns:
15        pd.DataFrame: The validated DataFrame if the input is valid.
16
17    Raises:
18        ValueError: If the input is not a pandas DataFrame.
19    """
20    if not isinstance(data, pd.DataFrame):
21        raise ValueError("Input data is not a pandas DataFrame.")
22
23    return data

Validates if the input is a pandas DataFrame.

Args: data (pd.DataFrame): The input data to validate.

Returns: pd.DataFrame: The validated DataFrame if the input is valid.

Raises: ValueError: If the input is not a pandas DataFrame.