adc_toolkit.processing

Data processing pipeline framework for composable, chainable transformations.

This module provides a flexible pipeline framework for processing data through a sequence of transformation steps. The framework is designed to be agnostic to the underlying data type, working with any object that satisfies the adc_toolkit.data.abs.Data protocol (e.g., pandas DataFrames, PySpark DataFrames).

Key Components

  • ProcessingPipeline: Orchestrates the execution of multiple processing steps in sequence. Supports method chaining for fluent pipeline construction.
  • PipelineStep: Wraps individual transformation functions with their parameters. Handles execution and provides debugging information.

The Function Contract

The framework's flexibility comes from a simple contract: any function that accepts a Data object as its first positional argument and returns a Data object can be used as a pipeline step. This means you can use:

  1. Prebuilt step functions from adc_toolkit.processing.steps
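  2. pandas/PySpark functions directly, as long as their first positional parameter receives the data (e.g., pd.merge; pd.concat takes a list as its first parameter, so it needs a thin wrapper)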
  3. Lambda functions for simple inline transformations
  4. Custom functions you write for your specific needs

The function signature should follow this pattern:

def my_step(data: Data, param1: type1, param2: type2, ...) -> Data:
    # Transform data
    return transformed_data

Where Data is any object with columns and dtypes properties (pandas DataFrames and PySpark DataFrames satisfy this naturally).
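The contract is structural: an object qualifies by exposing columns and dtypes, not by inheriting from a base class. The sketch below illustrates that idea with typing.Protocol. DataLike, ToyTable, and drop_column are hypothetical names invented for this example, and the real adc_toolkit.data.abs.Data definition may differ in detail.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical stand-in for the Data protocol: only columns and dtypes."""

    @property
    def columns(self) -> Any: ...

    @property
    def dtypes(self) -> Any: ...


@dataclass
class ToyTable:
    """Hypothetical column store; satisfies DataLike without inheriting from it."""

    data: dict[str, list[Any]] = field(default_factory=dict)

    @property
    def columns(self) -> list[str]:
        return list(self.data)

    @property
    def dtypes(self) -> dict[str, str]:
        # Report the Python type name of each column's first value
        return {
            name: type(values[0]).__name__ if values else "object"
            for name, values in self.data.items()
        }


def drop_column(data: ToyTable, column: str) -> ToyTable:
    """A step that follows the contract: data first, parameters after, data returned."""
    return ToyTable({name: values for name, values in data.data.items() if name != column})


table = ToyTable({"id": [1, 2], "name": ["a", "b"]})
print(isinstance(table, DataLike))                # True: has columns and dtypes
print(isinstance({"id": [1, 2]}, DataLike))       # False: a plain dict has neither
print(drop_column(table, column="name").columns)  # ['id']
```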

Examples

Using prebuilt step functions:

>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import remove_duplicates, fill_missing_values
>>>
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["id"])
>>> pipeline.add(fill_missing_values, method="mean", columns=["value"])
>>> result = pipeline.run(raw_data)

Using pandas functions directly:

Many pandas functions accept a DataFrame as the first argument and return a DataFrame, making them directly usable as pipeline steps:

>>> import pandas as pd
>>>
>>> # Merge with another DataFrame
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.merge, right=reference_df, how="left", on="id")
>>> result = pipeline.run(main_df)
>>>
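>>> # Concatenate DataFrames. pd.concat takes a list of objects as its first
>>> # parameter, so a small wrapper puts the pipeline's data into that list.
>>> def append_frames(data, others, ignore_index=True):
...     return pd.concat([data, *others], ignore_index=ignore_index)
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(append_frames, others=[df2, df3])
>>> result = pipeline.run(df1)  # df1 becomes the first frame in the result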

Using custom transformation functions:

>>> def normalize_by_group(
...     data: pd.DataFrame,
...     value_col: str,
...     group_col: str,
... ) -> pd.DataFrame:
...     '''Normalize values within each group to [0, 1] range.'''
...     result = data.copy()
...     for group in data[group_col].unique():
...         mask = data[group_col] == group
...         values = data.loc[mask, value_col]
...         min_val, max_val = values.min(), values.max()
...         if max_val > min_val:
...             result.loc[mask, value_col] = (values - min_val) / (max_val - min_val)
...     return result
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(normalize_by_group, value_col="price", group_col="category")
>>> result = pipeline.run(sales_data)

Using lambda functions for simple transformations:

>>> pipeline = ProcessingPipeline()
>>> pipeline.add(lambda df: df.dropna())
>>> pipeline.add(lambda df: df.reset_index(drop=True))
>>> result = pipeline.run(messy_data)

Method chaining for fluent construction:

>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates)
...     .add(fill_missing_values, method="median")
...     .add(lambda df: df.reset_index(drop=True))
... )
>>> result = pipeline.run(data)

See Also

adc_toolkit.processing.pipeline.ProcessingPipeline: The pipeline orchestrator.
adc_toolkit.processing.step.PipelineStep: The step wrapper class.
adc_toolkit.processing.steps: Prebuilt transformation functions.
adc_toolkit.data.abs.Data: The protocol defining compatible data objects.

Notes

The pipeline creates a deep copy of input data before processing, ensuring the original data remains unmodified. This is important for reproducibility but may have performance implications for very large datasets.
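The copy-then-apply behaviour described above, together with the method chaining shown in the examples, fits in a few lines. MiniPipeline is a hypothetical stand-in written for illustration, not the library's ProcessingPipeline; it uses a plain dict as the data object to stay dependency-free and assumes, as stated above, that the input is deep-copied once before any step runs.

```python
import copy
from typing import Any, Callable


class MiniPipeline:
    """Hypothetical stand-in for ProcessingPipeline (illustration only)."""

    def __init__(self) -> None:
        # Each entry pairs a step function with the keyword arguments bound to it
        self._steps: list[tuple[Callable[..., Any], dict[str, Any]]] = []

    def add(self, func: Callable[..., Any], **kwargs: Any) -> "MiniPipeline":
        self._steps.append((func, kwargs))
        return self  # returning self is what enables .add(...).add(...) chaining

    def run(self, data: Any) -> Any:
        # Deep-copy once so steps that mutate their input never touch the caller's object
        result = copy.deepcopy(data)
        for func, kwargs in self._steps:
            # Same call shape as PipelineStep.execute: data first, stored kwargs unpacked
            result = func(result, **kwargs)
        return result


def add_flag(data: dict, column: str) -> dict:
    data[column] = True  # mutates its input in place
    return data


def rename(data: dict, old: str, new: str) -> dict:
    return {(new if key == old else key): value for key, value in data.items()}


original = {"id": [1, 2]}
pipeline = MiniPipeline().add(add_flag, column="seen").add(rename, old="id", new="key")
result = pipeline.run(original)
print(original)  # {'id': [1, 2]}
print(result)    # {'key': [1, 2], 'seen': True}
```

The deep copy is what keeps add_flag's in-place mutation from leaking back into original; the trade-off is the extra memory and time noted above.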

For production workloads with large data, consider:

  • Using PySpark DataFrames, which handle distributed processing
  • Processing data in chunks if memory is a constraint
  • Using in-place operations in custom step functions when appropriate
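As a rough illustration of the chunking suggestion, the sketch feeds a list of row dicts through a step one slice at a time. iter_chunks, run_in_chunks, and fill_missing are hypothetical helpers, and rows-as-dicts is a simplification rather than a Data object. Chunking is only safe for row-local steps: a step that looks across rows (deduplication, group-wise normalization, filling with a column mean) would see each chunk in isolation and give different results.

```python
from typing import Any, Callable, Iterator

Row = dict[str, Any]


def iter_chunks(rows: list[Row], chunk_size: int) -> Iterator[list[Row]]:
    """Yield consecutive slices of at most chunk_size rows."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def run_in_chunks(
    rows: list[Row],
    step: Callable[..., list[Row]],
    chunk_size: int,
    **kwargs: Any,
) -> list[Row]:
    """Apply the same step, with the same keyword arguments, to every chunk."""
    results: list[Row] = []
    for chunk in iter_chunks(rows, chunk_size):
        results.extend(step(chunk, **kwargs))
    return results


def fill_missing(rows: list[Row], column: str, value: Any) -> list[Row]:
    """Row-local step: each output row depends only on its own input row."""
    return [
        {**row, column: value if row.get(column) is None else row[column]}
        for row in rows
    ]


rows = [{"id": i, "value": None if i % 2 else i} for i in range(5)]
filled = run_in_chunks(rows, fill_missing, chunk_size=2, column="value", value=0)
print([row["value"] for row in filled])  # [0, 0, 2, 0, 4]
```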
from . import steps
from .pipeline import ProcessingPipeline
from .step import PipelineStep


__all__ = ["PipelineStep", "ProcessingPipeline", "steps"]
class PipelineStep:
class PipelineStep:
    """
    A wrapper for transformation functions in a data processing pipeline.

    ``PipelineStep`` encapsulates a transformation function along with its
    keyword arguments, creating a reusable, inspectable unit of data
    transformation. It handles the invocation of the wrapped function with
    the stored arguments during pipeline execution.

    This class is typically not instantiated directly by users. Instead,
    steps are created implicitly when calling
    :meth:`ProcessingPipeline.add() <adc_toolkit.processing.ProcessingPipeline.add>`.
    However, understanding ``PipelineStep`` is useful for debugging and
    creating custom pipeline behaviors.

    Parameters
    ----------
    step : Callable[..., Data]
        The transformation function to wrap. Must accept a ``Data`` object
        as its first positional argument and return a ``Data`` object.
    **kwargs : Any
        Keyword arguments to pass to the function during execution. These
        are stored and applied every time :meth:`execute` is called.

    Attributes
    ----------
    step : Callable[..., Data]
        The wrapped transformation function.
    kwargs : dict[str, Any]
        Dictionary of keyword arguments to pass to the function.

    See Also
    --------
    ProcessingPipeline : Uses ``PipelineStep`` to orchestrate transformations.
    adc_toolkit.processing.steps.pandas : Prebuilt step functions for pandas.

    Notes
    -----
    **The Step Function Contract**

    Any function can be wrapped in a ``PipelineStep`` if it follows this
    pattern:

        def transformation(data: Data, **kwargs) -> Data:
            # Perform transformation
            return transformed_data

    Where:

    - ``Data`` is any object satisfying the ``Data`` protocol (has ``columns``
      and ``dtypes`` properties)
    - ``**kwargs`` represents any additional parameters your function needs
    - The return value must also satisfy the ``Data`` protocol

    **Flexibility**

    This design enables wrapping:

    - **Prebuilt functions** from :mod:`adc_toolkit.processing.steps`
    - **pandas built-in methods** wrapped as functions
    - **PySpark transformations** wrapped as functions
    - **Custom domain-specific** transformation logic

    **Debugging**

    The :meth:`__str__` method provides a human-readable representation
    showing the function name and its bound arguments, useful for
    debugging and logging pipeline structure.

    Examples
    --------
    **Creating a step with a custom function:**

    >>> import pandas as pd
    >>> from adc_toolkit.processing.step import PipelineStep
    >>>
    >>> def normalize(data: pd.DataFrame, column: str) -> pd.DataFrame:
    ...     result = data.copy()
    ...     col_data = result[column]
    ...     result[column] = (col_data - col_data.min()) / (col_data.max() - col_data.min())
    ...     return result
    >>> step = PipelineStep(normalize, column="temperature")
    >>> print(step)
    normalize(column=temperature)

    **Creating a step with a prebuilt function:**

    >>> from adc_toolkit.processing.steps.pandas import fill_missing_values
    >>>
    >>> step = PipelineStep(fill_missing_values, method="mean", columns=["value"])
    >>> print(step)
    fill_missing_values(method=mean, columns=['value'])

    **Executing a step:**

    >>> df = pd.DataFrame({"temperature": [0, 50, 100]})
    >>> step = PipelineStep(normalize, column="temperature")
    >>> result = step.execute(df)
    >>> result["temperature"].tolist()
    [0.0, 0.5, 1.0]

    **Wrapping a pandas operation:**

    >>> def drop_na(data: pd.DataFrame, subset: list[str] | None = None) -> pd.DataFrame:
    ...     return data.dropna(subset=subset)
    >>> df_with_nulls = pd.DataFrame({"critical_column": [1.0, None, 3.0]})
    >>> step = PipelineStep(drop_na, subset=["critical_column"])
    >>> step.execute(df_with_nulls)["critical_column"].tolist()
    [1.0, 3.0]
    """

    def __init__(
        self,
        step: Callable[..., Data],
        **kwargs: Any,
    ) -> None:
        """
        Initialize a pipeline step with a function and its arguments.

        Creates a new step by storing the transformation function and any
        keyword arguments that should be passed to it during execution.

        Parameters
        ----------
        step : Callable[..., Data]
            The transformation function to wrap. This function must:

            - Accept a ``Data`` object as its first positional argument
            - Return a ``Data`` object
            - Accept any additional parameters as keyword arguments

            The function can be a prebuilt step, a pandas/PySpark method
            wrapped as a function, or a custom transformation.

        **kwargs : Any
            Keyword arguments to pass to the step function when
            :meth:`execute` is called. These should match the function's
            parameter names (excluding the first ``data`` parameter).

        See Also
        --------
        execute : Run the step on data.

        Notes
        -----
        The keyword arguments are stored as-is in the ``kwargs`` attribute.
        They are not validated at initialization time; validation happens
        when the function is called during :meth:`execute`. This allows
        for flexible argument passing but means errors in argument names
        or types will only surface at execution time.

        Examples
        --------
        **Basic initialization:**

        >>> import pandas as pd
        >>> def add_suffix(data, column: str, suffix: str) -> pd.DataFrame:
        ...     result = data.copy()
        ...     result[column] = result[column].astype(str) + suffix
        ...     return result
        >>> step = PipelineStep(add_suffix, column="name", suffix="_processed")
        >>> step.kwargs
        {'column': 'name', 'suffix': '_processed'}

        **With no additional arguments:**

        >>> def identity(data):
        ...     return data.copy()
        >>> step = PipelineStep(identity)
        >>> step.kwargs
        {}
        """
        self.step = step
        self.kwargs = kwargs

    def __str__(self) -> str:
        """
        Return a human-readable string representation of the step.

        The string includes the function name and a comma-separated list
        of the keyword arguments in ``key=value`` format. This is useful
        for debugging, logging, and understanding pipeline structure.

        Returns
        -------
        str
            A string in the format ``"function_name(arg1=val1, arg2=val2)"``.
            If there are no keyword arguments, returns ``"function_name()"``.

        Examples
        --------
        **Step with arguments:**

        >>> from adc_toolkit.processing.steps.pandas import fill_missing_values
        >>> step = PipelineStep(fill_missing_values, method="mean", columns=["a", "b"])
        >>> str(step)
        "fill_missing_values(method=mean, columns=['a', 'b'])"

        **Step without arguments:**

        >>> def simple_transform(data):
        ...     return data
        >>> step = PipelineStep(simple_transform)
        >>> str(step)
        'simple_transform()'

        **Used in pipeline string representation:**

        >>> from adc_toolkit.processing import ProcessingPipeline
        >>> from adc_toolkit.processing.steps.pandas import remove_duplicates
        >>> pipeline = ProcessingPipeline().add(remove_duplicates, subset=["id"])
        >>> print(pipeline)  # Uses PipelineStep.__str__ internally
        remove_duplicates(subset=['id'])
        """
        kwargs_strings = [f"{key}={value}" for key, value in self.kwargs.items()]
        return f"{self.step.__name__}({', '.join(kwargs_strings)})"

    def execute(self, data: Data) -> Data:
        """
        Execute the wrapped transformation function on the provided data.

        Calls the stored function with the input data as the first argument
        and the stored keyword arguments. Returns the transformed data.

        Parameters
        ----------
        data : Data
            The input data to transform. Must be a ``Data`` protocol-compatible
            object (e.g., pandas DataFrame, PySpark DataFrame). The object
            should be compatible with the wrapped function's expectations.

        Returns
        -------
        Data
            The transformed data returned by the wrapped function. The exact
            type depends on what the wrapped function returns, but it should
            satisfy the ``Data`` protocol.

        Raises
        ------
        TypeError
            If the function cannot accept the provided data type, or if
            required keyword arguments are missing.
        ValueError
            If the function raises a ``ValueError`` due to invalid input
            data or argument values (e.g., referencing a column that
            doesn't exist).
        KeyError
            If the function attempts to access non-existent columns or keys.
        Exception
            Any other exception raised by the wrapped function propagates
            unchanged. The exception type and message depend on the specific
            function implementation.

        See Also
        --------
        ProcessingPipeline.run : Executes multiple steps in sequence.

        Notes
        -----
        **Execution Flow**

        The execution is straightforward:

            result = self.step(data, **self.kwargs)

        The wrapped function receives the data as its first positional
        argument and all stored keyword arguments are unpacked and passed.

        **No Validation**

        This method does not validate the input data or the function's
        return value against the ``Data`` protocol. It trusts that:

        1. The input satisfies the wrapped function's requirements
        2. The function returns a valid ``Data`` object

        Validation failures will surface as exceptions from the wrapped
        function.

        **Error Handling**

        Exceptions from the wrapped function propagate directly to the
        caller. When used within a :class:`ProcessingPipeline`, this means
        the pipeline stops on the first error. Consider wrapping sensitive
        operations with try-except in your step functions if you need
        custom error handling.

        Examples
        --------
        **Basic execution:**

        >>> import pandas as pd
        >>> from adc_toolkit.processing.step import PipelineStep
        >>>
        >>> def double_values(data: pd.DataFrame, column: str) -> pd.DataFrame:
        ...     result = data.copy()
        ...     result[column] = result[column] * 2
        ...     return result
        >>> step = PipelineStep(double_values, column="amount")
        >>> df = pd.DataFrame({"amount": [10, 20, 30]})
        >>> result = step.execute(df)
        >>> result["amount"].tolist()
        [20, 40, 60]

        **Handling errors:**

        >>> def strict_transform(data: pd.DataFrame, required_col: str) -> pd.DataFrame:
        ...     if required_col not in data.columns:
        ...         raise ValueError(f"Column '{required_col}' not found")
        ...     return data
        >>> step = PipelineStep(strict_transform, required_col="missing_column")
        >>> try:
        ...     step.execute(df)
        ... except ValueError as e:
        ...     print(f"Step failed: {e}")
        Step failed: Column 'missing_column' not found
        """
        result = self.step(data, **self.kwargs)

        return result

A wrapper for transformation functions in a data processing pipeline.

PipelineStep encapsulates a transformation function along with its keyword arguments, creating a reusable, inspectable unit of data transformation. It handles the invocation of the wrapped function with the stored arguments during pipeline execution.

This class is typically not instantiated directly by users. Instead, steps are created implicitly when calling ProcessingPipeline.add(). However, understanding PipelineStep is useful for debugging and creating custom pipeline behaviors.

Parameters
  • step (Callable[..., Data]): The transformation function to wrap. Must accept a Data object as its first positional argument and return a Data object.
  • **kwargs (Any): Keyword arguments to pass to the function during execution. These are stored and applied every time execute() is called.
Attributes
  • step (Callable[..., Data]): The wrapped transformation function.
  • kwargs (dict[str, Any]): Dictionary of keyword arguments to pass to the function.
See Also

ProcessingPipeline: Uses PipelineStep to orchestrate transformations.
adc_toolkit.processing.steps.pandas: Prebuilt step functions for pandas.

Notes

The Step Function Contract

Any function can be wrapped in a PipelineStep if it follows this pattern:

def transformation(data: Data, **kwargs) -> Data:
    # Perform transformation
    return transformed_data

Where:

  • Data is any object satisfying the Data protocol (has columns and dtypes properties)
  • **kwargs represents any additional parameters your function needs
  • The return value must also satisfy the Data protocol

Flexibility

This design enables wrapping:

  • Prebuilt functions from adc_toolkit.processing.steps
  • pandas built-in methods wrapped as functions
  • PySpark transformations wrapped as functions
  • Custom domain-specific transformation logic

Debugging

The __str__() method provides a human-readable representation showing the function name and its bound arguments, useful for debugging and logging pipeline structure.
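One caveat follows from how __str__ is implemented: it builds the name from self.step.__name__, and not every callable has that attribute. A functools.partial object, for example, would make str(step) raise AttributeError. The sketch shows a more forgiving variant of the same "name(key=value, ...)" format; step_name and describe_step are hypothetical helpers, not part of PipelineStep.

```python
import functools
from typing import Any, Callable


def step_name(func: Callable[..., Any]) -> str:
    """Best-effort name for any callable (hypothetical helper)."""
    # functools.partial objects have no __name__, so unwrap to the underlying function
    while isinstance(func, functools.partial):
        func = func.func
    # Fall back to repr() for other callables without __name__, such as class instances
    return getattr(func, "__name__", repr(func))


def describe_step(func: Callable[..., Any], **kwargs: Any) -> str:
    """Same "name(key=value, ...)" format that PipelineStep.__str__ produces."""
    kwargs_strings = [f"{key}={value}" for key, value in kwargs.items()]
    return f"{step_name(func)}({', '.join(kwargs_strings)})"


print(describe_step(round, ndigits=2))                     # round(ndigits=2)
print(describe_step(functools.partial(round, ndigits=2)))  # round()
print(describe_step(lambda data: data))                    # <lambda>()
```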

Examples

Creating a step with a custom function:

>>> import pandas as pd
>>> from adc_toolkit.processing.step import PipelineStep
>>>
>>> def normalize(data: pd.DataFrame, column: str) -> pd.DataFrame:
...     result = data.copy()
...     col_data = result[column]
...     result[column] = (col_data - col_data.min()) / (col_data.max() - col_data.min())
...     return result
>>> step = PipelineStep(normalize, column="temperature")
>>> print(step)
normalize(column=temperature)

Creating a step with a prebuilt function:

>>> from adc_toolkit.processing.steps.pandas import fill_missing_values
>>>
>>> step = PipelineStep(fill_missing_values, method="mean", columns=["value"])
>>> print(step)
fill_missing_values(method=mean, columns=['value'])

Executing a step:

>>> df = pd.DataFrame({"temperature": [0, 50, 100]})
>>> step = PipelineStep(normalize, column="temperature")
>>> result = step.execute(df)
>>> result["temperature"].tolist()
[0.0, 0.5, 1.0]

Wrapping a pandas operation:

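>>> def drop_na(data: pd.DataFrame, subset: list[str] | None = None) -> pd.DataFrame:
...     return data.dropna(subset=subset)
>>> df_with_nulls = pd.DataFrame({"critical_column": [1.0, None, 3.0]})
>>> step = PipelineStep(drop_na, subset=["critical_column"])
>>> step.execute(df_with_nulls)["critical_column"].tolist()
[1.0, 3.0]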
PipelineStep(step: Callable[..., adc_toolkit.data.abs.Data], **kwargs: Any)

Initialize a pipeline step with a function and its arguments.

Creates a new step by storing the transformation function and any keyword arguments that should be passed to it during execution.

Parameters
  • step (Callable[..., Data]): The transformation function to wrap. This function must:

    • Accept a Data object as its first positional argument
    • Return a Data object
    • Accept any additional parameters as keyword arguments

    The function can be a prebuilt step, a pandas/PySpark method wrapped as a function, or a custom transformation.

  • **kwargs (Any): Keyword arguments to pass to the step function when execute() is called. These should match the function's parameter names (excluding the first data parameter).
See Also

execute: Run the step on data.

Notes

The keyword arguments are stored as-is in the kwargs attribute. They are not validated at initialization time; validation happens when the function is called during execute(). This allows for flexible argument passing but means errors in argument names or types will only surface at execution time.
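If you would rather catch argument mistakes when a step is created than when it runs, inspect.signature(...).bind() applies Python's argument-matching rules without calling the function. check_step_arguments is a hypothetical helper, not part of PipelineStep. Because it binds a placeholder in the data position, it also flags functions whose first parameter is not the data (pd.concat, whose first parameter is objs, is one such case). It cannot check types, and for some built-in callables inspect.signature raises ValueError because no signature is available.

```python
import inspect
from typing import Any, Callable


def check_step_arguments(func: Callable[..., Any], **kwargs: Any) -> None:
    """Raise TypeError now if func(data, **kwargs) could not be called later."""
    placeholder = object()  # stands in for the data passed at execute() time
    try:
        # bind() matches arguments to parameters without calling func
        inspect.signature(func).bind(placeholder, **kwargs)
    except TypeError as exc:
        name = getattr(func, "__name__", repr(func))
        raise TypeError(f"{name}: {exc}") from exc


def add_suffix(data: Any, column: str, suffix: str) -> Any:
    return data


def concat_like(objs: list, *, ignore_index: bool = False) -> Any:
    """Mimics a function whose first parameter is not the data."""
    return objs


check_step_arguments(add_suffix, column="name", suffix="_processed")  # passes silently

messages = []
for func, kwargs in [
    (add_suffix, {"column": "name", "sufix": "_processed"}),  # misspelled 'suffix'
    (concat_like, {"objs": [1, 2], "ignore_index": True}),    # data would collide with objs
]:
    try:
        check_step_arguments(func, **kwargs)
    except TypeError as exc:
        messages.append(str(exc))
print(messages)
```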

Examples

Basic initialization:

>>> import pandas as pd
>>> def add_suffix(data, column: str, suffix: str) -> pd.DataFrame:
...     result = data.copy()
...     result[column] = result[column].astype(str) + suffix
...     return result
>>> step = PipelineStep(add_suffix, column="name", suffix="_processed")
>>> step.kwargs
{'column': 'name', 'suffix': '_processed'}

With no additional arguments:

>>> def identity(data):
...     return data.copy()
>>> step = PipelineStep(identity)
>>> step.kwargs
{}
step
kwargs
def execute(self, data: adc_toolkit.data.abs.Data) -> adc_toolkit.data.abs.Data:

Execute the wrapped transformation function on the provided data.

Calls the stored function with the input data as the first argument and the stored keyword arguments. Returns the transformed data.

Parameters
  • data (Data): The input data to transform. Must be a Data protocol-compatible object (e.g., pandas DataFrame, PySpark DataFrame). The object should be compatible with the wrapped function's expectations.
Returns
  • Data: The transformed data returned by the wrapped function. The exact type depends on what the wrapped function returns, but it should satisfy the Data protocol.
Raises
  • TypeError: If the function cannot accept the provided data type, or if required keyword arguments are missing.
  • ValueError: If the wrapped function rejects the input data or argument values (e.g., a required column doesn't exist).
  • KeyError: If the function attempts to access non-existent columns or keys.
  • Exception: Any other exception raised by the wrapped function propagates unchanged. The exception type and message depend on the specific function implementation.
See Also

ProcessingPipeline.run: Executes multiple steps in sequence.

Notes

Execution Flow

The execution is straightforward:

result = self.step(data, **self.kwargs)

The wrapped function receives the data as its first positional argument and all stored keyword arguments are unpacked and passed.
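Here is a minimal sketch of that call. It uses a hypothetical MiniStep class in place of the real PipelineStep, because only the body of PipelineStep's execute method is visible here. A list of row dicts stands in for a DataFrame. The keyword arguments captured at construction are unpacked on every call, after the data:

```python
from typing import Any, Callable


class MiniStep:
    """Hypothetical stand-in for PipelineStep: stores a function and its kwargs."""

    def __init__(self, step: Callable[..., Any], **kwargs: Any) -> None:
        self.step = step
        self.kwargs = kwargs

    def execute(self, data: Any) -> Any:
        # Data goes first; the stored keyword arguments are unpacked after it.
        return self.step(data, **self.kwargs)


def scale(rows: list[dict], column: str, factor: float) -> list[dict]:
    # Build new row dicts instead of mutating the input rows.
    return [{**row, column: row[column] * factor} for row in rows]


step = MiniStep(scale, column="amount", factor=2)
print(step.execute([{"amount": 10}, {"amount": 20}]))  # [{'amount': 20}, {'amount': 40}]
```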

No Validation

This method does not validate the input data or the function's return value against the Data protocol. It trusts that:

  1. The input satisfies the wrapped function's requirements
  2. The function returns a valid Data object

Validation failures will surface as exceptions from the wrapped function.
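Nothing checks the return value. A step that forgets to return (and so yields None) therefore only fails when the next step touches the data. One way to fail at the offending step is a small wrapper.

- returns_data and the toy Frame class below are hypothetical and not part of adc_toolkit.
- The hasattr check only mirrors the columns/dtypes requirement described for the Data protocol.

```python
import functools
from typing import Any, Callable


def returns_data(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: fail fast if a step's result lacks the Data attributes."""

    @functools.wraps(func)
    def wrapper(data: Any, **kwargs: Any) -> Any:
        result = func(data, **kwargs)
        # Duck-typed check for the two attributes the Data protocol requires.
        if not (hasattr(result, "columns") and hasattr(result, "dtypes")):
            raise TypeError(
                f"{func.__name__} returned {type(result).__name__}, not a Data-like object"
            )
        return result

    return wrapper


class Frame:
    """Toy object that happens to expose columns and dtypes."""

    def __init__(self, columns: list[str]) -> None:
        self.columns = columns
        self.dtypes = {c: "object" for c in columns}


@returns_data
def forgot_return(data: Frame) -> None:
    data.columns.append("extra")  # bug: mutates the input but never returns it


try:
    forgot_return(Frame(["a"]))
except TypeError as exc:
    print(exc)  # forgot_return returned NoneType, not a Data-like object
```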

Error Handling

Exceptions from the wrapped function propagate directly to the caller. When used within a ProcessingPipeline, this means the pipeline stops on the first error. Consider wrapping sensitive operations with try-except in your step functions if you need custom error handling.
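The exception is re-raised as-is, so its message comes from the step function and does not name the step. A hypothetical named_step wrapper (not part of the toolkit) can attach that context with BaseException.add_note, which is available from Python 3.11. It keeps the original exception type, so existing except ValueError handlers keep working. Row dicts again stand in for a DataFrame:

```python
import functools
from typing import Any, Callable


def named_step(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical wrapper: re-raise step errors with a note naming the step."""

    @functools.wraps(func)
    def wrapper(data: Any, **kwargs: Any) -> Any:
        try:
            return func(data, **kwargs)
        except Exception as exc:
            # add_note exists on Python 3.11+; older versions re-raise unchanged.
            if hasattr(exc, "add_note"):
                exc.add_note(f"in pipeline step {func.__name__}({kwargs})")
            raise

    return wrapper


@named_step
def require_column(rows: list[dict], column: str) -> list[dict]:
    if any(column not in row for row in rows):
        raise ValueError(f"Column '{column}' not found")
    return rows


try:
    require_column([{"id": 1}], column="amount")
except ValueError as exc:
    print(exc)                            # Column 'amount' not found
    print(getattr(exc, "__notes__", []))  # a note naming require_column on 3.11+, else []
```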

Examples

Basic execution:

>>> import pandas as pd
>>> from adc_toolkit.processing.step import PipelineStep
>>>
>>> def double_values(data: pd.DataFrame, column: str) -> pd.DataFrame:
...     result = data.copy()
...     result[column] = result[column] * 2
...     return result
>>> step = PipelineStep(double_values, column="amount")
>>> df = pd.DataFrame({"amount": [10, 20, 30]})
>>> result = step.execute(df)
>>> result["amount"].tolist()
[20, 40, 60]

Handling errors:

>>> def strict_transform(data: pd.DataFrame, required_col: str) -> pd.DataFrame:
...     if required_col not in data.columns:
...         raise ValueError(f"Column '{required_col}' not found")
...     return data
>>> step = PipelineStep(strict_transform, required_col="missing_column")
>>> try:
...     step.execute(df)
... except ValueError as e:
...     print(f"Step failed: {e}")
Step failed: Column 'missing_column' not found
class ProcessingPipeline:
 40class ProcessingPipeline:
 41    """
 42    A pipeline for processing data through a sequence of transformation steps.
 43
 44    ``ProcessingPipeline`` orchestrates the execution of multiple processing
 45    steps in a defined order. Each step transforms the data and passes the
 46    result to the next step. The pipeline supports method chaining for fluent
 47    construction and preserves the original input data through deep copying.
 48
 49    The pipeline is agnostic to the specific transformation functions used.
 50    Any callable that accepts a ``Data`` object (e.g., pandas DataFrame) as
 51    its first argument and returns a ``Data`` object can be added as a step.
 52    This enables mixing:
 53
 54    - Prebuilt steps from :mod:`adc_toolkit.processing.steps`
 55    - Built-in pandas or PySpark transformation methods
 56    - Custom functions tailored to your specific needs
 57
 58    Attributes
 59    ----------
 60    steps : list[PipelineStep]
 61        The ordered list of pipeline steps to execute. Each step is a
 62        :class:`PipelineStep` instance wrapping a transformation function
 63        and its keyword arguments.
 64
 65    See Also
 66    --------
 67    PipelineStep : The wrapper class for individual transformation functions.
 68    adc_toolkit.processing.steps.pandas : Prebuilt pandas transformation steps.
 69    adc_toolkit.data.abs.Data : Protocol defining compatible data objects.
 70
 71    Notes
 72    -----
 73    **Immutability**: The :meth:`run` method creates a deep copy of the input
 74    data before processing. This ensures the original data is never modified,
 75    which is important for reproducibility and debugging. However, deep copying
 76    can be expensive for very large datasets.
 77
 78    **Sequential Execution**: Steps execute in the order they were added.
 79    The output of each step becomes the input to the next step. There is no
 80    parallel execution or branching.
 81
 82    **Error Propagation**: If any step raises an exception, the pipeline
 83    stops immediately and the exception propagates to the caller. Partial
 84    results are not returned.
 85
 86    Examples
 87    --------
 88    **Basic usage with prebuilt steps:**
 89
 90    >>> from adc_toolkit.processing import ProcessingPipeline
 91    >>> from adc_toolkit.processing.steps.pandas import (
 92    ...     remove_duplicates,
 93    ...     fill_missing_values,
 94    ...     make_columns_snake_case,
 95    ... )
 96    >>> import pandas as pd
 97    >>>
 98    >>> # Create sample data
 99    >>> df = pd.DataFrame(
100    ...     {
101    ...         "CustomerID": [1, 1, 2, 3],
102    ...         "Value": [10.0, 10.0, None, 30.0],
103    ...     }
104    ... )
105    >>>
106    >>> # Build and run pipeline
107    >>> pipeline = ProcessingPipeline()
108    >>> pipeline.add(remove_duplicates, subset=["CustomerID"])
109    >>> pipeline.add(fill_missing_values, method="mean", columns=["Value"])
110    >>> pipeline.add(make_columns_snake_case)
111    >>> result = pipeline.run(df)
112    >>> result.columns.tolist()
113    ['customer_id', 'value']
114
115    **Method chaining for fluent construction:**
116
117    >>> pipeline = ProcessingPipeline().add(remove_duplicates, subset=["id"]).add(fill_missing_values, method="median")
118    >>> len(pipeline)
119    2
120
121    **Using custom transformation functions:**
122
123    >>> def log_transform(data: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
124    ...     '''Apply log transformation to specified columns.'''
125    ...     import numpy as np
126    ...
127    ...     result = data.copy()
128    ...     for col in columns:
129    ...         result[col] = np.log1p(result[col])
130    ...     return result
131    >>> pipeline = ProcessingPipeline()
132    >>> pipeline.add(log_transform, columns=["price", "quantity"])
133    >>> transformed = pipeline.run(sales_data)
134
135    **Using pandas functions directly:**
136
137    Many pandas functions can be used directly since they accept a DataFrame
138    as the first argument and return a DataFrame:
139
140    >>> import pandas as pd
141    >>>
142    >>> # Merge with a reference table
143    >>> pipeline = ProcessingPipeline()
144    >>> pipeline.add(pd.merge, right=lookup_df, how="left", on="category_id")
145    >>> enriched = pipeline.run(sales_df)
146    >>>
147    >>> # Filter rows with the unbound method pd.DataFrame.query
148    >>> pipeline = ProcessingPipeline()
149    >>> pipeline.add(pd.DataFrame.query, expr="amount > 100")
150
151    **Inspecting pipeline structure:**
152
153    >>> pipeline = ProcessingPipeline()
154    >>> pipeline.add(remove_duplicates, subset=["id"])
155    >>> pipeline.add(fill_missing_values, method="mean")
156    >>> print(pipeline)
157    remove_duplicates(subset=['id']) -> fill_missing_values(method=mean)
158    >>> len(pipeline)
159    2
160    """
161
162    def __init__(self) -> None:
163        """
164        Initialize an empty processing pipeline.
165
166        Creates a new pipeline with no steps. Steps can be added using the
167        :meth:`add` method.
168
169        Examples
170        --------
171        >>> pipeline = ProcessingPipeline()
172        >>> len(pipeline)
173        0
174        """
175        self.steps = list[PipelineStep]()
176
177    def __str__(self) -> str:
178        """
179        Return a human-readable string representation of the pipeline.
180
181        The string shows each step's function name and parameters, joined
182        by arrows to indicate the flow of data through the pipeline.
183
184        Returns
185        -------
186        str
187            A string representation showing all steps in order, formatted as
188            ``"step1(params) -> step2(params) -> ..."``. Returns an empty
189            string if the pipeline has no steps.
190
191        Examples
192        --------
193        >>> from adc_toolkit.processing.steps.pandas import (
194        ...     remove_duplicates,
195        ...     fill_missing_values,
196        ... )
197        >>> pipeline = ProcessingPipeline()
198        >>> pipeline.add(remove_duplicates, subset=["id"])
199        >>> pipeline.add(fill_missing_values, method="mean")
200        >>> print(pipeline)
201        remove_duplicates(subset=['id']) -> fill_missing_values(method=mean)
202        """
203        return " -> ".join([str(step) for step in self.steps])
204
205    def __len__(self) -> int:
206        """
207        Return the number of steps in the pipeline.
208
209        Returns
210        -------
211        int
212            The count of transformation steps currently in the pipeline.
213
214        Examples
215        --------
216        >>> pipeline = ProcessingPipeline()
217        >>> len(pipeline)
218        0
219        >>> pipeline.add(lambda df: df)
220        <ProcessingPipeline object>
221        >>> len(pipeline)
222        1
223        """
224        return len(self.steps)
225
226    def add(
227        self,
228        step: Callable[..., Data],
229        **kwargs: Any,
230    ) -> "ProcessingPipeline":
231        """
232        Add a transformation step to the pipeline.
233
234        Appends a new step to the end of the pipeline. The step consists of
235        a callable (function) and any keyword arguments to pass to it during
236        execution. The callable must follow the step contract: accept a
237        ``Data`` object as the first positional argument and return a
238        ``Data`` object.
239
240        This method returns ``self`` to enable method chaining, allowing
241        multiple steps to be added in a fluent style.
242
243        Parameters
244        ----------
245        step : Callable[..., Data]
246            The transformation function to execute. Must accept a ``Data``
247            object (e.g., pandas DataFrame, PySpark DataFrame) as its first
248            positional argument and return a ``Data`` object. Can be:
249
250            - A prebuilt step from :mod:`adc_toolkit.processing.steps`
251            - A pandas/PySpark function directly (e.g., ``pd.merge``)
252            - A lambda function for simple inline transformations
253            - A custom function you define
254
255        **kwargs : Any
256            Keyword arguments to pass to the step function during execution.
257            These are stored with the step and applied when :meth:`run` is
258            called. The arguments should match the function's signature
259            (excluding the first ``data`` parameter).
260
261        Returns
262        -------
263        ProcessingPipeline
264            Returns ``self`` to enable method chaining.
265
266        See Also
267        --------
268        run : Execute the pipeline on data.
269        PipelineStep : The wrapper class for transformation functions.
270
271        Notes
272        -----
273        **The Step Contract**
274
275        Any function can be used as a step if it follows this signature:
276
277            def my_step(data: Data, param1: T1, param2: T2, ...) -> Data:
278                # Transform the data
279                return transformed_data
280
281        The ``Data`` protocol requires only ``columns`` and ``dtypes``
282        properties. pandas DataFrames and PySpark DataFrames satisfy this
283        naturally, so you can use them directly without any wrappers.
284
285        **Keyword Arguments Only**
286
287        Only keyword arguments can be stored with a step; ``data`` is always
288        passed as the sole positional argument. If your function requires
289        further positional arguments, wrap it in a lambda that forwards
290        ``data`` first (``functools.partial`` cannot fill slots after it).
291
292        Examples
293        --------
294        **Using prebuilt steps:**
295
296        >>> from adc_toolkit.processing.steps.pandas import remove_duplicates
297        >>> pipeline = ProcessingPipeline()
298        >>> pipeline.add(remove_duplicates, subset=["customer_id"], keep="first")
299        <ProcessingPipeline object>
300
301        **Using a custom function:**
302
303        >>> def scale_column(data, column: str, factor: float) -> pd.DataFrame:
304        ...     result = data.copy()
305        ...     result[column] = result[column] * factor
306        ...     return result
307        >>> pipeline.add(scale_column, column="price", factor=1.1)
308        <ProcessingPipeline object>
309
310        **Method chaining:**
311
312        >>> from adc_toolkit.processing.steps.pandas import (
313        ...     remove_duplicates,
314        ...     fill_missing_values,
315        ...     select_columns,
316        ... )
317        >>> pipeline = (
318        ...     ProcessingPipeline()
319        ...     .add(remove_duplicates)
320        ...     .add(fill_missing_values, method="mean")
321        ...     .add(select_columns, columns=["id", "name", "value"])
322        ... )
323        >>> len(pipeline)
324        3
325
326        **Using pandas functions directly:**
327
328        >>> import pandas as pd
329        >>> pipeline = ProcessingPipeline()
330        >>> pipeline.add(pd.merge, right=reference_df, how="left", on="id")
331        <ProcessingPipeline object>
332
333        **Using a lambda for simple transformations:**
334
335        >>> pipeline = ProcessingPipeline()
336        >>> pipeline.add(lambda df: df.reset_index(drop=True))
337        <ProcessingPipeline object>
338        """
339        self.steps.append(PipelineStep(step, **kwargs))
340
341        return self
342
343    def run(self, data: Data) -> Data:
344        """
345        Execute the pipeline on the provided data.
346
347        Runs all transformation steps in sequence on a deep copy of the
348        input data. Each step receives the output of the previous step as
349        its input. The original data is never modified.
350
351        Parameters
352        ----------
353        data : Data
354            The input data to process. Must be a ``Data`` protocol-compatible
355            object (e.g., pandas DataFrame, PySpark DataFrame). The object
356            must support deep copying via ``copy.deepcopy``.
357
358        Returns
359        -------
360        Data
361            The transformed data after all pipeline steps have been applied.
362            Returns a new object; the original ``data`` is unchanged.
363
364        Raises
365        ------
366        TypeError
367            If the input data cannot be deep copied.
368        Exception
369            Any exception raised by a step function propagates unchanged.
370            It is not wrapped, so it does not identify the failing step.
371
372        See Also
373        --------
374        add : Add steps to the pipeline before running.
375
376        Notes
377        -----
378        **Immutability**
379
380        The pipeline creates a deep copy of the input data before processing.
381        This ensures:
382
383        - The original data is preserved for comparison or debugging
384        - Multiple runs with the same input produce consistent results
385        - Side effects from step functions don't affect the original
386
387        For very large datasets, this copying may have performance
388        implications. Consider:
389
390        - Using PySpark for distributed processing of large data
391        - Processing data in chunks if memory is constrained
392        - Writing steps that modify their input in place instead of calling
393          ``data.copy()``; steps receive the pipeline's copy, not the original
394
395        **Sequential Execution**
396
397        Steps execute strictly in order. The data flow is:
398
399            input -> step1 -> step2 -> ... -> stepN -> output
400
401        There is no parallel execution. If a step fails, subsequent steps
402        do not run and the exception propagates immediately.
403
404        **Empty Pipeline**
405
406        Running an empty pipeline (no steps added) returns a deep copy of
407        the input data without any transformations.
408
409        Examples
410        --------
411        **Basic execution:**
412
413        >>> import pandas as pd
414        >>> from adc_toolkit.processing import ProcessingPipeline
415        >>> from adc_toolkit.processing.steps.pandas import remove_duplicates
416        >>>
417        >>> df = pd.DataFrame({"id": [1, 1, 2], "value": [10, 10, 20]})
418        >>> pipeline = ProcessingPipeline()
419        >>> pipeline.add(remove_duplicates, subset=["id"])
420        >>> result = pipeline.run(df)
421        >>> len(result)
422        2
423        >>> len(df)  # Original unchanged
424        3
425
426        **Running the same pipeline multiple times:**
427
428        >>> result1 = pipeline.run(data_batch_1)
429        >>> result2 = pipeline.run(data_batch_2)
430        >>> result3 = pipeline.run(data_batch_3)
431
432        **Empty pipeline returns a copy:**
433
434        >>> empty_pipeline = ProcessingPipeline()
435        >>> result = empty_pipeline.run(df)
436        >>> result is df
437        False
438        >>> result.equals(df)
439        True
440        """
441        result = copy.deepcopy(data)
442
443        for step in self.steps:
444            result = step.execute(result)
445
446        return result

A pipeline for processing data through a sequence of transformation steps.

ProcessingPipeline orchestrates the execution of multiple processing steps in a defined order. Each step transforms the data and passes the result to the next step. The pipeline supports method chaining for fluent construction and preserves the original input data through deep copying.

The pipeline is agnostic to the specific transformation functions used. Any callable that accepts a Data object (e.g., pandas DataFrame) as its first argument and returns a Data object can be added as a step. This enables mixing:

  • Prebuilt steps from adc_toolkit.processing.steps
  • Built-in pandas or PySpark transformation methods
  • Custom functions tailored to your specific needs
Attributes
  • steps (list[PipelineStep]): The ordered list of pipeline steps to execute. Each step is a PipelineStep instance wrapping a transformation function and its keyword arguments.
See Also

PipelineStep: The wrapper class for individual transformation functions.
adc_toolkit.processing.steps.pandas: Prebuilt pandas transformation steps.
adc_toolkit.data.abs.Data: Protocol defining compatible data objects.

Notes

Immutability: The run() method creates a deep copy of the input data before processing. This ensures the original data is never modified, which is important for reproducibility and debugging. However, deep copying can be expensive for very large datasets.
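The copy-then-transform loop can be sketched with the standard library alone.

- run is a simplified stand-in for ProcessingPipeline.run: steps take only the data, with no stored keyword arguments.
- A dict with a nested list plays the role of the data.

The point is that a deep copy, unlike copy.copy, keeps even nested objects out of the caller's hands:

```python
import copy
from typing import Any, Callable


def run(data: Any, steps: list[Callable[[Any], Any]]) -> Any:
    """Simplified sketch of the copy-then-transform loop (no kwargs, for brevity)."""
    result = copy.deepcopy(data)  # the caller's object is never handed to a step
    for step in steps:
        result = step(result)
    return result


def append_total(table: dict) -> dict:
    # Deliberately mutates its input in place.
    table["rows"].append(sum(table["rows"]))
    return table


original = {"rows": [1, 2, 3]}
out = run(original, [append_total])
print(out["rows"])       # [1, 2, 3, 6]
print(original["rows"])  # [1, 2, 3]; a shallow copy.copy would have leaked the append
```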

Sequential Execution: Steps execute in the order they were added. The output of each step becomes the input to the next step. There is no parallel execution or branching.
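Each step consumes the previous step's output, so reordering the same steps can change the result. The stdlib-only sketch below treats a plain list of column names as the data and expresses sequential execution as a left fold with functools.reduce. The step functions are illustrative, not toolkit steps:

```python
from functools import reduce
from typing import Callable

Step = Callable[[list[str]], list[str]]


def run_in_order(columns: list[str], steps: list[Step]) -> list[str]:
    # Left fold: each step's output becomes the next step's input.
    return reduce(lambda acc, step: step(acc), steps, columns)


def lowercase(cols: list[str]) -> list[str]:
    return [c.lower() for c in cols]


def keep_id_columns(cols: list[str]) -> list[str]:
    return [c for c in cols if c.endswith("id")]


cols = ["CustomerID", "OrderId", "Value"]
print(run_in_order(cols, [lowercase, keep_id_columns]))  # ['customerid', 'orderid']
print(run_in_order(cols, [keep_id_columns, lowercase]))  # []
```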

Error Propagation: If any step raises an exception, the pipeline stops immediately and the exception propagates to the caller. Partial results are not returned.
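The fail-fast behavior follows from a plain for loop with no try/except around the step calls. This sketch uses toy steps and a hypothetical run helper. The second step raises, the third never runs, and the caller gets no result at all:

```python
from typing import Any, Callable

calls: list[str] = []


def record(name: str, fail: bool = False) -> Callable[[Any], Any]:
    """Build a toy step that logs its name and optionally raises."""

    def step(data: Any) -> Any:
        calls.append(name)
        if fail:
            raise ValueError(f"{name} failed")
        return data

    return step


def run(data: Any, steps: list[Callable[[Any], Any]]) -> Any:
    for step in steps:
        data = step(data)  # no try/except: the first error ends the loop
    return data


result = None
try:
    result = run([1, 2], [record("a"), record("b", fail=True), record("c")])
except ValueError as exc:
    print(exc)  # b failed
print(calls)    # ['a', 'b'] (step c never ran)
print(result)   # None (no partial output was returned)
```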

Examples

Basic usage with prebuilt steps:

>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     make_columns_snake_case,
... )
>>> import pandas as pd
>>>
>>> # Create sample data
>>> df = pd.DataFrame(
...     {
...         "CustomerID": [1, 1, 2, 3],
...         "Value": [10.0, 10.0, None, 30.0],
...     }
... )
>>>
>>> # Build and run pipeline
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["CustomerID"])
>>> pipeline.add(fill_missing_values, method="mean", columns=["Value"])
>>> pipeline.add(make_columns_snake_case)
>>> result = pipeline.run(df)
>>> result.columns.tolist()
['customer_id', 'value']

Method chaining for fluent construction:

>>> pipeline = ProcessingPipeline().add(remove_duplicates, subset=["id"]).add(fill_missing_values, method="median")
>>> len(pipeline)
2

Using custom transformation functions:

>>> def log_transform(data: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
...     '''Apply log transformation to specified columns.'''
...     import numpy as np
...
...     result = data.copy()
...     for col in columns:
...         result[col] = np.log1p(result[col])
...     return result
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(log_transform, columns=["price", "quantity"])
>>> transformed = pipeline.run(sales_data)

Using pandas functions directly:

Many pandas functions can be used directly since they accept a DataFrame as the first argument and return a DataFrame:

>>> import pandas as pd
>>>
>>> # Merge with a reference table
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.merge, right=lookup_df, how="left", on="category_id")
>>> enriched = pipeline.run(sales_df)
>>>
>>> # Filter rows with the unbound method pd.DataFrame.query
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.DataFrame.query, expr="amount > 100")
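Passing a pandas function or an unbound DataFrame method works because each step is called as step(data, **kwargs):

- pd.merge receives the data as its left argument.
- A method accessed on the class (such as pd.DataFrame.query) receives the data as self.

The same mechanism applies to any Python callable. The sketch below shows it with the standard library and a hypothetical execute helper that mirrors that call:

```python
from typing import Any, Callable


def execute(step: Callable[..., Any], data: Any, **kwargs: Any) -> Any:
    # Mirrors the step call: data first, stored keyword arguments after it.
    return step(data, **kwargs)


# An unbound method: str.split(self, sep=None, maxsplit=-1) accepts keywords.
print(execute(str.split, "a,b,c", sep=",", maxsplit=1))  # ['a', 'b,c']

# A builtin whose first parameter is the data and whose options are keywords.
print(execute(sorted, [3, 1, 2], reverse=True))  # [3, 2, 1]
```

Some callables have positional-only extra parameters (str.replace, for instance). These cannot be configured through keyword arguments this way and need a lambda wrapper.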

Inspecting pipeline structure:

>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["id"])
>>> pipeline.add(fill_missing_values, method="mean")
>>> print(pipeline)
remove_duplicates(subset=['id']) -> fill_missing_values(method=mean)
>>> len(pipeline)
2
ProcessingPipeline()

Initialize an empty processing pipeline.

Creates a new pipeline with no steps. Steps can be added using the add() method.

Examples
>>> pipeline = ProcessingPipeline()
>>> len(pipeline)
0
steps
def add( self, step: Callable[..., adc_toolkit.data.abs.Data], **kwargs: Any) -> ProcessingPipeline:

Add a transformation step to the pipeline.

Appends a new step to the end of the pipeline. The step consists of a callable (function) and any keyword arguments to pass to it during execution. The callable must follow the step contract: accept a Data object as the first positional argument and return a Data object.

This method returns self to enable method chaining, allowing multiple steps to be added in a fluent style.
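add appends to self.steps and returns self, so chaining does not create new pipelines: every add mutates the same object. The sketch below uses a hypothetical MiniPipeline that mirrors just those two lines of add. Assigning the result of add to a new name gives an alias, not a branch:

```python
from typing import Any, Callable


class MiniPipeline:
    """Hypothetical stand-in that mirrors add(): append the step, then return self."""

    def __init__(self) -> None:
        self.steps: list[tuple[Callable[..., Any], dict[str, Any]]] = []

    def add(self, step: Callable[..., Any], **kwargs: Any) -> "MiniPipeline":
        self.steps.append((step, kwargs))
        return self  # the same object, which is what makes chaining work


base = MiniPipeline().add(sorted, reverse=True)
variant = base.add(list)  # does not branch: variant and base are one pipeline

print(variant is base, len(base.steps))  # True 2
```

To build two variants, construct two pipelines (or two separate chains) rather than extending a shared one.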

Parameters
  • step (Callable[..., Data]): The transformation function to execute. Must accept a Data object (e.g., pandas DataFrame, PySpark DataFrame) as its first positional argument and return a Data object. Can be:

    • A prebuilt step from adc_toolkit.processing.steps
    • A pandas/PySpark function directly (e.g., pd.merge)
    • A lambda function for simple inline transformations
    • A custom function you define
  • **kwargs (Any): Keyword arguments to pass to the step function during execution. These are stored with the step and applied when run() is called. The arguments should match the function's signature (excluding the first data parameter).
Returns
  • ProcessingPipeline: Returns self to enable method chaining.
See Also

run: Execute the pipeline on data.
PipelineStep: The wrapper class for transformation functions.

Notes

The Step Contract

Any function can be used as a step if it follows this signature:

def my_step(data: Data, param1: T1, param2: T2, ...) -> Data:
    # Transform the data
    return transformed_data

The Data protocol requires only columns and dtypes properties. pandas DataFrames and PySpark DataFrames satisfy this naturally, so you can use them directly without any wrappers.
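Here is a sketch of what satisfying the contract looks like without pandas.

- DataLike is a hypothetical runtime-checkable mirror of the requirement described above. The real protocol is adc_toolkit.data.abs.Data, whose exact definition is not shown here.
- Table is a toy container.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical mirror of the Data requirement: columns and dtypes only."""

    @property
    def columns(self) -> list[str]: ...

    @property
    def dtypes(self) -> dict[str, str]: ...


class Table:
    """Toy container; any object exposing these two attributes qualifies."""

    def __init__(self, data: dict[str, list]) -> None:
        self.data = data

    @property
    def columns(self) -> list[str]:
        return list(self.data)

    @property
    def dtypes(self) -> dict[str, str]:
        # Infer a type name from each column's first value (toy logic).
        return {name: type(values[0]).__name__ for name, values in self.data.items()}


def rename(data: Table, mapping: dict[str, str]) -> Table:
    # Follows the step contract: data first, options as keywords, new object returned.
    return Table({mapping.get(k, k): v for k, v in data.data.items()})


out = rename(Table({"Value": [1.5]}), mapping={"Value": "value"})
print(isinstance(out, DataLike), out.columns, out.dtypes)  # True ['value'] {'value': 'float'}
```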

Keyword Arguments Only

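Only keyword arguments can be stored with a step. The pipeline always passes the data as the single positional argument and unpacks the stored **kwargs after it. If your function needs further positional arguments (for example, positional-only parameters), wrap it in a lambda or a small function that takes the data first, such as lambda df: func(df, arg). functools.partial binds positional arguments from the left, so it cannot supply arguments that come after data.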
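Here is a sketch of why extra positional arguments need a wrapper.

- The hypothetical execute helper mirrors how a stored step is invoked: data first, stored keyword arguments unpacked after it.
- pad is a toy step whose width parameter is positional-only, so it cannot be stored as a keyword argument.

```python
from typing import Any, Callable


def execute(step: Callable[..., Any], data: Any, **kwargs: Any) -> Any:
    # Mirrors how a stored step is called: data first, then stored kwargs.
    return step(data, **kwargs)


def pad(values: list[int], width: int, /) -> list[int]:
    """Toy step whose extra parameter is positional-only."""
    return values + [0] * (width - len(values))


try:
    execute(pad, [1], width=3)  # a positional-only parameter cannot be passed by keyword
except TypeError as exc:
    print("TypeError:", exc)

# A lambda takes the data first and supplies the positional argument itself.
print(execute(lambda data: pad(data, 3), [1]))  # [1, 0, 0]
```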

Examples

Using prebuilt steps:

>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["customer_id"], keep="first")
<ProcessingPipeline object>

Using a custom function:

>>> import pandas as pd
>>> def scale_column(data: pd.DataFrame, column: str, factor: float) -> pd.DataFrame:
...     result = data.copy()
...     result[column] = result[column] * factor
...     return result
>>> pipeline.add(scale_column, column="price", factor=1.1)
<ProcessingPipeline object>

Method chaining:

>>> from adc_toolkit.processing.steps.pandas import (
...     remove_duplicates,
...     fill_missing_values,
...     select_columns,
... )
>>> pipeline = (
...     ProcessingPipeline()
...     .add(remove_duplicates)
...     .add(fill_missing_values, method="mean")
...     .add(select_columns, columns=["id", "name", "value"])
... )
>>> len(pipeline)
3

Using pandas functions directly:

>>> import pandas as pd
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(pd.merge, right=reference_df, how="left", on="id")
<ProcessingPipeline object>

Using a lambda for simple transformations:

>>> pipeline = ProcessingPipeline()
>>> pipeline.add(lambda df: df.reset_index(drop=True))
<ProcessingPipeline object>
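Why chaining works and when keyword arguments take effect can be sketched with two small stand-in classes. `SketchStep` and `SketchPipeline` are hypothetical and only mirror the behavior documented above (kwargs stored with the step, applied at execution, `add` returning self); they are not the library's implementation.

```python
from typing import Any, Callable


class SketchStep:
    """Hypothetical stand-in for PipelineStep: a function plus its stored kwargs."""

    def __init__(self, func: Callable[..., Any], kwargs: dict[str, Any]) -> None:
        self.func = func
        self.kwargs = kwargs

    def execute(self, data: Any) -> Any:
        # Stored keyword arguments are applied only now, at execution time.
        return self.func(data, **self.kwargs)


class SketchPipeline:
    """Hypothetical stand-in showing why add() calls can be chained."""

    def __init__(self) -> None:
        self.steps: list[SketchStep] = []

    def add(self, func: Callable[..., Any], /, **kwargs: Any) -> "SketchPipeline":
        # "/" keeps `func` positional-only, so a step parameter that happens
        # to be named "func" can still be passed through **kwargs.
        self.steps.append(SketchStep(func, kwargs))
        return self  # returning self is what enables .add(...).add(...)

    def __len__(self) -> int:
        return len(self.steps)


def scale(values: list[float], factor: float) -> list[float]:
    return [v * factor for v in values]


pipeline = SketchPipeline().add(scale, factor=2).add(scale, factor=10)
```

Executing the two stored steps in order on `[1, 2]` yields `[20, 40]`.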
def run(self, data: adc_toolkit.data.abs.Data) -> adc_toolkit.data.abs.Data:
    def run(self, data: Data) -> Data:
        """Execute the pipeline on the provided data."""
        result = copy.deepcopy(data)

        for step in self.steps:
            result = step.execute(result)

        return result

Execute the pipeline on the provided data.

Runs all transformation steps in sequence on a deep copy of the input data. Each step receives the output of the previous step as its input. The original data is never modified.
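The execution model can be sketched with plain lists standing in for DataFrames. `run_steps`, `drop_negative` and `add_offset` are hypothetical names; the helper mirrors the documented behavior (one deep copy, then each step applied to the previous step's output) rather than reproducing the library's code.

```python
import copy
from typing import Any, Callable


def run_steps(data: Any, steps: list[tuple[Callable[..., Any], dict[str, Any]]]) -> Any:
    """Hypothetical helper: deep-copy the input, then apply each step in order."""
    result = copy.deepcopy(data)  # the caller's object is never handed to a step
    for func, kwargs in steps:
        result = func(result, **kwargs)  # each step consumes the previous output
    return result


def drop_negative(rows: list[int]) -> list[int]:
    return [r for r in rows if r >= 0]


def add_offset(rows: list[int], offset: int) -> list[int]:
    return [r + offset for r in rows]


original = [3, -1, 4]
result = run_steps(original, [(drop_negative, {}), (add_offset, {"offset": 10})])
```

With no steps, the loop body never runs, so the helper returns an equal but distinct copy, matching the empty-pipeline behavior described below.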

Parameters
  • data (Data): The input data to process. Must be a Data protocol-compatible object (e.g., pandas DataFrame, PySpark DataFrame). The object must support deep copying via copy.deepcopy.
Returns
  • Data: The transformed data after all pipeline steps have been applied. Returns a new object; the original data is unchanged.
Raises
  • TypeError: If the input data cannot be deep copied.
  • Exception: Any exception raised by a step function propagates directly. The exception message indicates which step failed.
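The TypeError case typically arises when the data holds something that cannot be copied, such as a lock. A hedged sketch of a pre-flight check follows; `is_deep_copyable` and `Holder` are hypothetical. Note that the check performs a full copy itself, so it doubles the copying cost and is best kept for debugging.

```python
import copy
import threading


def is_deep_copyable(obj: object) -> bool:
    """Hypothetical pre-flight check: True if copy.deepcopy succeeds on obj."""
    try:
        copy.deepcopy(obj)
    except TypeError:
        return False
    return True


class Holder:
    """Hypothetical container whose attributes include a lock."""

    def __init__(self) -> None:
        self.values = [1, 2, 3]
        self.lock = threading.Lock()  # lock objects cannot be deep-copied
```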
See Also

add: Add steps to the pipeline before running.

Notes

Immutability

The pipeline creates a deep copy of the input data before processing. This ensures:

  • The original data is preserved for comparison or debugging
  • Multiple runs with the same input produce consistent results
  • Side effects from step functions don't affect the original
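The reason for a deep rather than a shallow copy can be sketched with a careless step that mutates its input. `append_total` and the three `run_*` helpers are hypothetical; a dict holding a list stands in for a DataFrame.

```python
import copy


def append_total(table: dict) -> dict:
    """A careless step: it mutates its input in place before returning it."""
    table["rows"].append(sum(table["rows"]))
    return table


def run_without_copy(data, step):
    return step(data)


def run_with_shallow_copy(data, step):
    return step(copy.copy(data))  # copies the dict but shares the inner list


def run_with_deep_copy(data, step):
    return step(copy.deepcopy(data))  # copies the dict and everything inside it


shared = {"rows": [1, 2, 3]}
run_without_copy(shared, append_total)  # shared["rows"] becomes [1, 2, 3, 6]

shallow = {"rows": [1, 2, 3]}
run_with_shallow_copy(shallow, append_total)  # shallow["rows"] also changes

protected = {"rows": [1, 2, 3]}
out = run_with_deep_copy(protected, append_total)  # protected is untouched
```

Only the deep copy keeps the original intact, and repeating the deep-copy run on the same input yields the same output.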

For very large datasets, this copying may have performance implications. Consider:

  • Using PySpark for distributed processing of large data
  • Processing data in chunks if memory is constrained
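  • Writing steps that modify their input in place instead of copying it again; within run() they receive the pipeline's own copy, so this is safe as long as the same functions are not applied to shared data elsewhere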
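Chunked processing only gives the same answer as a single run when every step is row-wise. A minimal sketch with lists as stand-ins; `chunked`, `run_in_chunks`, `double` and `dedupe` are hypothetical. In practice the chunks might come from a reader such as `pandas.read_csv(..., chunksize=...)`, so only one chunk is in memory at a time.

```python
from typing import Callable, Iterable, Iterator


def chunked(rows: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def run_in_chunks(chunks: Iterable[list], run: Callable[[list], list]) -> list:
    """Hypothetical helper: apply a pipeline-like callable to each chunk, then concatenate."""
    out: list = []
    for chunk in chunks:
        out.extend(run(chunk))
    return out


def double(rows: list) -> list:
    # Row-wise: each output row depends only on the matching input row.
    return [r * 2 for r in rows]


def dedupe(rows: list) -> list:
    # Not row-wise: whether a row is a duplicate depends on all other rows.
    return list(dict.fromkeys(rows))


rows = [1, 2, 1, 3, 2, 4]
```

`double` produces identical results either way, while `dedupe` run per chunk misses duplicates that span chunk boundaries, so steps such as duplicate removal need the full dataset or a separate merge pass.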

Sequential Execution

Steps execute strictly in order. The data flow is:

input -> step1 -> step2 -> ... -> stepN -> output

There is no parallel execution. If a step fails, subsequent steps do not run and the exception propagates immediately.
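The fail-fast behavior can be sketched by recording which steps actually run. `run_sequential` and the three steps are hypothetical stand-ins that mirror the documented loop, which has no error handling of its own.

```python
import copy

calls: list[str] = []


def step_a(data):
    calls.append("step_a")
    return data + [1]


def step_b(data):
    calls.append("step_b")
    raise ValueError("step_b rejected the data")


def step_c(data):
    calls.append("step_c")
    return data


def run_sequential(data, steps):
    result = copy.deepcopy(data)
    for step in steps:
        result = step(result)  # no try/except: the first failure ends the run
    return result


try:
    run_sequential([], [step_a, step_b, step_c])
except ValueError as exc:
    print(f"run stopped: {exc}")
```

Afterwards `calls` contains only `"step_a"` and `"step_b"`; `step_c` never executes.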

Empty Pipeline

Running an empty pipeline (no steps added) returns a deep copy of the input data without any transformations.

Examples

Basic execution:

>>> import pandas as pd
>>> from adc_toolkit.processing import ProcessingPipeline
>>> from adc_toolkit.processing.steps.pandas import remove_duplicates
>>>
>>> df = pd.DataFrame({"id": [1, 1, 2], "value": [10, 10, 20]})
>>> pipeline = ProcessingPipeline()
>>> pipeline.add(remove_duplicates, subset=["id"])
<ProcessingPipeline object>
>>> result = pipeline.run(df)
>>> len(result)
2
>>> len(df)  # Original unchanged
3

Running the same pipeline multiple times:

>>> result1 = pipeline.run(data_batch_1)
>>> result2 = pipeline.run(data_batch_2)
>>> result3 = pipeline.run(data_batch_3)

Empty pipeline returns a copy:

>>> empty_pipeline = ProcessingPipeline()
>>> result = empty_pipeline.run(df)
>>> result is df
False
>>> result.equals(df)
True