adc_toolkit.processing.steps
Prebuilt transformation functions for data processing pipelines.
This module provides a library of ready-to-use transformation functions
designed to work with adc_toolkit.processing.ProcessingPipeline.
These functions handle common data processing tasks such as cleaning, filtering,
transforming, and combining data.
Available Submodules
pandas: Transformation functions for pandas DataFrames. Includes functions for removing duplicates, filling missing values, scaling data, encoding categorical variables, and more.
All step functions follow the standard contract: they accept a Data object
(e.g., pandas DataFrame) as the first positional argument and return a Data
object. Additional parameters are passed as keyword arguments.
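As an illustration of this contract, the sketch below defines a hypothetical custom step, `clip_column` (not part of adc_toolkit). It takes a DataFrame as its first positional argument, receives its options as keyword arguments, and returns a DataFrame. It is called directly here; that `ProcessingPipeline.add` would accept it unchanged is an assumption based on the contract described above.

```python
import pandas as pd


def clip_column(data: pd.DataFrame, *, column: str, lower: float, upper: float) -> pd.DataFrame:
    """Hypothetical custom step: clip one column to the range [lower, upper]."""
    result = data.copy()  # leave the caller's DataFrame untouched
    result[column] = result[column].clip(lower=lower, upper=upper)
    return result


raw = pd.DataFrame({"value": [-5.0, 0.5, 12.0]})
clipped = clip_column(raw, column="value", lower=0.0, upper=1.0)
print(clipped)
```

Working on a copy keeps the step free of side effects, which is what makes it safe to reuse the same input in several pipelines.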
Examples
Using prebuilt steps with a pipeline:
>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     scale_data,
... )
>>>
>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates, subset=["id"])
...     .add(fill_missing_values, method="mean", columns=["value"])
...     .add(scale_data, columns=["value"], method="minmax")
... )
>>> result = pipeline.run(raw_data)
Importing functions directly:
>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>> clean_df = remove_duplicates(df, subset=["customer_id"])
Using the convenience re-exports:
>>> from adc_toolkit.processing import steps
>>> clean_df = steps.remove_duplicates(df, subset=["id"])
See Also
adc_toolkit.processing.ProcessingPipeline: Pipeline for chaining steps.
adc_toolkit.processing.steps.pandas: Pandas-specific step functions.
Notes
The steps module is designed for extensibility. Future versions may include additional submodules for other data types (e.g., PySpark DataFrames, Polars).
Each step function is a standalone, pure function that can be used independently of the pipeline framework. This makes them easy to test, compose, and reuse.
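Because each step is an ordinary function, steps can also be chained without the pipeline class. The sketch below is one way to do that with `functools.partial` and `functools.reduce`. To stay self-contained it uses local stand-ins that copy the one-line bodies of `remove_duplicates` and `select_columns`; the name `chain` and the sample data are illustrative only.

```python
from functools import partial, reduce

import pandas as pd


# Stand-ins with the same one-line bodies as the library steps of the same name
def remove_duplicates(data, subset=None, keep="first"):
    return data.drop_duplicates(subset=subset, keep=keep)


def select_columns(data, columns):
    return data[columns]


# Bind the keyword arguments up front so every step takes only the DataFrame
chain = [
    partial(remove_duplicates, subset=["id"]),
    partial(select_columns, columns=["id", "value"]),
]

df = pd.DataFrame({"id": [1, 1, 2], "value": [10, 10, 20], "note": ["a", "b", "c"]})

# Feed the output of each step into the next one, in order
result = reduce(lambda data, step: step(data), chain, df)
print(result)
```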
"""
Prebuilt transformation functions for data processing pipelines.

This module provides a library of ready-to-use transformation functions
designed to work seamlessly with :class:`~adc_toolkit.processing.ProcessingPipeline`.
These functions handle common data processing tasks such as cleaning, filtering,
transforming, and combining data.

Available Submodules
--------------------
pandas
    Transformation functions for pandas DataFrames. Includes functions for
    removing duplicates, filling missing values, scaling data, encoding
    categorical variables, and more.

All step functions follow the standard contract: they accept a ``Data`` object
(e.g., pandas DataFrame) as the first positional argument and return a ``Data``
object. Additional parameters are passed as keyword arguments.

Examples
--------
**Using prebuilt steps with a pipeline:**

>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     scale_data,
... )
>>>
>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates, subset=["id"])
...     .add(fill_missing_values, method="mean", columns=["value"])
...     .add(scale_data, columns=["value"], method="minmax")
... )
>>> result = pipeline.run(raw_data)

**Importing functions directly:**

>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>> clean_df = remove_duplicates(df, subset=["customer_id"])

**Using the convenience re-exports:**

>>> from adc_toolkit.processing import steps
>>> clean_df = steps.remove_duplicates(df, subset=["id"])

See Also
--------
adc_toolkit.processing.ProcessingPipeline : Pipeline for chaining steps.
adc_toolkit.processing.steps.pandas : Pandas-specific step functions.

Notes
-----
The steps module is designed for extensibility. Future versions may include
additional submodules for other data types (e.g., PySpark DataFrames, Polars).

Each step function is a standalone, pure function that can be used independently
of the pipeline framework. This makes them easy to test, compose, and reuse.
"""

# Re-export commonly used functions for convenience
from .pandas import (
    divide_one_column_by_another,
    encode_categorical,
    fill_missing_values,
    filter_rows,
    group_and_aggregate,
    make_columns_snake_case,
    remove_duplicates,
    scale_data,
    select_columns,
    validate_is_dataframe,
)


__all__ = [
    "divide_one_column_by_another",
    "encode_categorical",
    "fill_missing_values",
    "filter_rows",
    "group_and_aggregate",
    "make_columns_snake_case",
    "pandas",
    "remove_duplicates",
    "scale_data",
    "select_columns",
    "validate_is_dataframe",
]
def divide_one_column_by_another(
    data: pd.DataFrame, numerator: str, denominator: str, new_column_name: str
) -> pd.DataFrame:
    """
    Divide one column by another and store the result in a new column.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame containing the data to be transformed.
    numerator : str
        The name of the column to be used as the numerator.
    denominator : str
        The name of the column to be used as the denominator.
    new_column_name : str
        The name of the new column to be created.

    Returns
    -------
    pd.DataFrame
        The transformed DataFrame with the new column added.
    """
    data[new_column_name] = data[numerator] / data[denominator]
    return data
Divide one column by another and store the result in a new column.
Parameters
- data (pd.DataFrame): The input DataFrame containing the data to be transformed.
- numerator (str): The name of the column to be used as the numerator.
- denominator (str): The name of the column to be used as the denominator.
- new_column_name (str): The name of the new column to be created.
Returns
- pd.DataFrame: The transformed DataFrame with the new column added.
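A short usage sketch may help here. It reproduces the two-line body from the source listing as a local stand-in so that it runs without adc_toolkit installed; the column names and values are made up. Two behaviours follow directly from that body: the new column is assigned on the DataFrame that is passed in (so a copy is passed here), and a zero denominator produces `inf` rather than an error.

```python
import pandas as pd


def divide_one_column_by_another(data, numerator, denominator, new_column_name):
    # Same body as the source listing: element-wise division stored in a new column
    data[new_column_name] = data[numerator] / data[denominator]
    return data


sales = pd.DataFrame({"revenue": [100.0, 50.0, 30.0], "units": [4, 0, 3]})

# Pass a copy so the original `sales` frame does not gain the new column
out = divide_one_column_by_another(sales.copy(), "revenue", "units", "price_per_unit")
print(out)
```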
def encode_categorical(data: pd.DataFrame, columns: list[str], method: str = "onehot") -> pd.DataFrame:
    """
    Encode categorical features using the specified encoding method.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    columns : List[str]
        Columns to encode.
    method : str
        Encoding method ('onehot' or 'label').

    Returns
    -------
    pd.DataFrame
        DataFrame with encoded columns.

    Raises
    ------
    ImportError
        If scikit-learn is not installed and method is 'label'.
    """
    if method == "onehot":
        return pd.get_dummies(data, columns=columns)
    elif method == "label":
        _check_sklearn_available()
        encoder = LabelEncoder()
        for col in columns:
            data[col] = encoder.fit_transform(data[col])
        return data
    else:
        raise ValueError(f"Invalid encoding method: {method}")
Encode categorical features using the specified encoding method.
Parameters
- data (pd.DataFrame): The input DataFrame.
- columns (List[str]): Columns to encode.
- method (str): Encoding method ('onehot' or 'label').
Returns
- pd.DataFrame: DataFrame with encoded columns.
Raises
- ImportError: If scikit-learn is not installed and method is 'label'.
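To make the two encodings concrete, the sketch below applies them to a small, made-up DataFrame using pandas only. The one-hot part is the same `pd.get_dummies` call as in the source. The label part swaps in pandas category codes for scikit-learn's `LabelEncoder`, so the example runs without scikit-learn; both number the categories in sorted order.

```python
import pandas as pd

df = pd.DataFrame({"color": ["red", "blue", "red"], "size": [1, 2, 3]})

# One-hot: every category of "color" becomes its own indicator column
onehot = pd.get_dummies(df, columns=["color"])

# Label-style codes (pandas substitute for LabelEncoder): sorted categories numbered from 0
labels = df.copy()
labels["color"] = labels["color"].astype("category").cat.codes

print(onehot.columns.tolist())
print(labels)
```

One-hot encoding widens the frame by one column per category, while label encoding keeps the shape but imposes an arbitrary order on the categories.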
def fill_missing_values(
    data: pd.DataFrame,
    method: str = FillMethod.MEAN.value,
    value: Any = None,
    columns: list[str] | None = None,
) -> pd.DataFrame:
    """
    Fill or interpolate missing values in the DataFrame.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    method : str
        The method to fill missing values ("mean", "median", "mode", "constant" or "interpolate").
    value : Any
        Specific value to use for filling if `method="constant"`.
    columns : Optional[List[str]]
        List of columns to apply the filling method to. If None, applies to all columns.

    Returns
    -------
    pd.DataFrame
        DataFrame with missing values filled.
    """
    try:
        fill_method = FillMethod(method)
    except ValueError as e:
        raise ValueError(f"Invalid method: {method}") from e

    if columns is None:
        columns = list(data.columns)

    missing_columns = [col for col in columns if col not in data.columns]
    if missing_columns:
        raise ValueError(
            f"The following columns are not in the DataFrame: {missing_columns}. Available columns: {data.columns}"
        )

    data = data.copy()
    if fill_method == FillMethod.MEAN:
        data[columns] = data[columns].fillna(data[columns].mean())
    elif fill_method == FillMethod.MEDIAN:
        data[columns] = data[columns].fillna(data[columns].median())
    elif fill_method == FillMethod.MODE:
        data[columns] = data[columns].apply(lambda col: col.fillna(col.mode().iloc[0]))
    elif fill_method == FillMethod.CONSTANT and value is not None:
        data[columns] = data[columns].fillna(value)
    elif fill_method == FillMethod.INTERPOLATE:
        data[columns] = data[columns].interpolate()

    return data
Fill or interpolate missing values in the DataFrame.
Parameters
- data (pd.DataFrame): The input DataFrame.
- method (str): The method to fill missing values ("mean", "median", "mode", "constant" or "interpolate").
- value (Any): Specific value to use for filling if method="constant".
- columns (Optional[List[str]]): List of columns to apply the filling method to. If None, applies to all columns.
Returns
- pd.DataFrame: DataFrame with missing values filled.
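The difference between the fill methods is easiest to see on a small example. The sketch below runs the pandas expressions from the "mean" and "interpolate" branches of the source directly on a made-up DataFrame, restricted to one column as the `columns` parameter would do; it does not import adc_toolkit.

```python
import pandas as pd

df = pd.DataFrame({"value": [1.0, None, 6.0, 5.0], "count": [10.0, 20.0, None, 40.0]})
columns = ["value"]  # only these columns are filled; "count" keeps its gap

# "mean": replace each gap with the column mean, here (1 + 6 + 5) / 3 = 4.0
mean_filled = df.copy()
mean_filled[columns] = mean_filled[columns].fillna(mean_filled[columns].mean())

# "interpolate": linear interpolation between neighbours, here (1 + 6) / 2 = 3.5
interpolated = df.copy()
interpolated[columns] = interpolated[columns].interpolate()

print(mean_filled)
print(interpolated)
```

Note that, per the source, `method="constant"` with `value` left as None matches no branch, so the data comes back unfilled.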
def filter_rows(data: pd.DataFrame, condition: Callable[[pd.DataFrame], pd.Series]) -> pd.DataFrame:
    """
    Filter rows based on a condition.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    condition : Callable[[pd.DataFrame], pd.Series]
        A function that returns a boolean Series indicating rows to keep.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame.

    Example
    --------
    >>> data = pd.DataFrame({"A": [1, 2, 3, 4], "B": ["a", "b", "c", "d"]})
    >>> condition = lambda df: df["A"] > 2
    >>> filter_rows(data, condition)
       A  B
    2  3  c
    3  4  d
    """
    return data[condition(data)]
Filter rows based on a condition.
Parameters
- data (pd.DataFrame): The input DataFrame.
- condition (Callable[[pd.DataFrame], pd.Series]): A function that returns a boolean Series indicating rows to keep.
Returns
- pd.DataFrame: Filtered DataFrame.
Example
>>> data = pd.DataFrame({"A": [1, 2, 3, 4], "B": ["a", "b", "c", "d"]})
>>> condition = lambda df: df["A"] > 2
>>> filter_rows(data, condition)
   A  B
2  3  c
3  4  d
def group_and_aggregate(
    data: pd.DataFrame, group_by_columns: list[str], agg_funcs: dict[str, Callable]
) -> pd.DataFrame:
    """
    Group data by specified columns and apply aggregation functions.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    group_by_columns : List[str]
        Columns to group by.
    agg_funcs : Dict[str, Callable]
        Dictionary mapping column names to aggregation functions.

    Returns
    -------
    pd.DataFrame
        Aggregated DataFrame.
    """
    return data.groupby(group_by_columns).agg(agg_funcs).reset_index()
Group data by specified columns and apply aggregation functions.
Parameters
- data (pd.DataFrame): The input DataFrame.
- group_by_columns (List[str]): Columns to group by.
- agg_funcs (Dict[str, Callable]): Dictionary mapping column names to aggregation functions.
Returns
- pd.DataFrame: Aggregated DataFrame.
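As a worked example of the grouping step, the sketch below runs the same `groupby(...).agg(...).reset_index()` chain as the source on a made-up orders table. The aggregations are given as pandas string names ("sum", "max"), which `DataFrame.agg` accepts alongside callables; that is a choice made for this example, since the annotation itself lists callables.

```python
import pandas as pd

orders = pd.DataFrame(
    {
        "region": ["north", "north", "south"],
        "amount": [10, 30, 5],
        "items": [1, 2, 3],
    }
)

# One aggregation per column: total amount and largest item count per region
agg_funcs = {"amount": "sum", "items": "max"}
summary = orders.groupby(["region"]).agg(agg_funcs).reset_index()
print(summary)
```

The final `reset_index()` turns the group keys back into ordinary columns, so the result can be passed straight to the next step.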
def make_columns_snake_case(data: pd.DataFrame) -> pd.DataFrame:
    """
    Standardize column names to snake case.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame with standardized column names.
    """
    data.columns = [_convert_camel_case_to_snake_case(col) for col in data.columns]
    return data
Standardize column names to snake case.
Parameters
- data (pd.DataFrame): The input DataFrame.
Returns
- pd.DataFrame: DataFrame with standardized column names.
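The private helper `_convert_camel_case_to_snake_case` is not shown on this page, so its exact rules are unknown. The sketch below is a hypothetical converter, `to_snake_case`, written only to illustrate the kind of renaming involved; the library's helper may treat acronyms, digits, or punctuation differently.

```python
import re

import pandas as pd


def to_snake_case(name: str) -> str:
    """Hypothetical converter; not the library's private helper."""
    # Turn runs of spaces or hyphens into a single underscore
    name = re.sub(r"[\s\-]+", "_", name.strip())
    # Insert an underscore where a lowercase letter or digit is followed by an uppercase letter
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    return name.lower()


df = pd.DataFrame(columns=["CustomerId", "orderDate", "Total Amount"])
df.columns = [to_snake_case(col) for col in df.columns]
print(df.columns.tolist())
```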
def remove_duplicates(
    data: pd.DataFrame,
    subset: list[str] | None = None,
    keep: Literal["first", "last", False] = "first",
) -> pd.DataFrame:
    """
    Remove duplicate rows from the DataFrame.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    subset : Optional[List[str]]
        Columns to consider for identifying duplicates. By default, considers all columns.
    keep : str
        Which duplicates to keep ('first', 'last', or False for dropping all).

    Returns
    -------
    pd.DataFrame
        DataFrame without duplicate rows.
    """
    return data.drop_duplicates(subset=subset, keep=keep)
Remove duplicate rows from the DataFrame.
Parameters
- data (pd.DataFrame): The input DataFrame.
- subset (Optional[List[str]]): Columns to consider for identifying duplicates. By default, considers all columns.
- keep (str): Which duplicates to keep ('first', 'last', or False for dropping all).
Returns
- pd.DataFrame: DataFrame without duplicate rows.
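The effect of `keep` is easiest to see side by side. Since the function is a thin wrapper around `DataFrame.drop_duplicates`, the sketch below calls that pandas method directly on a made-up frame in which `id` 1 appears twice.

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 1, 2], "value": ["a", "b", "c"]})

first = df.drop_duplicates(subset=["id"], keep="first")    # keeps row "a" for id 1
last = df.drop_duplicates(subset=["id"], keep="last")      # keeps row "b" for id 1
none_kept = df.drop_duplicates(subset=["id"], keep=False)  # drops every row whose id repeats

print(first["value"].tolist(), last["value"].tolist(), none_kept["value"].tolist())
```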
def scale_data(
    data: pd.DataFrame,
    columns: list[str],
    method: str | Callable[[pd.DataFrame, list[str]], pd.DataFrame] = "minmax",
) -> pd.DataFrame:
    """
    Scale numerical features using specified scaling method.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    columns : List[str]
        Columns to scale.
    method : Union[str, Callable[[pd.DataFrame, List[str]], pd.DataFrame]]
        Scaling method ('minmax', 'zscore', or custom scaler function).

    Returns
    -------
    pd.DataFrame
        DataFrame with scaled columns.

    Raises
    ------
    ImportError
        If scikit-learn is not installed (required for built-in scalers).
    """
    if isinstance(method, str):
        # scikit-learn is only needed for the built-in scalers
        _check_sklearn_available()
        if method == "minmax":
            scaler = MinMaxScaler()
        elif method == "zscore":
            scaler = StandardScaler()
        else:
            raise ValueError(f"Invalid scaling method: {method}")
        data[columns] = scaler.fit_transform(data[columns])
        return data
    if callable(method):
        # A plain function has no fit_transform; call it with the annotated
        # (DataFrame, columns) signature and return its result.
        return method(data, columns)
    raise TypeError("Invalid method type. Method must be a string or a callable function.")
Scale numerical features using specified scaling method.
Parameters
- data (pd.DataFrame): The input DataFrame.
- columns (List[str]): Columns to scale.
- method (Union[str, Callable[[pd.DataFrame, List[str]], pd.DataFrame]]): Scaling method ('minmax', 'zscore', or custom scaler function).
Returns
- pd.DataFrame: DataFrame with scaled columns.
Raises
- ImportError: If scikit-learn is not installed (required for built-in scalers).
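For the custom-scaler option, the sketch below shows a hypothetical function, `minmax_scale`, that matches the callable signature given in the type annotation: it takes the DataFrame and the list of columns and returns a DataFrame. It computes the usual min-max formula, (x - min) / (max - min), with plain pandas instead of scikit-learn's `MinMaxScaler`, and is called directly rather than through `scale_data`.

```python
import pandas as pd


def minmax_scale(data: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """Hypothetical custom scaler: rescale each listed column to [0, 1]."""
    result = data.copy()  # do not modify the caller's DataFrame
    col_min = result[columns].min()
    col_range = result[columns].max() - col_min
    # A constant column has a zero range and would come out as NaN
    result[columns] = (result[columns] - col_min) / col_range
    return result


df = pd.DataFrame({"value": [10.0, 15.0, 20.0], "label": ["a", "b", "c"]})
scaled = minmax_scale(df, ["value"])
print(scaled)
```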
def select_columns(data: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """
    Retain only specified columns in the DataFrame.

    Parameters
    ----------
    data : pd.DataFrame
        The input DataFrame.
    columns : List[str]
        Columns to retain.

    Returns
    -------
    pd.DataFrame
        DataFrame with only the selected columns.
    """
    return data[columns]
Retain only specified columns in the DataFrame.
Parameters
- data (pd.DataFrame): The input DataFrame.
- columns (List[str]): Columns to retain.
Returns
- pd.DataFrame: DataFrame with only the selected columns.
def validate_is_dataframe(
    data: pd.DataFrame,
) -> pd.DataFrame:
    """
    Validates if the input is a pandas DataFrame.

    Args:
        data (pd.DataFrame): The input data to validate.

    Returns:
        pd.DataFrame: The validated DataFrame if the input is valid.

    Raises:
        ValueError: If the input is not a pandas DataFrame.
    """
    if not isinstance(data, pd.DataFrame):
        raise ValueError("Input data is not a pandas DataFrame.")

    return data
Validate that the input is a pandas DataFrame.
Parameters
- data (pd.DataFrame): The input data to validate.
Returns
- pd.DataFrame: The input DataFrame, returned unchanged if it is valid.
Raises
- ValueError: If the input is not a pandas DataFrame.
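A guard like this is typically useful as the first step of a chain, so that a wrong input type fails early with a clear message. The sketch below copies the check from the source listing into a local stand-in and shows both outcomes; the sample inputs are made up.

```python
import pandas as pd


def validate_is_dataframe(data):
    # Same check as the source listing: reject anything that is not a DataFrame
    if not isinstance(data, pd.DataFrame):
        raise ValueError("Input data is not a pandas DataFrame.")
    return data


# A real DataFrame passes through unchanged
ok = validate_is_dataframe(pd.DataFrame({"a": [1]}))

# A list of dicts is rejected, even though pandas could build a DataFrame from it
try:
    validate_is_dataframe([{"a": 1}])
    error_message = None
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```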