adc_toolkit.processing
Data processing pipeline framework for composable, chainable transformations.
This module provides a flexible pipeline framework for processing data through
a sequence of transformation steps. The framework is designed to be agnostic
to the underlying data type, working with any object that satisfies the
adc_toolkit.data.abs.Data protocol (e.g., pandas DataFrames,
PySpark DataFrames).
Key Components
ProcessingPipeline
    Orchestrates the execution of multiple processing steps in sequence.
    Supports method chaining for fluent pipeline construction.
PipelineStep
    Wraps individual transformation functions with their parameters.
    Handles execution and provides debugging information.
The Function Contract
The framework's flexibility comes from a simple contract: any function that
accepts a Data object as its first positional argument and returns a
Data object can be used as a pipeline step. This means you can use:
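- Prebuilt step functions from adc_toolkit.processing.steps
- pandas/PySpark functions that take the DataFrame as their first argument (e.g., pd.merge)
- Lambda functions for simple inline transformations, including thin adapters around functions such as pd.concat whose first argument is a list of objects rather than a single DataFrame
- Custom functions you write for your specific needs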
The function signature should follow this pattern:
    def my_step(data: Data, param1: type1, param2: type2, ...) -> Data:
        # Transform data
        return transformed_data
Where Data is any object with columns and dtypes properties
(pandas DataFrames and PySpark DataFrames satisfy this naturally).
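To make the contract concrete, here is a minimal sketch of a structural protocol with `columns` and `dtypes` properties and a step-shaped function written against it. `DataLike`, `TinyTable`, and `keep_columns` are hypothetical names used only for illustration; the real `adc_toolkit.data.abs.Data` definition may differ.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical stand-in for the Data protocol: needs `columns` and `dtypes`."""

    @property
    def columns(self) -> Any: ...

    @property
    def dtypes(self) -> Any: ...


class TinyTable:
    """Toy column store used only for this example (not part of adc_toolkit)."""

    def __init__(self, data: dict[str, list]) -> None:
        self._data = data

    @property
    def columns(self) -> list[str]:
        return list(self._data)

    @property
    def dtypes(self) -> dict[str, str]:
        # Report the type name of the first value in each column.
        return {name: type(values[0]).__name__ for name, values in self._data.items()}


def keep_columns(data: TinyTable, names: list[str]) -> TinyTable:
    """A step-shaped function: data first, parameters after, data returned."""
    return TinyTable({name: data._data[name] for name in names})


table = TinyTable({"id": [1, 2], "value": [0.5, 1.5]})
subset = keep_columns(table, names=["id"])
print(isinstance(table, DataLike), subset.columns)
```

Because the check is structural, `TinyTable` never inherits from `DataLike`; exposing the two properties is enough.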
Examples
Using prebuilt step functions:
>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import remove_duplicates, fill_missing_values
>>>
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["id"])
>>> pipeline.add(fill_missing_values, method="mean", columns=["value"])
>>> result = pipeline.run(raw_data)
Using pandas functions directly:
Many pandas functions accept a DataFrame as the first argument and return a DataFrame, making them directly usable as pipeline steps:
>>> import pandas as pd
>>>
>>> # Merge with another DataFrame
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.merge, right=reference_df, how="left", on="id")
>>> result = pipeline.run(main_df)
>>>
>>> # Concatenate DataFrames (pd.concat expects a list of objects, so wrap it in a lambda)
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(lambda df: pd.concat([df, df2, df3], ignore_index=True))
>>> result = pipeline.run(df1)  # df1 is passed to the lambda as df
Using custom transformation functions:
>>> def normalize_by_group(
...     data: pd.DataFrame,
...     value_col: str,
...     group_col: str,
... ) -> pd.DataFrame:
...     '''Normalize values within each group to [0, 1] range.'''
...     result = data.copy()
...     for group in data[group_col].unique():
...         mask = data[group_col] == group
...         values = data.loc[mask, value_col]
...         min_val, max_val = values.min(), values.max()
...         if max_val > min_val:
...             result.loc[mask, value_col] = (values - min_val) / (max_val - min_val)
...     return result
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(normalize_by_group, value_col="price", group_col="category")
>>> result = pipeline.run(sales_data)
Using lambda functions for simple transformations:
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(lambda df: df.dropna())
>>> pipeline.add(lambda df: df.reset_index(drop=True))
>>> result = pipeline.run(messy_data)
Method chaining for fluent construction:
>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates)
...     .add(fill_missing_values, method="median")
...     .add(lambda df: df.reset_index(drop=True))
... )
>>> result = pipeline.run(data)
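The fluent style works because `add` hands back the pipeline itself. The sketch below shows that mechanism with a simplified, hypothetical `MiniPipeline` operating on plain lists; it is not the adc_toolkit implementation and leaves out copying and step wrapping.

```python
from typing import Any, Callable


class MiniPipeline:
    """Hypothetical, simplified pipeline; not the adc_toolkit implementation."""

    def __init__(self) -> None:
        self.steps: list[tuple[Callable[..., Any], dict[str, Any]]] = []

    def add(self, step: Callable[..., Any], **kwargs: Any) -> "MiniPipeline":
        self.steps.append((step, kwargs))
        return self  # returning self is what makes .add(...).add(...) possible

    def run(self, data: Any) -> Any:
        # Feed the output of each step into the next one, in insertion order.
        for step, kwargs in self.steps:
            data = step(data, **kwargs)
        return data


def scale(values: list[float], factor: float) -> list[float]:
    return [v * factor for v in values]


pipeline = MiniPipeline().add(scale, factor=2.0).add(lambda values: sorted(values))
result = pipeline.run([3.0, 1.0, 2.0])
print(result)  # [2.0, 4.0, 6.0]
```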
See Also
adc_toolkit.processing.pipeline.ProcessingPipeline: The pipeline orchestrator.
adc_toolkit.processing.step.PipelineStep: The step wrapper class.
adc_toolkit.processing.steps: Prebuilt transformation functions.
adc_toolkit.data.abs.Data: The protocol defining compatible data objects.
Notes
The pipeline creates a deep copy of input data before processing, ensuring the original data remains unmodified. This is important for reproducibility but may have performance implications for very large datasets.
For production workloads with large data, consider:
- Using PySpark DataFrames which handle distributed processing
- Processing data in chunks if memory is a constraint
- Using in-place operations in custom step functions when appropriate
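The copy-then-process behavior described in these notes can be sketched as follows. `run_on_copy` is a hypothetical helper that uses plain dictionaries instead of DataFrames, so it only illustrates the idea: a step may mutate what it receives, yet the caller's object stays intact.

```python
import copy
from typing import Any, Callable


def run_on_copy(data: Any, steps: list[Callable[[Any], Any]]) -> Any:
    """Hypothetical helper: deep-copy the input once, then apply steps in order."""
    working = copy.deepcopy(data)
    for step in steps:
        working = step(working)
    return working


def drop_negative_in_place(table: dict[str, list[int]]) -> dict[str, list[int]]:
    # Mutates its argument, which is safe here because it only sees the copy.
    table["value"] = [v for v in table["value"] if v >= 0]
    return table


raw = {"value": [3, -1, 4]}
cleaned = run_on_copy(raw, [drop_negative_in_place])
print(raw, cleaned)
```

The single up-front copy is also where the memory cost comes from: the input exists twice for the duration of the run.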
from . import steps
from .pipeline import ProcessingPipeline
from .step import PipelineStep


__all__ = ["PipelineStep", "ProcessingPipeline", "steps"]
class PipelineStep:
    """A wrapper for transformation functions in a data processing pipeline."""

    def __init__(
        self,
        step: Callable[..., Data],
        **kwargs: Any,
    ) -> None:
        self.step = step
        self.kwargs = kwargs

    def __str__(self) -> str:
        """
        Return a human-readable string representation of the step.

        The string includes the function name and a comma-separated list
        of the keyword arguments in ``key=value`` format. This is useful
        for debugging, logging, and understanding pipeline structure.

        Returns
        -------
        str
            A string in the format ``"function_name(arg1=val1, arg2=val2)"``.
            If there are no keyword arguments, returns ``"function_name()"``.

        Examples
        --------
        **Step with arguments:**

        >>> from adc_toolkit.processing.steps.pandas import fill_missing_values
        >>> step = PipelineStep(fill_missing_values, method="mean", columns=["a", "b"])
        >>> str(step)
        "fill_missing_values(method=mean, columns=['a', 'b'])"

        **Step without arguments:**

        >>> def simple_transform(data):
        ...     return data
        >>> step = PipelineStep(simple_transform)
        >>> str(step)
        'simple_transform()'

        **Used in pipeline string representation:**

        >>> from adc_toolkit.processing import ProcessingPipeline
        >>> from adc_toolkit.processing.steps.pandas import remove_duplicates
        >>> pipeline = ProcessingPipeline()
        >>> pipeline.add(remove_duplicates, subset=["id"])
        >>> print(pipeline)  # Uses PipelineStep.__str__ internally
        remove_duplicates(subset=['id'])
        """
        kwargs_strings = [f"{key}={value}" for key, value in self.kwargs.items()]
        return f"{self.step.__name__}({', '.join(kwargs_strings)})"

    def execute(self, data: Data) -> Data:
        result = self.step(data, **self.kwargs)

        return result
A wrapper for transformation functions in a data processing pipeline.
PipelineStep encapsulates a transformation function along with its
keyword arguments, creating a reusable, inspectable unit of data
transformation. It handles the invocation of the wrapped function with
the stored arguments during pipeline execution.
This class is typically not instantiated directly by users. Instead,
steps are created implicitly when calling
ProcessingPipeline.add().
However, understanding PipelineStep is useful for debugging and
creating custom pipeline behaviors.
Parameters
- step (Callable[..., Data]):
  The transformation function to wrap. Must accept a Data object as its first positional argument and return a Data object.
- **kwargs (Any):
  Keyword arguments to pass to the function during execution. These are stored and applied every time execute() is called.
Attributes
- step (Callable[..., Data]): The wrapped transformation function.
- kwargs (dict[str, Any]): Dictionary of keyword arguments to pass to the function.
See Also
ProcessingPipeline: Uses PipelineStep to orchestrate transformations.
adc_toolkit.processing.steps.pandas: Prebuilt step functions for pandas.
Notes
The Step Function Contract
Any function can be wrapped in a PipelineStep if it follows this
pattern:
    def transformation(data: Data, **kwargs) -> Data:
        # Perform transformation
        return transformed_data
Where:
- Data is any object satisfying the Data protocol (has columns and dtypes properties)
- **kwargs represents any additional parameters your function needs
- The return value must also satisfy the Data protocol
Flexibility
This design enables wrapping:
- Prebuilt functions from adc_toolkit.processing.steps
- pandas built-in methods wrapped as functions
- PySpark transformations wrapped as functions
- Custom domain-specific transformation logic
Debugging
The __str__() method provides a human-readable representation
showing the function name and its bound arguments, useful for
debugging and logging pipeline structure.
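A small sketch of this formatting, assuming the label is built from the callable's `__name__` and its stored keyword arguments as in the source listing above. `StepLabel` is a hypothetical stand-in that reproduces only the string logic, which makes it easy to see what lambdas look like in logs.

```python
from typing import Any, Callable


class StepLabel:
    """Hypothetical stand-in that reproduces only the string formatting."""

    def __init__(self, step: Callable[..., Any], **kwargs: Any) -> None:
        self.step = step
        self.kwargs = kwargs

    def __str__(self) -> str:
        args = ", ".join(f"{key}={value}" for key, value in self.kwargs.items())
        return f"{self.step.__name__}({args})"


def clip(data: list[int], lower: int, upper: int) -> list[int]:
    return [min(max(v, lower), upper) for v in data]


named = str(StepLabel(clip, lower=0, upper=10))
anonymous = str(StepLabel(lambda data: data))
print(named)      # clip(lower=0, upper=10)
print(anonymous)  # <lambda>()
```

Every lambda is labeled `<lambda>()`, so named functions tend to give more useful pipeline logs. Callables without a `__name__` attribute, such as `functools.partial` objects, would raise `AttributeError` under this formatting.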
Examples
Creating a step with a custom function:
>>> import pandas as pd
>>> from adc_toolkit.processing.step import PipelineStep
>>>
>>> def normalize(data: pd.DataFrame, column: str) -> pd.DataFrame:
...     result = data.copy()
...     col_data = result[column]
...     result[column] = (col_data - col_data.min()) / (col_data.max() - col_data.min())
...     return result
>>> step = PipelineStep(normalize, column="temperature")
>>> print(step)
normalize(column=temperature)
Creating a step with a prebuilt function:
>>> from adc_toolkit.processing.steps.pandas import fill_missing_values
>>>
>>> step = PipelineStep(fill_missing_values, method="mean", columns=["value"])
>>> print(step)
fill_missing_values(method=mean, columns=['value'])
Executing a step:
>>> df = pd.DataFrame({"temperature": [0, 50, 100]})
>>> step = PipelineStep(normalize, column="temperature")
>>> result = step.execute(df)
>>> result["temperature"].tolist()
[0.0, 0.5, 1.0]
Wrapping a pandas operation:
>>> def drop_na(data: pd.DataFrame, subset: list[str] | None = None) -> pd.DataFrame:
...     return data.dropna(subset=subset)
>>> step = PipelineStep(drop_na, subset=["critical_column"])
>>> step.execute(df_with_nulls)
    def __init__(
        self,
        step: Callable[..., Data],
        **kwargs: Any,
    ) -> None:
        self.step = step
        self.kwargs = kwargs
Initialize a pipeline step with a function and its arguments.
Creates a new step by storing the transformation function and any keyword arguments that should be passed to it during execution.
Parameters
- step (Callable[..., Data]):
  The transformation function to wrap. This function must:
  - Accept a Data object as its first positional argument
  - Return a Data object
  - Accept any additional parameters as keyword arguments
  The function can be a prebuilt step, a pandas/PySpark method wrapped as a function, or a custom transformation.
- **kwargs (Any):
  Keyword arguments to pass to the step function when execute() is called. These should match the function's parameter names (excluding the first data parameter).
See Also
execute: Run the step on data.
Notes
The keyword arguments are stored as-is in the kwargs attribute.
They are not validated at initialization time; validation happens
when the function is called during execute(). This allows
for flexible argument passing but means errors in argument names
or types will only surface at execution time.
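If earlier feedback on argument names is wanted, one option is to bind the keyword arguments against the function signature before building the step. The helper below, `check_step_kwargs`, is a hypothetical sketch and not part of adc_toolkit; it checks names and arity only, not types, and some built-in callables do not expose a signature at all.

```python
import inspect
from typing import Any, Callable


def check_step_kwargs(step: Callable[..., Any], **kwargs: Any) -> None:
    """Hypothetical helper: raise TypeError now if kwargs cannot bind to `step`."""
    signature = inspect.signature(step)
    # None is a placeholder for the data argument that arrives at execution time.
    signature.bind(None, **kwargs)


def add_suffix(data: list[str], suffix: str) -> list[str]:
    return [item + suffix for item in data]


check_step_kwargs(add_suffix, suffix="_processed")  # binds cleanly

try:
    check_step_kwargs(add_suffix, sufix="_processed")  # misspelled parameter name
    error_message = ""
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```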
Examples
Basic initialization:
>>> def add_suffix(data, column: str, suffix: str) -> pd.DataFrame:
...     result = data.copy()
...     result[column] = result[column].astype(str) + suffix
...     return result
>>> step = PipelineStep(add_suffix, column="name", suffix="_processed")
>>> step.kwargs
{'column': 'name', 'suffix': '_processed'}
With no additional arguments:
>>> def identity(data):
...     return data.copy()
>>> step = PipelineStep(identity)
>>> step.kwargs
{}
    def execute(self, data: Data) -> Data:
        result = self.step(data, **self.kwargs)

        return result
Execute the wrapped transformation function on the provided data.
Calls the stored function with the input data as the first argument and the stored keyword arguments. Returns the transformed data.
Parameters
- data (Data):
  The input data to transform. Must be a Data protocol-compatible object (e.g., pandas DataFrame, PySpark DataFrame). The object should be compatible with the wrapped function's expectations.
Returns
- Data: The transformed data returned by the wrapped function. The exact type depends on what the wrapped function returns, but it should satisfy the Data protocol.
Raises
- TypeError: If the function cannot accept the provided data type, or if required keyword arguments are missing.
- ValueError: If the function raises a ValueError due to invalid input data or argument values (e.g., referencing a column that doesn't exist).
- KeyError: If the function attempts to access non-existent columns or keys.
- Exception: Any other exception raised by the wrapped function propagates unchanged. The exception type and message depend on the specific function implementation.
See Also
ProcessingPipeline.run: Executes multiple steps in sequence.
Notes
Execution Flow
The execution is straightforward:
result = self.step(data, **self.kwargs)
The wrapped function receives the data as its first positional argument and all stored keyword arguments are unpacked and passed.
No Validation
This method does not validate the input data or the function's
return value against the Data protocol. It trusts that:
- The input satisfies the wrapped function's requirements
- The function returns a valid Data object
Validation failures will surface as exceptions from the wrapped function.
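Since nothing checks the return value, a step that forgets its `return` only fails later, inside the next step. One way to catch this at the source is a decorator that verifies the result exposes `columns` and `dtypes`. The `checked` decorator below is a hypothetical sketch, not an adc_toolkit feature, and it uses `SimpleNamespace` as a stand-in for a DataFrame.

```python
import functools
from types import SimpleNamespace
from typing import Any, Callable


def checked(step: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: verify the result exposes `columns` and `dtypes`."""

    @functools.wraps(step)  # keeps step.__name__ so labels stay readable
    def wrapper(data: Any, **kwargs: Any) -> Any:
        result = step(data, **kwargs)
        missing = [name for name in ("columns", "dtypes") if not hasattr(result, name)]
        if missing:
            raise TypeError(f"{step.__name__} returned an object without {missing}")
        return result

    return wrapper


@checked
def passthrough(data: Any) -> Any:
    return data


@checked
def forgets_to_return(data: dict, column: str) -> None:
    data.pop(column, None)  # mutates, but returns None by mistake


frame_like = SimpleNamespace(columns=["a"], dtypes={"a": "int64"})
ok = passthrough(frame_like)

try:
    forgets_to_return({"a": 1}, column="a")
    failure = ""
except TypeError as exc:
    failure = str(exc)

print(failure)
```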
Error Handling
Exceptions from the wrapped function propagate directly to the
caller. When used within a ProcessingPipeline, this means
the pipeline stops on the first error. Consider wrapping sensitive
operations with try-except in your step functions if you need
custom error handling.
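One way to add custom error handling without touching the pipeline is to wrap the step function itself. This is a sketch under assumptions: `with_fallback` is a hypothetical helper, not an adc_toolkit API, and a `SimpleNamespace` with a `columns` attribute stands in for a DataFrame so the example runs without third-party packages. On a listed exception the wrapper logs a warning and returns the input unchanged, which lets later steps continue.

```python
import functools
import logging
from types import SimpleNamespace
from typing import Any, Callable

logger = logging.getLogger(__name__)


def with_fallback(
    step: Callable[..., Any],
    exceptions: tuple[type[Exception], ...] = (ValueError, KeyError),
) -> Callable[..., Any]:
    """Hypothetical wrapper: on a listed exception, log it and pass data through."""

    @functools.wraps(step)
    def wrapper(data: Any, **kwargs: Any) -> Any:
        try:
            return step(data, **kwargs)
        except exceptions as exc:
            logger.warning("%s skipped: %s", step.__name__, exc)
            return data  # fall back to the unmodified input

    return wrapper


def strict_transform(data: Any, required_col: str) -> Any:
    """Same shape as the strict_transform example: fail on a missing column."""
    if required_col not in data.columns:
        raise ValueError(f"Column '{required_col}' not found")
    return data


# Stand-in for a DataFrame; only the attributes used here are provided.
frame = SimpleNamespace(columns=["amount"], dtypes={"amount": "int64"})

result = with_fallback(strict_transform)(frame, required_col="missing_column")
print(result is frame)  # True
```

Silently passing data through is a deliberate trade-off; for steps whose failure should abort the run, leaving the exception unhandled is the simpler choice.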
Examples
Basic execution:
>>> import pandas as pd
>>> from adc_toolkit.processing.step import PipelineStep
>>>
>>> def double_values(data: pd.DataFrame, column: str) -> pd.DataFrame:
...     result = data.copy()
...     result[column] = result[column] * 2
...     return result
>>> step = PipelineStep(double_values, column="amount")
>>> df = pd.DataFrame({"amount": [10, 20, 30]})
>>> result = step.execute(df)
>>> result["amount"].tolist()
[20, 40, 60]
Handling errors:
>>> def strict_transform(data: pd.DataFrame, required_col: str) -> pd.DataFrame:
...     if required_col not in data.columns:
...         raise ValueError(f"Column '{required_col}' not found")
...     return data
>>> step = PipelineStep(strict_transform, required_col="missing_column")
>>> try:
...     step.execute(df)
... except ValueError as e:
...     print(f"Step failed: {e}")
Step failed: Column 'missing_column' not found
class ProcessingPipeline:
    """A pipeline for processing data through a sequence of transformation steps."""

    def __init__(self) -> None:
        """Initialize an empty processing pipeline."""
        self.steps = list[PipelineStep]()

    def __str__(self) -> str:
        """Return the steps in order, formatted as "step1(params) -> step2(params) -> ...".

        Returns an empty string if the pipeline has no steps.
        """
        return " -> ".join([str(step) for step in self.steps])

    def __len__(self) -> int:
        """Return the number of steps in the pipeline."""
        return len(self.steps)

    def add(self, step: Callable[..., Data], **kwargs: Any) -> "ProcessingPipeline":
        """Add a transformation step to the pipeline and return ``self``."""
        self.steps.append(PipelineStep(step, **kwargs))
        return self

    def run(self, data: Data) -> Data:
        """Execute the pipeline on a deep copy of the provided data."""
        result = copy.deepcopy(data)
        for step in self.steps:
            result = step.execute(result)
        return result
A pipeline for processing data through a sequence of transformation steps.
ProcessingPipeline orchestrates the execution of multiple processing
steps in a defined order. Each step transforms the data and passes the
result to the next step. The pipeline supports method chaining for fluent
construction and preserves the original input data through deep copying.
The pipeline is agnostic to the specific transformation functions used.
Any callable that accepts a Data object (e.g., pandas DataFrame) as
its first argument and returns a Data object can be added as a step.
This enables mixing:
- Prebuilt steps from adc_toolkit.processing.steps
- Built-in pandas or PySpark transformation methods
- Custom functions tailored to your specific needs
Attributes
- steps (list[PipelineStep]): The ordered list of pipeline steps to execute. Each step is a PipelineStep instance wrapping a transformation function and its keyword arguments.
See Also
PipelineStep: The wrapper class for individual transformation functions.
adc_toolkit.processing.steps.pandas: Prebuilt pandas transformation steps.
adc_toolkit.data.abs.Data: Protocol defining compatible data objects.
Notes
Immutability: The run() method creates a deep copy of the input
data before processing. This ensures the original data is never modified,
which is important for reproducibility and debugging. However, deep copying
can be expensive for very large datasets.
Sequential Execution: Steps execute in the order they were added. The output of each step becomes the input to the next step. There is no parallel execution or branching.
Error Propagation: If any step raises an exception, the pipeline stops immediately and the exception propagates to the caller. Partial results are not returned.
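The three notes above can be seen together in a small stand-in. This is a sketch, not the library class: `MiniPipeline` is a hypothetical reduction of the add/run behavior described on this page, and plain dictionaries replace DataFrames so it runs without pandas (a dict does not satisfy the Data protocol; it is used only to keep the example short).

```python
import copy
from typing import Any, Callable


class MiniPipeline:
    """Stand-in that mirrors the add/run behavior described above."""

    def __init__(self) -> None:
        self.steps: list[tuple[Callable[..., Any], dict[str, Any]]] = []

    def add(self, step: Callable[..., Any], **kwargs: Any) -> "MiniPipeline":
        self.steps.append((step, kwargs))
        return self

    def run(self, data: Any) -> Any:
        result = copy.deepcopy(data)  # the caller's object is never touched
        for step, kwargs in self.steps:
            result = step(result, **kwargs)  # output feeds the next step
        return result


def drop_column(data: dict[str, list[int]], name: str) -> dict[str, list[int]]:
    del data[name]  # mutates in place, but only the pipeline's copy
    return data


def require_column(data: dict[str, list[int]], name: str) -> dict[str, list[int]]:
    if name not in data:
        raise KeyError(name)
    return data


raw = {"id": [1, 2], "tmp": [0, 0]}

# Immutability: the in-place deletion does not reach the original.
cleaned = MiniPipeline().add(drop_column, name="tmp").run(raw)
print(sorted(cleaned))  # ['id']
print(sorted(raw))      # ['id', 'tmp']

# Sequential execution and error propagation: the second step sees the first
# step's output, raises, and run() returns nothing.
failing = MiniPipeline().add(drop_column, name="tmp").add(require_column, name="tmp")
stopped = False
try:
    failing.run(raw)
except KeyError:
    stopped = True
print(stopped)  # True
```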
Examples
Basic usage with prebuilt steps:
>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     make_columns_snake_case,
... )
>>> import pandas as pd
>>>
>>> # Create sample data
>>> df = pd.DataFrame(
...     {
...         "CustomerID": [1, 1, 2, 3],
...         "Value": [10.0, 10.0, None, 30.0],
...     }
... )
>>>
>>> # Build and run pipeline
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["CustomerID"])
>>> pipeline.add(fill_missing_values, method="mean", columns=["Value"])
>>> pipeline.add(make_columns_snake_case)
>>> result = pipeline.run(df)
>>> result.columns.tolist()
['customer_id', 'value']
Method chaining for fluent construction:
>>> pipeline = ProcessingPipeline().add(remove_duplicates, subset=["id"]).add(fill_missing_values, method="median")
>>> len(pipeline)
2
Using custom transformation functions:
>>> def log_transform(data: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
...     '''Apply log transformation to specified columns.'''
...     import numpy as np
...
...     result = data.copy()
...     for col in columns:
...         result[col] = np.log1p(result[col])
...     return result
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(log_transform, columns=["price", "quantity"])
>>> transformed = pipeline.run(sales_data)
Using pandas functions directly:
Many pandas functions can be used directly since they accept a DataFrame as the first argument and return a DataFrame:
>>> import pandas as pd
>>>
>>> # Merge with a reference table
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.merge, right=lookup_df, how="left", on="category_id")
>>> enriched = pipeline.run(sales_df)
>>>
>>> # Filter rows with pd.DataFrame.query, passed as an unbound method
>>> # (pd.eval takes the expression, not the data, as its first argument)
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.DataFrame.query, expr="amount > 0")
Inspecting pipeline structure:
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["id"])
>>> pipeline.add(fill_missing_values, method="mean")
>>> print(pipeline)
remove_duplicates(subset=['id']) -> fill_missing_values(method=mean)
>>> len(pipeline)
2
def __init__(self) -> None:
    self.steps = list[PipelineStep]()
Initialize an empty processing pipeline.
Creates a new pipeline with no steps. Steps can be added using the
add() method.
Examples
>>> pipeline = ProcessingPipeline()
>>> len(pipeline)
0
def add(self, step: Callable[..., Data], **kwargs: Any) -> "ProcessingPipeline":
    self.steps.append(PipelineStep(step, **kwargs))
    return self
Add a transformation step to the pipeline.
Appends a new step to the end of the pipeline. The step consists of
a callable (function) and any keyword arguments to pass to it during
execution. The callable must follow the step contract: accept a
Data object as the first positional argument and return a
Data object.
This method returns self to enable method chaining, allowing
multiple steps to be added in a fluent style.
Parameters
- step (Callable[..., Data]): The transformation function to execute. Must accept a Data object (e.g., pandas DataFrame, PySpark DataFrame) as its first positional argument and return a Data object. Can be:
  - A prebuilt step from adc_toolkit.processing.steps
  - A pandas/PySpark function directly (e.g., pd.merge)
  - A lambda function for simple inline transformations
  - A custom function you define
- **kwargs (Any): Keyword arguments to pass to the step function during execution. These are stored with the step and applied when run() is called. The arguments should match the function's signature (excluding the first data parameter).
Returns
- ProcessingPipeline: Returns self to enable method chaining.
See Also
run: Execute the pipeline on data.
PipelineStep: The wrapper class for transformation functions.
Notes
The Step Contract
Any function can be used as a step if it follows this signature:
def my_step(data: Data, param1: T1, param2: T2, ...) -> Data:
    # Transform the data
    return transformed_data
The Data protocol requires only columns and dtypes
properties. pandas DataFrames and PySpark DataFrames satisfy this
naturally, so you can use them directly without any wrappers.
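To make the structural nature of the contract concrete, here is a hedged sketch of what such a protocol could look like. `DataLike` is a hypothetical stand-in written for this example; the actual definition lives in adc_toolkit.data.abs and is not shown on this page. `Table` and `select` are likewise illustrative names.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical stand-in for the Data protocol: two read-only properties."""

    @property
    def columns(self) -> Any: ...

    @property
    def dtypes(self) -> Any: ...


class Table:
    """Minimal object that satisfies the protocol structurally (no inheritance)."""

    def __init__(self, rows: dict[str, list[Any]]) -> None:
        self._rows = rows

    @property
    def columns(self) -> list[str]:
        return list(self._rows)

    @property
    def dtypes(self) -> dict[str, str]:
        return {name: type(values[0]).__name__ for name, values in self._rows.items()}


def select(data: Table, keep: list[str]) -> Table:
    """A step that follows the contract: data first, keywords after, data out."""
    return Table({name: data._rows[name] for name in keep})


table = Table({"id": [1, 2], "name": ["a", "b"]})
print(isinstance(table, DataLike))         # True
print(isinstance([1, 2, 3], DataLike))     # False
print(select(table, keep=["id"]).columns)  # ['id']
```

A runtime-checkable protocol only verifies that the attributes exist, not their types, which matches the "no wrappers needed" claim for pandas and PySpark DataFrames.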
Keyword Arguments Only
Only keyword arguments (not positional) can be passed to the step
function via the **kwargs parameter. If your function requires
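positional arguments beyond the data parameter, wrap it in a lambda.
functools.partial can pre-bind keyword arguments, but positional arguments
bound with partial are placed before data and would take its slot.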
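The two workarounds behave differently, which the following sketch illustrates. The names `clip`, `scale`, and `execute` are hypothetical; `execute` only imitates the `step(data, **kwargs)` call convention described on this page, and plain dictionaries stand in for DataFrames so the example runs without pandas.

```python
import functools
from typing import Any, Callable

Rows = dict[str, list[float]]


def clip(data: Rows, lower: float, upper: float, /) -> Rows:
    """Hypothetical step whose extra parameters are positional-only."""
    return {
        name: [min(max(value, lower), upper) for value in values]
        for name, values in data.items()
    }


def scale(data: Rows, factor: float) -> Rows:
    """Hypothetical step with an ordinary keyword-capable parameter."""
    return {name: [value * factor for value in values] for name, values in data.items()}


def execute(step: Callable[..., Any], data: Any, **kwargs: Any) -> Any:
    """Imitates the documented call convention: data first, then keywords."""
    return step(data, **kwargs)


data = {"x": [-1.0, 0.5, 3.0]}

# A lambda can place positional arguments after the data argument.
clip_step = lambda rows: clip(rows, 0.0, 1.0)
print(execute(clip_step, data))  # {'x': [0.0, 0.5, 1.0]}

# functools.partial works when the extra argument is bound by keyword.
double_step = functools.partial(scale, factor=2.0)
print(execute(double_step, data))  # {'x': [-2.0, 1.0, 6.0]}
```

Note that `functools.partial(clip, 0.0, 1.0)` would not help here: partial prepends its positional arguments, so `0.0` would land in the data slot.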
Examples
Using prebuilt steps:
>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["customer_id"], keep="first")
<...ProcessingPipeline object at 0x...>
Using a custom function:
>>> def scale_column(data, column: str, factor: float) -> pd.DataFrame:
...     result = data.copy()
...     result[column] = result[column] * factor
...     return result
>>> pipeline.add(scale_column, column="price", factor=1.1)
<...ProcessingPipeline object at 0x...>
Method chaining:
>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     select_columns,
... )
>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates)
...     .add(fill_missing_values, method="mean")
...     .add(select_columns, columns=["id", "name", "value"])
... )
>>> len(pipeline)
3
Using pandas functions directly:
>>> import pandas as pd
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.merge, right=reference_df, how="left", on="id")
<...ProcessingPipeline object at 0x...>
Using a lambda for simple transformations:
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(lambda df: df.reset_index(drop=True))
<...ProcessingPipeline object at 0x...>
def run(self, data: Data) -> Data:
    result = copy.deepcopy(data)
    for step in self.steps:
        result = step.execute(result)
    return result
Execute the pipeline on the provided data.
Runs all transformation steps in sequence on a deep copy of the input data. Each step receives the output of the previous step as its input. The original data is never modified.
Parameters
- data (Data): The input data to process. Must be a Data protocol-compatible object (e.g., pandas DataFrame, PySpark DataFrame). The object must support deep copying via copy.deepcopy.
Returns
- Data: The transformed data after all pipeline steps have been applied. Returns a new object; the original data is unchanged.
Raises
- TypeError: If the input data cannot be deep copied.
- Exception: Any exception raised by a step function propagates directly. The pipeline does not wrap the exception or add step information, so use the traceback to identify which step failed.
See Also
add: Add steps to the pipeline before running.
Notes
Immutability
The pipeline creates a deep copy of the input data before processing. This ensures:
- The original data is preserved for comparison or debugging
- Multiple runs with the same input produce consistent results
- Side effects from step functions don't affect the original
For very large datasets, this copying may have performance implications. Consider:
- Using PySpark for distributed processing of large data
- Processing data in chunks if memory is constrained
- Implementing steps that operate in-place if you're certain about data ownership (run() still makes its initial deep copy, so this only avoids additional copies inside the steps)
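The chunking suggestion in the list above could look roughly like this. It is a sketch under assumptions: `chunked` and `run_steps` are hypothetical helpers (`run_steps` imitates the deep-copy-then-apply behavior of run()), and lists of dictionaries stand in for DataFrames. It is only valid for row-wise steps; steps that need the whole dataset, such as de-duplication or filling with a column mean, would give different results per chunk.

```python
import copy
from typing import Any, Callable, Iterator

Row = dict[str, Any]
Step = tuple[Callable[..., Any], dict[str, Any]]


def chunked(rows: list[Row], size: int) -> Iterator[list[Row]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def run_steps(data: list[Row], steps: list[Step]) -> list[Row]:
    """Imitates run(): deep copy the input, then apply each step in order."""
    result = copy.deepcopy(data)
    for step, kwargs in steps:
        result = step(result, **kwargs)
    return result


def add_total(rows: list[Row], price: str, quantity: str) -> list[Row]:
    """Row-wise step, so processing chunk by chunk gives the same answer."""
    for row in rows:
        row["total"] = row[price] * row[quantity]
    return rows


steps: list[Step] = [(add_total, {"price": "price", "quantity": "qty"})]
rows = [{"price": 2.0, "qty": i} for i in range(5)]

processed: list[Row] = []
for chunk in chunked(rows, size=2):
    # Only one chunk is deep-copied at a time, which bounds the extra memory.
    processed.extend(run_steps(chunk, steps))

print(len(processed))          # 5
print(processed[3]["total"])   # 6.0
print("total" in rows[0])      # False
```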
Sequential Execution
Steps execute strictly in order. The data flow is:
input -> step1 -> step2 -> ... -> stepN -> output
There is no parallel execution. If a step fails, subsequent steps do not run and the exception propagates immediately.
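Because the exception propagates unchanged, the caller learns which step failed only from the traceback. If a step index or name is wanted in the error itself, a thin runner can add it. This is a sketch, not an adc_toolkit feature: `StepError` and `run_with_context` are hypothetical names, and lists of dictionaries stand in for DataFrames.

```python
import copy
from typing import Any, Callable

Step = tuple[Callable[..., Any], dict[str, Any]]


class StepError(RuntimeError):
    """Hypothetical exception that records which step failed."""

    def __init__(self, index: int, name: str) -> None:
        super().__init__(f"step {index} ({name}) failed")
        self.index = index
        self.name = name


def run_with_context(data: Any, steps: list[Step]) -> Any:
    """Apply steps in order on a deep copy; re-raise failures with step context."""
    result = copy.deepcopy(data)
    for index, (step, kwargs) in enumerate(steps, start=1):
        try:
            result = step(result, **kwargs)
        except Exception as exc:
            name = getattr(step, "__name__", repr(step))
            # Chain the original exception so its traceback is preserved.
            raise StepError(index, name) from exc
    return result


def double(rows: list[dict[str, int]], column: str) -> list[dict[str, int]]:
    return [{**row, column: row[column] * 2} for row in rows]


def require_positive(rows: list[dict[str, int]], column: str) -> list[dict[str, int]]:
    if any(row[column] <= 0 for row in rows):
        raise ValueError(f"non-positive value in '{column}'")
    return rows


steps: list[Step] = [
    (double, {"column": "amount"}),
    (require_positive, {"column": "amount"}),
]

message, cause = "", None
try:
    run_with_context([{"amount": 5}, {"amount": -1}], steps)
except StepError as exc:
    message, cause = str(exc), exc.__cause__
    print(message)  # step 2 (require_positive) failed
```

The original exception stays available as `__cause__`, so nothing is lost compared with letting it propagate directly.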
Empty Pipeline
Running an empty pipeline (no steps added) returns a deep copy of the input data without any transformations.
Examples
Basic execution:
>>> import pandas as pd
>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>>
>>> df = pd.DataFrame({"id": [1, 1, 2], "value": [10, 10, 20]})
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["id"])
>>> result = pipeline.run(df)
>>> len(result)
2
>>> len(df) # Original unchanged
3
Running the same pipeline multiple times:
>>> result1 = pipeline.run(data_batch_1)
>>> result2 = pipeline.run(data_batch_2)
>>> result3 = pipeline.run(data_batch_3)
Empty pipeline returns a copy:
>>> empty_pipeline = ProcessingPipeline()
>>> result = empty_pipeline.run(df)
>>> result is df
False
>>> result.equals(df)
True