adc_toolkit.data.catalog

Default adc_toolkit data catalog.

This module provides the ValidatedDataCatalog class, which is the primary user-facing API for the adc-toolkit data handling system. It combines a data catalog (for I/O operations) with a data validator (for quality checks) to provide automatic validation on all data loading and saving operations.

  1"""
  2Default adc_toolkit data catalog.
  3
  4This module provides the ValidatedDataCatalog class, which is the primary
  5user-facing API for the adc-toolkit data handling system. It combines a
  6data catalog (for I/O operations) with a data validator (for quality checks)
  7to provide automatic validation on all data loading and saving operations.
  8"""
  9
 10from pathlib import Path
 11from typing import Any
 12
 13from adc_toolkit.data.abs import Data, DataCatalog, DataValidator
 14from adc_toolkit.data.default_attributes import default_catalog, default_validator
 15
 16
 17class ValidatedDataCatalog:
 18    """
 19    Data catalog with automatic validation on load and save operations.
 20
 21    ValidatedDataCatalog is the main user-facing API for the adc-toolkit data
 22    handling system. It wraps a DataCatalog (responsible for I/O operations)
 23    and a DataValidator (responsible for data quality checks) to provide a
 24    unified interface that automatically validates data after loading and
 25    before saving.
 26
 27    This design ensures data quality is enforced at catalog boundaries,
 28    catching issues early in data pipelines. Validation is transparent to
 29    the user: simply call load() and save() as you would with a regular
 30    catalog, and validation happens automatically.
 31
 32    The class uses dependency injection to allow flexible catalog and
 33    validator implementations. By default, it uses KedroDataCatalog for
 34    I/O and GXValidator (Great Expectations) for validation, but these
 35    can be swapped for custom implementations or alternative validators
 36    like PanderaValidator.
 37
 38    Parameters
 39    ----------
 40    catalog : DataCatalog
 41        The data catalog instance responsible for loading and saving datasets.
 42        Must implement the DataCatalog protocol with load() and save() methods.
 43        The default implementation is KedroDataCatalog, which uses Kedro's
 44        configuration-driven catalog system with YAML-based dataset definitions.
 45    validator : DataValidator
 46        The data validator instance responsible for validating datasets.
 47        Must implement the DataValidator protocol with a validate() method.
 48        The default implementation is GXValidator (Great Expectations), with
 49        PanderaValidator as a fallback. Use NoValidator to disable validation.
 50
 51    Attributes
 52    ----------
 53    catalog : DataCatalog
 54        The underlying data catalog for I/O operations. Immutable after
 55        instantiation.
 56    validator : DataValidator
 57        The underlying data validator for quality checks. Immutable after
 58        instantiation.
 59
 60    See Also
 61    --------
 62    DataCatalog : Protocol defining the catalog interface.
 63    DataValidator : Protocol defining the validator interface.
 64    adc_toolkit.data.catalogs.kedro.KedroDataCatalog : Default catalog implementation.
 65    adc_toolkit.data.validators.gx.GXValidator : Default validator (Great Expectations).
 66    adc_toolkit.data.validators.pandera.PanderaValidator : Alternative validator.
 67    adc_toolkit.data.validators.no_validator.NoValidator : No-op validator.
 68
 69    Notes
 70    -----
 71    The class uses `__slots__` to restrict attributes to only 'catalog' and
 72    'validator', preventing accidental attribute additions and reducing memory
 73    overhead.
 74
 75    Attributes are immutable after instantiation. To use a different catalog
 76    or validator, create a new ValidatedDataCatalog instance.
 77
 78    Validation happens in the following order:
 79    - On load: catalog.load() -> validator.validate() -> return validated data
 80    - On save: validator.validate() -> catalog.save() -> return None
 81
 82    This means invalid data will never be saved, and loaded data is always
 83    validated before being returned to the caller.
 84
 85    The factory method in_directory() is the recommended way to instantiate
 86    this class for most use cases. Direct instantiation via __init__() is
 87    primarily useful for testing or custom configurations.
 88
 89    Examples
 90    --------
 91    Basic usage with default catalog and validator:
 92
 93    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
 94    >>> catalog = ValidatedDataCatalog.in_directory("config/data")
 95    >>> df = catalog.load("customer_data")
 96    >>> # Data is automatically validated after loading
 97    >>> processed_df = process_data(df)
 98    >>> catalog.save("processed_customer_data", processed_df)
 99    >>> # Data is automatically validated before saving
100
101    Using a custom validator while keeping the default catalog:
102
103    >>> from adc_toolkit.data.validators.pandera import PanderaValidator
104    >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
105    >>> df = catalog.load("sales_data")
106
107    Using custom catalog and validator implementations:
108
109    >>> from myproject.catalogs import CustomCatalog
110    >>> from myproject.validators import CustomValidator
111    >>> catalog = ValidatedDataCatalog.in_directory(
112    ...     "config/data", catalog_class=CustomCatalog, validator_class=CustomValidator
113    ... )
114
115    Direct instantiation for testing or advanced use cases:
116
117    >>> from unittest.mock import Mock
118    >>> mock_catalog = Mock(spec=DataCatalog)
119    >>> mock_validator = Mock(spec=DataValidator)
120    >>> catalog = ValidatedDataCatalog(catalog=mock_catalog, validator=mock_validator)
121
122    Complete data pipeline example:
123
124    >>> import pandas as pd
125    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
126    >>>
127    >>> # Initialize catalog with validation
128    >>> catalog = ValidatedDataCatalog.in_directory("config/production")
129    >>>
130    >>> # Load and automatically validate raw data
131    >>> raw_data = catalog.load("raw_sales")
132    >>>
133    >>> # Process data
134    >>> cleaned = raw_data.dropna()
135    >>> aggregated = cleaned.groupby("region").sum()
136    >>>
137    >>> # Save and automatically validate before writing
138    >>> catalog.save("aggregated_sales", aggregated)
139    >>>
140    >>> # Validation errors are raised if data doesn't meet expectations
141    >>> try:
142    ...     invalid_df = pd.DataFrame({"bad_column": [1, 2, 3]})
143    ...     catalog.save("aggregated_sales", invalid_df)
144    ... except Exception as e:
145    ...     print(f"Validation failed: {e}")
146
147    Disabling validation for trusted data sources:
148
149    >>> from adc_toolkit.data.validators.no_validator import NoValidator
150    >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
151    >>> # No validation overhead, useful for performance-critical paths
152    >>> df = catalog.load("trusted_source")
153    """
154
155    __slots__ = ("catalog", "validator")
156
157    def __init__(self, catalog: DataCatalog, validator: DataValidator) -> None:
158        """
159        Initialize a ValidatedDataCatalog with a catalog and validator.
160
161        This constructor is primarily used for testing or advanced use cases
162        where you have already instantiated catalog and validator objects.
163        For most use cases, prefer the in_directory() factory method.
164
165        Parameters
166        ----------
167        catalog : DataCatalog
168            The data catalog instance for I/O operations. Must implement the
169            DataCatalog protocol with load(name) and save(name, data) methods.
170        validator : DataValidator
171            The data validator instance for quality checks. Must implement the
172            DataValidator protocol with a validate(name, data) method.
173
174        See Also
175        --------
176        in_directory : Factory method for creating instances from configuration.
177        load : Load and validate a dataset.
178        save : Validate and save a dataset.
179
180        Notes
181        -----
182        Both catalog and validator are stored as instance attributes and
183        become immutable after initialization due to `__slots__`.
184
185        The constructor performs no validation of the provided objects beyond
186        type annotations. It's the caller's responsibility to ensure the
187        objects implement the required protocols correctly.
188
189        Examples
190        --------
191        Direct instantiation with concrete implementations:
192
193        >>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
194        >>> from adc_toolkit.data.validators.gx import GXValidator
195        >>>
196        >>> catalog_impl = KedroDataCatalog("config/data")
197        >>> validator_impl = GXValidator.in_directory("config/validations")
198        >>> validated_catalog = ValidatedDataCatalog(catalog=catalog_impl, validator=validator_impl)
199
200        Using mock objects for testing:
201
202        >>> from unittest.mock import Mock
203        >>> import pandas as pd
204        >>>
205        >>> mock_catalog = Mock(spec=DataCatalog)
206        >>> mock_validator = Mock(spec=DataValidator)
207        >>>
208        >>> # Setup mock behavior
209        >>> test_df = pd.DataFrame({"a": [1, 2, 3]})
210        >>> mock_catalog.load.return_value = test_df
211        >>> mock_validator.validate.return_value = test_df
212        >>>
213        >>> catalog = ValidatedDataCatalog(mock_catalog, mock_validator)
214        >>> result = catalog.load("test_dataset")
215        >>> mock_catalog.load.assert_called_once_with("test_dataset")
216        """
217        self.catalog = catalog
218        self.validator = validator
219
220    @classmethod
221    def in_directory(
222        cls,
223        path: str | Path,
224        catalog_class: type[DataCatalog] | None = None,
225        validator_class: type[DataValidator] | None = None,
226    ) -> "ValidatedDataCatalog":
227        """
228        Create a validated catalog from configuration in a directory.
229
230        This is the recommended factory method for creating ValidatedDataCatalog
231        instances in production code. It reads configuration files from the
232        specified directory and instantiates both the catalog and validator
233        using their respective in_directory() factory methods.
234
235        By default, this method uses KedroDataCatalog for I/O operations and
236        GXValidator (Great Expectations) for validation. If Great Expectations
237        is not installed, it falls back to PanderaValidator. Custom
238        implementations can be provided via the catalog_class and
239        validator_class parameters.
240
241        Parameters
242        ----------
243        path : str or pathlib.Path
244            Path to the directory containing catalog and validator configuration
245            files. This directory typically contains:
246            - catalog.yml: Kedro catalog configuration (for KedroDataCatalog)
247            - expectations/: Great Expectations suite definitions (for GXValidator)
248            - Or equivalent configuration for custom implementations.
249            The path can be absolute or relative to the current working directory.
250        catalog_class : type[DataCatalog] or None, optional
251            Custom data catalog class to use instead of the default. Must be a
252            class (not instance) that implements the DataCatalog protocol and
253            provides an in_directory(path) class method. If None (default),
254            uses KedroDataCatalog.
255        validator_class : type[DataValidator] or None, optional
256            Custom data validator class to use instead of the default. Must be
257            a class (not instance) that implements the DataValidator protocol
258            and provides an in_directory(path) class method. If None (default),
259            uses GXValidator if available, otherwise PanderaValidator.
260
261        Returns
262        -------
263        ValidatedDataCatalog
264            A new ValidatedDataCatalog instance with catalog and validator
265            initialized from the configuration directory.
266
267        Raises
268        ------
269        FileNotFoundError
270            If the specified directory does not exist or required configuration
271            files are missing.
272        ImportError
273            If default catalog/validator classes are requested but their
274            required packages are not installed (e.g., kedro, great_expectations,
275            or pandera).
276        ValueError
277            If configuration files are malformed or contain invalid settings.
278
279        See Also
280        --------
281        __init__ : Direct constructor for advanced use cases.
282        load : Load and validate a dataset from the catalog.
283        save : Validate and save a dataset to the catalog.
284        adc_toolkit.data.default_attributes.default_catalog : Function that creates default catalog.
285        adc_toolkit.data.default_attributes.default_validator : Function that creates default validator.
286
287        Notes
288        -----
289        The method calls in_directory() on the catalog and validator classes,
290        which are responsible for reading their respective configuration files.
291        The exact configuration file format depends on the implementations used.
292
293        For KedroDataCatalog (default), the directory should contain:
294        - catalog.yml: Dataset definitions in Kedro format
295        - (optionally) credentials.yml: Credential configurations
296
297        For GXValidator (default), the directory should contain:
298        - great_expectations.yml: GX project configuration
299        - expectations/: Directory with expectation suite JSON files
300        - checkpoints/: Directory with checkpoint configurations
301
302        For PanderaValidator, the directory should contain:
303        - Python files defining Pandera schemas for each dataset
304
305        The directory structure can be organized as needed; implementations
306        are responsible for finding their configuration files within the path.
307
308        This method is thread-safe as long as the underlying catalog and
309        validator implementations are thread-safe.
310
311        Examples
312        --------
313        Basic usage with default catalog and validator:
314
315        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
316        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
317        >>> # Uses KedroDataCatalog and GXValidator (or PanderaValidator)
318
319        Using pathlib.Path:
320
321        >>> from pathlib import Path
322        >>> config_dir = Path("config") / "production" / "data"
323        >>> catalog = ValidatedDataCatalog.in_directory(config_dir)
324
325        Using a custom catalog with default validator:
326
327        >>> from myproject.catalogs import S3DataCatalog
328        >>> catalog = ValidatedDataCatalog.in_directory("config/data", catalog_class=S3DataCatalog)
329        >>> # Uses S3DataCatalog for I/O, GXValidator for validation
330
331        Using a custom validator with default catalog:
332
333        >>> from adc_toolkit.data.validators.pandera import PanderaValidator
334        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
335        >>> # Uses KedroDataCatalog for I/O, PanderaValidator for validation
336
337        Using custom catalog and validator:
338
339        >>> from myproject.catalogs import DatabaseCatalog
340        >>> from myproject.validators import CustomValidator
341        >>> catalog = ValidatedDataCatalog.in_directory(
342        ...     "config/data", catalog_class=DatabaseCatalog, validator_class=CustomValidator
343        ... )
344
345        Disabling validation for performance-critical scenarios:
346
347        >>> from adc_toolkit.data.validators.no_validator import NoValidator
348        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
349        >>> # No validation overhead, useful for trusted data sources
350
351        Different configurations for different environments:
352
353        >>> # Development: use local files with strict validation
354        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
355        >>>
356        >>> # Production: use cloud storage with the same validation
357        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
358        >>>
359        >>> # Testing: use in-memory catalog with no validation
360        >>> from unittest.mock import Mock
361        >>> test_catalog = ValidatedDataCatalog.in_directory("config/test", validator_class=NoValidator)
362
363        Complete workflow example:
364
365        >>> import pandas as pd
366        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
367        >>>
368        >>> # Initialize catalog from configuration
369        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
370        >>>
371        >>> # Load raw data (validated on load)
372        >>> raw = catalog.load("raw_transactions")
373        >>>
374        >>> # Process data
375        >>> cleaned = raw.dropna()
376        >>> features = engineer_features(cleaned)
377        >>>
378        >>> # Save results (validated before save)
379        >>> catalog.save("cleaned_transactions", cleaned)
380        >>> catalog.save("feature_matrix", features)
381        """
382        catalog = catalog_class.in_directory(path) if catalog_class else default_catalog(path)
383        validator = validator_class.in_directory(path) if validator_class else default_validator(path)
384        return cls(catalog, validator)
385
386    def load(self, name: str, **kwargs: Any) -> Data:
387        """
388        Load a dataset from the catalog and validate it.
389
390        This method performs a two-step operation:
391        1. Load the dataset using the underlying catalog's load() method
392        2. Validate the loaded dataset using the validator's validate() method
393
394        The validation step ensures that the loaded data meets all configured
395        quality expectations before being returned to the caller. If validation
396        fails, an exception is raised and no data is returned.
397
398        This provides a safety guarantee: any data returned from load() has
399        been validated and can be trusted to meet the configured expectations.
400
401        Parameters
402        ----------
403        name : str
404            The registered name of the dataset to load. This name must be
405            defined in both the catalog configuration (for loading) and the
406            validator configuration (for validation rules). The name serves
407            as the lookup key for both components.
408        **kwargs : Any
409            Additional keyword arguments passed through to the underlying
410            catalog's load() method. The supported arguments depend on the
411            catalog implementation. Common examples include:
412            - version: str - Load a specific version of the dataset
413            - load_args: dict - Override default load arguments
414            - credentials: dict - Override default credentials
415            Consult your catalog implementation's documentation for details.
416
417        Returns
418        -------
419        Data
420            The loaded and validated dataset. The specific type depends on
421            the catalog configuration (e.g., pandas.DataFrame,
422            pyspark.sql.DataFrame). The returned data has passed all
423            validation checks configured for this dataset name.
424
425        Raises
426        ------
427        KeyError
428            If the dataset name is not registered in the catalog or validator
429            configuration.
430        FileNotFoundError
431            If the dataset's source file or location does not exist.
432        ValidationError
433            If the loaded data fails validation. The exception includes details
434            about which validation rules failed, expected values, and observed
435            values. The specific exception type depends on the validator
436            implementation (e.g., great_expectations.exceptions.ValidationError
437            for GXValidator, pandera.errors.SchemaError for PanderaValidator).
438        ValueError
439            If the dataset cannot be loaded due to format errors, parsing
440            failures, or incompatible data types.
441        TypeError
442            If the loaded data type is incompatible with the validator
443            expectations.
444        PermissionError
445            If the dataset source is not readable due to permission issues.
446
447        See Also
448        --------
449        save : Validate and save a dataset to the catalog.
450        in_directory : Factory method for creating catalog instances.
451        DataCatalog.load : Underlying catalog load operation.
452        DataValidator.validate : Underlying validation operation.
453
454        Notes
455        -----
456        The load operation is idempotent: calling it multiple times with the
457        same name and arguments should return equivalent data (assuming the
458        underlying source hasn't changed).
459
460        Validation happens after loading, so the full dataset must be loaded
461        into memory before validation can begin. For very large datasets, this
462        may have performance implications. Some validator implementations
463        support sampling-based validation to mitigate this.
464
465        If validation fails, the loaded data is discarded and not returned.
466        This prevents invalid data from propagating through your pipeline.
467
468        The method does not cache loaded data. Each call performs a fresh
469        load and validation. If caching is needed, implement it at a higher
470        level or use a catalog implementation that supports caching.
471
472        Thread safety depends on the underlying catalog and validator
473        implementations. Consult their documentation if concurrent loading
474        is required.
475
476        Examples
477        --------
478        Basic usage:
479
480        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
481        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
482        >>> df = catalog.load("customer_data")
483        >>> # df is guaranteed to meet all validation rules for "customer_data"
484        >>> print(df.columns)
485        Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')
486
487        Loading with additional arguments:
488
489        >>> # Load a specific version
490        >>> df_v1 = catalog.load("customer_data", version="2024-01-01")
491        >>>
492        >>> # Override load arguments
493        >>> df_custom = catalog.load("sales_data", load_args={"parse_dates": ["transaction_date"]})
494
495        Handling validation failures:
496
497        >>> try:
498        ...     df = catalog.load("strict_dataset")
499        ... except Exception as e:
500        ...     print(f"Validation failed: {e}")
501        ...     # Log the failure, send alert, or handle gracefully
502        ...     # The invalid data is not returned
503
504        Loading multiple datasets in a pipeline:
505
506        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
507        >>>
508        >>> # All datasets are validated on load
509        >>> customers = catalog.load("customers")
510        >>> orders = catalog.load("orders")
511        >>> products = catalog.load("products")
512        >>>
513        >>> # Merge validated datasets with confidence
514        >>> enriched = orders.merge(customers, on="customer_id")
515        >>> enriched = enriched.merge(products, on="product_id")
516
517        Using in a data processing function:
518
519        >>> def process_sales_data(catalog: ValidatedDataCatalog) -> pd.DataFrame:
520        ...     # Load and validate raw sales data
521        ...     sales = catalog.load("raw_sales")
522        ...
523        ...     # Process with confidence that data meets expectations
524        ...     sales["revenue"] = sales["quantity"] * sales["price"]
525        ...     sales = sales.groupby("region").agg({"revenue": "sum"})
526        ...
527        ...     return sales
528
529        Comparing data across environments:
530
531        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
532        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
533        >>>
534        >>> # Same dataset name, different sources, same validation
535        >>> dev_data = dev_catalog.load("training_data")
536        >>> prod_data = prod_catalog.load("training_data")
537        >>>
538        >>> # Both are guaranteed to have the same schema and quality
539        """
540        return self.validator.validate(name, self.catalog.load(name, **kwargs))
541
542    def save(self, name: str, data: Data) -> None:
543        """
544        Validate a dataset and save it to the catalog.
545
546        This method performs a two-step operation:
547        1. Validate the dataset using the validator's validate() method
548        2. Save the validated dataset using the catalog's save() method
549
550        The validation step ensures that only data meeting all configured
551        quality expectations is persisted. If validation fails, an exception
552        is raised and no data is saved.
553
554        This provides a safety guarantee: any data saved via this method has
555        been validated and meets the configured expectations. Invalid data
556        is prevented from entering downstream systems or storage.
557
558        Parameters
559        ----------
560        name : str
561            The registered name of the dataset to save. This name must be
562            defined in both the catalog configuration (for saving location
563            and format) and the validator configuration (for validation rules).
564            The name serves as the lookup key for both components.
565        data : Data
566            The dataset to validate and save. Must be a Data protocol-compatible
567            object (e.g., pandas.DataFrame, pyspark.sql.DataFrame) that has
568            'columns' and 'dtypes' properties. The data must satisfy all
569            validation rules configured for this dataset name.
570
571        Returns
572        -------
573        None
574            This method does not return a value. It performs a side effect
575            (saving data) after successful validation.
576
577        Raises
578        ------
579        KeyError
580            If the dataset name is not registered in the catalog or validator
581            configuration.
582        ValidationError
583            If the data fails validation. The exception includes details about
584            which validation rules failed, expected values, and observed values.
585            No data is saved when validation fails. The specific exception type
586            depends on the validator implementation (e.g.,
587            great_expectations.exceptions.ValidationError for GXValidator,
588            pandera.errors.SchemaError for PanderaValidator).
589        TypeError
590            If the data type is incompatible with the catalog's save operation
591            or the validator's expectations.
592        ValueError
593            If the data cannot be saved due to format errors or serialization
594            failures.
595        PermissionError
596            If the target save location is not writable.
597        OSError
598            If there are filesystem errors during the save operation (e.g.,
599            disk full, path too long).
600
601        See Also
602        --------
603        load : Load and validate a dataset from the catalog.
604        in_directory : Factory method for creating catalog instances.
605        DataCatalog.save : Underlying catalog save operation.
606        DataValidator.validate : Underlying validation operation.
607
608        Notes
609        -----
610        Validation happens before saving, so invalid data is never persisted.
611        This is crucial for maintaining data quality in downstream systems.
612
613        The save operation should be atomic when possible: either the entire
614        dataset is saved successfully, or no partial data is written. Atomicity
615        depends on the catalog implementation and underlying storage system.
616
617        Some catalog implementations support versioning, automatically creating
618        timestamped or numbered versions of saved datasets. Consult your catalog
619        documentation for details.
620
621        If the target file or location already exists, the behavior depends on
622        the catalog configuration. Common options include:
623        - Overwrite: Replace existing data (default for most catalogs)
624        - Append: Add to existing data
625        - Error: Raise an exception if target exists
626        - Version: Create a new version without overwriting
627
628        For very large datasets, validation may have performance implications
629        as the entire dataset must be validated before saving begins. Some
630        validator implementations support sampling-based validation.
631
632        Thread safety depends on the underlying catalog and validator
633        implementations. Consult their documentation if concurrent saving
634        is required.
635
636        The method does not modify the input data object. Validation may
637        internally create temporary copies or views, but the original data
638        parameter is unchanged.
639
640        Examples
641        --------
642        Basic usage:
643
644        >>> import pandas as pd
645        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
646        >>>
647        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
648        >>> df = pd.DataFrame(
649        ...     {
650        ...         "customer_id": [1, 2, 3],
651        ...         "name": ["Alice", "Bob", "Carol"],
652        ...         "email": ["alice@ex.com", "bob@ex.com", "carol@ex.com"],
653        ...     }
654        ... )
655        >>> catalog.save("customer_data", df)
656        >>> # Data is validated before saving; only valid data is persisted
657
658        Handling validation failures:
659
660        >>> invalid_df = pd.DataFrame({"wrong_column": [1, 2, 3]})
661        >>> try:
662        ...     catalog.save("customer_data", invalid_df)
663        ... except Exception as e:
664        ...     print(f"Validation failed: {e}")
665        ...     # Invalid data is not saved; downstream systems protected
666
667        Saving multiple datasets in a pipeline:
668
669        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
670        >>>
671        >>> # Process and save intermediate results
672        >>> raw = catalog.load("raw_transactions")
673        >>> cleaned = raw.dropna()
674        >>> catalog.save("cleaned_transactions", cleaned)
675        >>>
676        >>> # Further processing
677        >>> features = engineer_features(cleaned)
678        >>> catalog.save("feature_matrix", features)
679        >>>
680        >>> # Final output
681        >>> predictions = model.predict(features)
682        >>> catalog.save("predictions", predictions)
683
684        Complete validation workflow:
685
686        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
687        >>>
688        >>> catalog = ValidatedDataCatalog.in_directory("config/production")
689        >>>
690        >>> # Load validated input
691        >>> input_data = catalog.load("input_dataset")
692        >>>
693        >>> # Transform data
694        >>> output_data = transform(input_data)
695        >>>
696        >>> # Save with validation
697        >>> try:
698        ...     catalog.save("output_dataset", output_data)
699        ... except Exception as e:
700        ...     # Validation failed; investigate and fix transformation
701        ...     logger.error(f"Output validation failed: {e}")
702        ...     # Original input_data is still available for debugging
703        ...     raise
704
705        Using save in a reusable processing function:
706
707        >>> def aggregate_sales(catalog: ValidatedDataCatalog, input_name: str, output_name: str) -> None:
708        ...     # Load validated data
709        ...     sales = catalog.load(input_name)
710        ...
711        ...     # Aggregate
712        ...     aggregated = sales.groupby("region").agg({"revenue": "sum", "quantity": "sum"})
713        ...
714        ...     # Save validated output
715        ...     catalog.save(output_name, aggregated)
716        >>>
717        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
718        >>> aggregate_sales(catalog, "daily_sales", "monthly_sales")
719
720        Preventing invalid data from reaching production:
721
722        >>> # Development environment - experimenting with new features
723        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
724        >>> experimental_df = create_new_features(raw_data)
725        >>>
726        >>> try:
727        ...     dev_catalog.save("feature_matrix", experimental_df)
728        ... except Exception as e:
729        ...     print(f"New features don't meet schema: {e}")
730        ...     # Fix the feature engineering before deploying to production
731        >>>
732        >>> # Production environment - same validation rules enforced
733        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
734        >>> prod_catalog.save("feature_matrix", experimental_df)
735        >>> # Only succeeds if data meets production quality standards
736
737        Saving with different formats via catalog configuration:
738
739        >>> # Catalog configuration determines format (CSV, Parquet, etc.)
740        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
741        >>>
742        >>> # Same save() call, different formats based on catalog config
743        >>> catalog.save("csv_output", df)  # Saved as CSV
744        >>> catalog.save("parquet_output", df)  # Saved as Parquet
745        >>> catalog.save("database_table", df)  # Saved to database
746        >>> # All validated before saving regardless of format
747        """
748        self.catalog.save(name, self.validator.validate(name, data))
class ValidatedDataCatalog:
 18class ValidatedDataCatalog:
 19    """
 20    Data catalog with automatic validation on load and save operations.
 21
 22    ValidatedDataCatalog is the main user-facing API for the adc-toolkit data
 23    handling system. It wraps a DataCatalog (responsible for I/O operations)
 24    and a DataValidator (responsible for data quality checks) to provide a
 25    unified interface that automatically validates data after loading and
 26    before saving.
 27
 28    This design ensures data quality is enforced at catalog boundaries,
 29    catching issues early in data pipelines. Validation is transparent to
 30    the user: simply call load() and save() as you would with a regular
 31    catalog, and validation happens automatically.
 32
 33    The class uses dependency injection to allow flexible catalog and
 34    validator implementations. By default, it uses KedroDataCatalog for
 35    I/O and GXValidator (Great Expectations) for validation, but these
 36    can be swapped for custom implementations or alternative validators
 37    like PanderaValidator.
 38
 39    Parameters
 40    ----------
 41    catalog : DataCatalog
 42        The data catalog instance responsible for loading and saving datasets.
 43        Must implement the DataCatalog protocol with load() and save() methods.
 44        The default implementation is KedroDataCatalog, which uses Kedro's
 45        configuration-driven catalog system with YAML-based dataset definitions.
 46    validator : DataValidator
 47        The data validator instance responsible for validating datasets.
 48        Must implement the DataValidator protocol with a validate() method.
 49        The default implementation is GXValidator (Great Expectations), with
 50        PanderaValidator as a fallback. Use NoValidator to disable validation.
 51
 52    Attributes
 53    ----------
 54    catalog : DataCatalog
 55        The underlying data catalog for I/O operations. Immutable after
 56        instantiation.
 57    validator : DataValidator
 58        The underlying data validator for quality checks. Immutable after
 59        instantiation.
 60
 61    See Also
 62    --------
 63    DataCatalog : Protocol defining the catalog interface.
 64    DataValidator : Protocol defining the validator interface.
 65    adc_toolkit.data.catalogs.kedro.KedroDataCatalog : Default catalog implementation.
 66    adc_toolkit.data.validators.gx.GXValidator : Default validator (Great Expectations).
 67    adc_toolkit.data.validators.pandera.PanderaValidator : Alternative validator.
 68    adc_toolkit.data.validators.no_validator.NoValidator : No-op validator.
 69
 70    Notes
 71    -----
 72    The class uses `__slots__` to restrict attributes to only 'catalog' and
 73    'validator', preventing accidental attribute additions and reducing memory
 74    overhead.
 75
 76    Attributes are immutable after instantiation. To use a different catalog
 77    or validator, create a new ValidatedDataCatalog instance.
 78
 79    Validation happens in the following order:
 80    - On load: catalog.load() -> validator.validate() -> return validated data
 81    - On save: validator.validate() -> catalog.save() -> return None
 82
 83    This means invalid data will never be saved, and loaded data is always
 84    validated before being returned to the caller.
 85
 86    The factory method in_directory() is the recommended way to instantiate
 87    this class for most use cases. Direct instantiation via __init__() is
 88    primarily useful for testing or custom configurations.
 89
 90    Examples
 91    --------
 92    Basic usage with default catalog and validator:
 93
 94    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
 95    >>> catalog = ValidatedDataCatalog.in_directory("config/data")
 96    >>> df = catalog.load("customer_data")
 97    >>> # Data is automatically validated after loading
 98    >>> processed_df = process_data(df)
 99    >>> catalog.save("processed_customer_data", processed_df)
100    >>> # Data is automatically validated before saving
101
102    Using a custom validator while keeping the default catalog:
103
104    >>> from adc_toolkit.data.validators.pandera import PanderaValidator
105    >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
106    >>> df = catalog.load("sales_data")
107
108    Using custom catalog and validator implementations:
109
110    >>> from myproject.catalogs import CustomCatalog
111    >>> from myproject.validators import CustomValidator
112    >>> catalog = ValidatedDataCatalog.in_directory(
113    ...     "config/data", catalog_class=CustomCatalog, validator_class=CustomValidator
114    ... )
115
116    Direct instantiation for testing or advanced use cases:
117
118    >>> from unittest.mock import Mock
119    >>> mock_catalog = Mock(spec=DataCatalog)
120    >>> mock_validator = Mock(spec=DataValidator)
121    >>> catalog = ValidatedDataCatalog(catalog=mock_catalog, validator=mock_validator)
122
123    Complete data pipeline example:
124
125    >>> import pandas as pd
126    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
127    >>>
128    >>> # Initialize catalog with validation
129    >>> catalog = ValidatedDataCatalog.in_directory("config/production")
130    >>>
131    >>> # Load and automatically validate raw data
132    >>> raw_data = catalog.load("raw_sales")
133    >>>
134    >>> # Process data
135    >>> cleaned = raw_data.dropna()
136    >>> aggregated = cleaned.groupby("region").sum()
137    >>>
138    >>> # Save and automatically validate before writing
139    >>> catalog.save("aggregated_sales", aggregated)
140    >>>
141    >>> # Validation errors are raised if data doesn't meet expectations
142    >>> try:
143    ...     invalid_df = pd.DataFrame({"bad_column": [1, 2, 3]})
144    ...     catalog.save("aggregated_sales", invalid_df)
145    ... except Exception as e:
146    ...     print(f"Validation failed: {e}")
147
148    Disabling validation for trusted data sources:
149
150    >>> from adc_toolkit.data.validators.no_validator import NoValidator
151    >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
152    >>> # No validation overhead, useful for performance-critical paths
153    >>> df = catalog.load("trusted_source")
154    """
155
156    __slots__ = ("catalog", "validator")
157
158    def __init__(self, catalog: DataCatalog, validator: DataValidator) -> None:
159        """
160        Initialize a ValidatedDataCatalog with a catalog and validator.
161
162        This constructor is primarily used for testing or advanced use cases
163        where you have already instantiated catalog and validator objects.
164        For most use cases, prefer the in_directory() factory method.
165
166        Parameters
167        ----------
168        catalog : DataCatalog
169            The data catalog instance for I/O operations. Must implement the
170            DataCatalog protocol with load(name) and save(name, data) methods.
171        validator : DataValidator
172            The data validator instance for quality checks. Must implement the
173            DataValidator protocol with a validate(name, data) method.
174
175        See Also
176        --------
177        in_directory : Factory method for creating instances from configuration.
178        load : Load and validate a dataset.
179        save : Validate and save a dataset.
180
181        Notes
182        -----
183        Both catalog and validator are stored as instance attributes and
184        become immutable after initialization due to `__slots__`.
185
186        The constructor performs no validation of the provided objects beyond
187        type annotations. It's the caller's responsibility to ensure the
188        objects implement the required protocols correctly.
189
190        Examples
191        --------
192        Direct instantiation with concrete implementations:
193
194        >>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
195        >>> from adc_toolkit.data.validators.gx import GXValidator
196        >>>
197        >>> catalog_impl = KedroDataCatalog("config/data")
198        >>> validator_impl = GXValidator.in_directory("config/validations")
199        >>> validated_catalog = ValidatedDataCatalog(catalog=catalog_impl, validator=validator_impl)
200
201        Using mock objects for testing:
202
203        >>> from unittest.mock import Mock
204        >>> import pandas as pd
205        >>>
206        >>> mock_catalog = Mock(spec=DataCatalog)
207        >>> mock_validator = Mock(spec=DataValidator)
208        >>>
209        >>> # Setup mock behavior
210        >>> test_df = pd.DataFrame({"a": [1, 2, 3]})
211        >>> mock_catalog.load.return_value = test_df
212        >>> mock_validator.validate.return_value = test_df
213        >>>
214        >>> catalog = ValidatedDataCatalog(mock_catalog, mock_validator)
215        >>> result = catalog.load("test_dataset")
216        >>> mock_catalog.load.assert_called_once_with("test_dataset")
217        """
218        self.catalog = catalog
219        self.validator = validator
220
221    @classmethod
222    def in_directory(
223        cls,
224        path: str | Path,
225        catalog_class: type[DataCatalog] | None = None,
226        validator_class: type[DataValidator] | None = None,
227    ) -> "ValidatedDataCatalog":
228        """
229        Create a validated catalog from configuration in a directory.
230
231        This is the recommended factory method for creating ValidatedDataCatalog
232        instances in production code. It reads configuration files from the
233        specified directory and instantiates both the catalog and validator
234        using their respective in_directory() factory methods.
235
236        By default, this method uses KedroDataCatalog for I/O operations and
237        GXValidator (Great Expectations) for validation. If Great Expectations
238        is not installed, it falls back to PanderaValidator. Custom
239        implementations can be provided via the catalog_class and
240        validator_class parameters.
241
242        Parameters
243        ----------
244        path : str or pathlib.Path
245            Path to the directory containing catalog and validator configuration
246            files. This directory typically contains:
247            - catalog.yml: Kedro catalog configuration (for KedroDataCatalog)
248            - expectations/: Great Expectations suite definitions (for GXValidator)
249            - Or equivalent configuration for custom implementations.
250            The path can be absolute or relative to the current working directory.
251        catalog_class : type[DataCatalog] or None, optional
252            Custom data catalog class to use instead of the default. Must be a
253            class (not instance) that implements the DataCatalog protocol and
254            provides an in_directory(path) class method. If None (default),
255            uses KedroDataCatalog.
256        validator_class : type[DataValidator] or None, optional
257            Custom data validator class to use instead of the default. Must be
258            a class (not instance) that implements the DataValidator protocol
259            and provides an in_directory(path) class method. If None (default),
260            uses GXValidator if available, otherwise PanderaValidator.
261
262        Returns
263        -------
264        ValidatedDataCatalog
265            A new ValidatedDataCatalog instance with catalog and validator
266            initialized from the configuration directory.
267
268        Raises
269        ------
270        FileNotFoundError
271            If the specified directory does not exist or required configuration
272            files are missing.
273        ImportError
274            If default catalog/validator classes are requested but their
275            required packages are not installed (e.g., kedro, great_expectations,
276            or pandera).
277        ValueError
278            If configuration files are malformed or contain invalid settings.
279
280        See Also
281        --------
282        __init__ : Direct constructor for advanced use cases.
283        load : Load and validate a dataset from the catalog.
284        save : Validate and save a dataset to the catalog.
285        adc_toolkit.data.default_attributes.default_catalog : Function that creates default catalog.
286        adc_toolkit.data.default_attributes.default_validator : Function that creates default validator.
287
288        Notes
289        -----
290        The method calls in_directory() on the catalog and validator classes,
291        which are responsible for reading their respective configuration files.
292        The exact configuration file format depends on the implementations used.
293
294        For KedroDataCatalog (default), the directory should contain:
295        - catalog.yml: Dataset definitions in Kedro format
296        - (optionally) credentials.yml: Credential configurations
297
298        For GXValidator (default), the directory should contain:
299        - great_expectations.yml: GX project configuration
300        - expectations/: Directory with expectation suite JSON files
301        - checkpoints/: Directory with checkpoint configurations
302
303        For PanderaValidator, the directory should contain:
304        - Python files defining Pandera schemas for each dataset
305
306        The directory structure can be organized as needed; implementations
307        are responsible for finding their configuration files within the path.
308
309        This method is thread-safe as long as the underlying catalog and
310        validator implementations are thread-safe.
311
312        Examples
313        --------
314        Basic usage with default catalog and validator:
315
316        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
317        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
318        >>> # Uses KedroDataCatalog and GXValidator (or PanderaValidator)
319
320        Using pathlib.Path:
321
322        >>> from pathlib import Path
323        >>> config_dir = Path("config") / "production" / "data"
324        >>> catalog = ValidatedDataCatalog.in_directory(config_dir)
325
326        Using a custom catalog with default validator:
327
328        >>> from myproject.catalogs import S3DataCatalog
329        >>> catalog = ValidatedDataCatalog.in_directory("config/data", catalog_class=S3DataCatalog)
330        >>> # Uses S3DataCatalog for I/O, GXValidator for validation
331
332        Using a custom validator with default catalog:
333
334        >>> from adc_toolkit.data.validators.pandera import PanderaValidator
335        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
336        >>> # Uses KedroDataCatalog for I/O, PanderaValidator for validation
337
338        Using custom catalog and validator:
339
340        >>> from myproject.catalogs import DatabaseCatalog
341        >>> from myproject.validators import CustomValidator
342        >>> catalog = ValidatedDataCatalog.in_directory(
343        ...     "config/data", catalog_class=DatabaseCatalog, validator_class=CustomValidator
344        ... )
345
346        Disabling validation for performance-critical scenarios:
347
348        >>> from adc_toolkit.data.validators.no_validator import NoValidator
349        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
350        >>> # No validation overhead, useful for trusted data sources
351
352        Different configurations for different environments:
353
354        >>> # Development: use local files with strict validation
355        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
356        >>>
357        >>> # Production: use cloud storage with the same validation
358        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
359        >>>
360        >>> # Testing: use in-memory catalog with no validation
361        >>> from unittest.mock import Mock
362        >>> test_catalog = ValidatedDataCatalog.in_directory("config/test", validator_class=NoValidator)
363
364        Complete workflow example:
365
366        >>> import pandas as pd
367        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
368        >>>
369        >>> # Initialize catalog from configuration
370        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
371        >>>
372        >>> # Load raw data (validated on load)
373        >>> raw = catalog.load("raw_transactions")
374        >>>
375        >>> # Process data
376        >>> cleaned = raw.dropna()
377        >>> features = engineer_features(cleaned)
378        >>>
379        >>> # Save results (validated before save)
380        >>> catalog.save("cleaned_transactions", cleaned)
381        >>> catalog.save("feature_matrix", features)
382        """
383        catalog = catalog_class.in_directory(path) if catalog_class else default_catalog(path)
384        validator = validator_class.in_directory(path) if validator_class else default_validator(path)
385        return cls(catalog, validator)
386
387    def load(self, name: str, **kwargs: Any) -> Data:
388        """
389        Load a dataset from the catalog and validate it.
390
391        This method performs a two-step operation:
392        1. Load the dataset using the underlying catalog's load() method
393        2. Validate the loaded dataset using the validator's validate() method
394
395        The validation step ensures that the loaded data meets all configured
396        quality expectations before being returned to the caller. If validation
397        fails, an exception is raised and no data is returned.
398
399        This provides a safety guarantee: any data returned from load() has
400        been validated and can be trusted to meet the configured expectations.
401
402        Parameters
403        ----------
404        name : str
405            The registered name of the dataset to load. This name must be
406            defined in both the catalog configuration (for loading) and the
407            validator configuration (for validation rules). The name serves
408            as the lookup key for both components.
409        **kwargs : Any
410            Additional keyword arguments passed through to the underlying
411            catalog's load() method. The supported arguments depend on the
412            catalog implementation. Common examples include:
413            - version: str - Load a specific version of the dataset
414            - load_args: dict - Override default load arguments
415            - credentials: dict - Override default credentials
416            Consult your catalog implementation's documentation for details.
417
418        Returns
419        -------
420        Data
421            The loaded and validated dataset. The specific type depends on
422            the catalog configuration (e.g., pandas.DataFrame,
423            pyspark.sql.DataFrame). The returned data has passed all
424            validation checks configured for this dataset name.
425
426        Raises
427        ------
428        KeyError
429            If the dataset name is not registered in the catalog or validator
430            configuration.
431        FileNotFoundError
432            If the dataset's source file or location does not exist.
433        ValidationError
434            If the loaded data fails validation. The exception includes details
435            about which validation rules failed, expected values, and observed
436            values. The specific exception type depends on the validator
437            implementation (e.g., great_expectations.exceptions.ValidationError
438            for GXValidator, pandera.errors.SchemaError for PanderaValidator).
439        ValueError
440            If the dataset cannot be loaded due to format errors, parsing
441            failures, or incompatible data types.
442        TypeError
443            If the loaded data type is incompatible with the validator
444            expectations.
445        PermissionError
446            If the dataset source is not readable due to permission issues.
447
448        See Also
449        --------
450        save : Validate and save a dataset to the catalog.
451        in_directory : Factory method for creating catalog instances.
452        DataCatalog.load : Underlying catalog load operation.
453        DataValidator.validate : Underlying validation operation.
454
455        Notes
456        -----
457        The load operation is idempotent: calling it multiple times with the
458        same name and arguments should return equivalent data (assuming the
459        underlying source hasn't changed).
460
461        Validation happens after loading, so the full dataset must be loaded
462        into memory before validation can begin. For very large datasets, this
463        may have performance implications. Some validator implementations
464        support sampling-based validation to mitigate this.
465
466        If validation fails, the loaded data is discarded and not returned.
467        This prevents invalid data from propagating through your pipeline.
468
469        The method does not cache loaded data. Each call performs a fresh
470        load and validation. If caching is needed, implement it at a higher
471        level or use a catalog implementation that supports caching.
472
473        Thread safety depends on the underlying catalog and validator
474        implementations. Consult their documentation if concurrent loading
475        is required.
476
477        Examples
478        --------
479        Basic usage:
480
481        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
482        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
483        >>> df = catalog.load("customer_data")
484        >>> # df is guaranteed to meet all validation rules for "customer_data"
485        >>> print(df.columns)
486        Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')
487
488        Loading with additional arguments:
489
490        >>> # Load a specific version
491        >>> df_v1 = catalog.load("customer_data", version="2024-01-01")
492        >>>
493        >>> # Override load arguments
494        >>> df_custom = catalog.load("sales_data", load_args={"parse_dates": ["transaction_date"]})
495
496        Handling validation failures:
497
498        >>> try:
499        ...     df = catalog.load("strict_dataset")
500        ... except Exception as e:
501        ...     print(f"Validation failed: {e}")
502        ...     # Log the failure, send alert, or handle gracefully
503        ...     # The invalid data is not returned
504
505        Loading multiple datasets in a pipeline:
506
507        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
508        >>>
509        >>> # All datasets are validated on load
510        >>> customers = catalog.load("customers")
511        >>> orders = catalog.load("orders")
512        >>> products = catalog.load("products")
513        >>>
514        >>> # Merge validated datasets with confidence
515        >>> enriched = orders.merge(customers, on="customer_id")
516        >>> enriched = enriched.merge(products, on="product_id")
517
518        Using in a data processing function:
519
520        >>> def process_sales_data(catalog: ValidatedDataCatalog) -> pd.DataFrame:
521        ...     # Load and validate raw sales data
522        ...     sales = catalog.load("raw_sales")
523        ...
524        ...     # Process with confidence that data meets expectations
525        ...     sales["revenue"] = sales["quantity"] * sales["price"]
526        ...     sales = sales.groupby("region").agg({"revenue": "sum"})
527        ...
528        ...     return sales
529
530        Comparing data across environments:
531
532        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
533        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
534        >>>
535        >>> # Same dataset name, different sources, same validation
536        >>> dev_data = dev_catalog.load("training_data")
537        >>> prod_data = prod_catalog.load("training_data")
538        >>>
539        >>> # Both are guaranteed to have the same schema and quality
540        """
541        return self.validator.validate(name, self.catalog.load(name, **kwargs))
542
543    def save(self, name: str, data: Data) -> None:
544        """
545        Validate a dataset and save it to the catalog.
546
547        This method performs a two-step operation:
548        1. Validate the dataset using the validator's validate() method
549        2. Save the validated dataset using the catalog's save() method
550
551        The validation step ensures that only data meeting all configured
552        quality expectations is persisted. If validation fails, an exception
553        is raised and no data is saved.
554
555        This provides a safety guarantee: any data saved via this method has
556        been validated and meets the configured expectations. Invalid data
557        is prevented from entering downstream systems or storage.
558
559        Parameters
560        ----------
561        name : str
562            The registered name of the dataset to save. This name must be
563            defined in both the catalog configuration (for saving location
564            and format) and the validator configuration (for validation rules).
565            The name serves as the lookup key for both components.
566        data : Data
567            The dataset to validate and save. Must be a Data protocol-compatible
568            object (e.g., pandas.DataFrame, pyspark.sql.DataFrame) that has
569            'columns' and 'dtypes' properties. The data must satisfy all
570            validation rules configured for this dataset name.
571
572        Returns
573        -------
574        None
575            This method does not return a value. It performs a side effect
576            (saving data) after successful validation.
577
578        Raises
579        ------
580        KeyError
581            If the dataset name is not registered in the catalog or validator
582            configuration.
583        ValidationError
584            If the data fails validation. The exception includes details about
585            which validation rules failed, expected values, and observed values.
586            No data is saved when validation fails. The specific exception type
587            depends on the validator implementation (e.g.,
588            great_expectations.exceptions.ValidationError for GXValidator,
589            pandera.errors.SchemaError for PanderaValidator).
590        TypeError
591            If the data type is incompatible with the catalog's save operation
592            or the validator's expectations.
593        ValueError
594            If the data cannot be saved due to format errors or serialization
595            failures.
596        PermissionError
597            If the target save location is not writable.
598        OSError
599            If there are filesystem errors during the save operation (e.g.,
600            disk full, path too long).
601
602        See Also
603        --------
604        load : Load and validate a dataset from the catalog.
605        in_directory : Factory method for creating catalog instances.
606        DataCatalog.save : Underlying catalog save operation.
607        DataValidator.validate : Underlying validation operation.
608
609        Notes
610        -----
611        Validation happens before saving, so invalid data is never persisted.
612        This is crucial for maintaining data quality in downstream systems.
613
614        The save operation should be atomic when possible: either the entire
615        dataset is saved successfully, or no partial data is written. Atomicity
616        depends on the catalog implementation and underlying storage system.
617
618        Some catalog implementations support versioning, automatically creating
619        timestamped or numbered versions of saved datasets. Consult your catalog
620        documentation for details.
621
622        If the target file or location already exists, the behavior depends on
623        the catalog configuration. Common options include:
624        - Overwrite: Replace existing data (default for most catalogs)
625        - Append: Add to existing data
626        - Error: Raise an exception if target exists
627        - Version: Create a new version without overwriting
628
629        For very large datasets, validation may have performance implications
630        as the entire dataset must be validated before saving begins. Some
631        validator implementations support sampling-based validation.
632
633        Thread safety depends on the underlying catalog and validator
634        implementations. Consult their documentation if concurrent saving
635        is required.
636
637        The method does not modify the input data object. Validation may
638        internally create temporary copies or views, but the original data
639        parameter is unchanged.
640
641        Examples
642        --------
643        Basic usage:
644
645        >>> import pandas as pd
646        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
647        >>>
648        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
649        >>> df = pd.DataFrame(
650        ...     {
651        ...         "customer_id": [1, 2, 3],
652        ...         "name": ["Alice", "Bob", "Carol"],
653        ...         "email": ["alice@ex.com", "bob@ex.com", "carol@ex.com"],
654        ...     }
655        ... )
656        >>> catalog.save("customer_data", df)
657        >>> # Data is validated before saving; only valid data is persisted
658
659        Handling validation failures:
660
661        >>> invalid_df = pd.DataFrame({"wrong_column": [1, 2, 3]})
662        >>> try:
663        ...     catalog.save("customer_data", invalid_df)
664        ... except Exception as e:
665        ...     print(f"Validation failed: {e}")
666        ...     # Invalid data is not saved; downstream systems protected
667
668        Saving multiple datasets in a pipeline:
669
670        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
671        >>>
672        >>> # Process and save intermediate results
673        >>> raw = catalog.load("raw_transactions")
674        >>> cleaned = raw.dropna()
675        >>> catalog.save("cleaned_transactions", cleaned)
676        >>>
677        >>> # Further processing
678        >>> features = engineer_features(cleaned)
679        >>> catalog.save("feature_matrix", features)
680        >>>
681        >>> # Final output
682        >>> predictions = model.predict(features)
683        >>> catalog.save("predictions", predictions)
684
685        Complete validation workflow:
686
687        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
688        >>>
689        >>> catalog = ValidatedDataCatalog.in_directory("config/production")
690        >>>
691        >>> # Load validated input
692        >>> input_data = catalog.load("input_dataset")
693        >>>
694        >>> # Transform data
695        >>> output_data = transform(input_data)
696        >>>
697        >>> # Save with validation
698        >>> try:
699        ...     catalog.save("output_dataset", output_data)
700        ... except Exception as e:
701        ...     # Validation failed; investigate and fix transformation
702        ...     logger.error(f"Output validation failed: {e}")
703        ...     # Original input_data is still available for debugging
704        ...     raise
705
706        Using save in a reusable processing function:
707
708        >>> def aggregate_sales(catalog: ValidatedDataCatalog, input_name: str, output_name: str) -> None:
709        ...     # Load validated data
710        ...     sales = catalog.load(input_name)
711        ...
712        ...     # Aggregate
713        ...     aggregated = sales.groupby("region").agg({"revenue": "sum", "quantity": "sum"})
714        ...
715        ...     # Save validated output
716        ...     catalog.save(output_name, aggregated)
717        >>>
718        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
719        >>> aggregate_sales(catalog, "daily_sales", "monthly_sales")
720
721        Preventing invalid data from reaching production:
722
723        >>> # Development environment - experimenting with new features
724        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
725        >>> experimental_df = create_new_features(raw_data)
726        >>>
727        >>> try:
728        ...     dev_catalog.save("feature_matrix", experimental_df)
729        ... except Exception as e:
730        ...     print(f"New features don't meet schema: {e}")
731        ...     # Fix the feature engineering before deploying to production
732        >>>
733        >>> # Production environment - same validation rules enforced
734        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
735        >>> prod_catalog.save("feature_matrix", experimental_df)
736        >>> # Only succeeds if data meets production quality standards
737
738        Saving with different formats via catalog configuration:
739
740        >>> # Catalog configuration determines format (CSV, Parquet, etc.)
741        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
742        >>>
743        >>> # Same save() call, different formats based on catalog config
744        >>> catalog.save("csv_output", df)  # Saved as CSV
745        >>> catalog.save("parquet_output", df)  # Saved as Parquet
746        >>> catalog.save("database_table", df)  # Saved to database
747        >>> # All validated before saving regardless of format
748        """
749        self.catalog.save(name, self.validator.validate(name, data))

Data catalog with automatic validation on load and save operations.

ValidatedDataCatalog is the main user-facing API for the adc-toolkit data handling system. It wraps a DataCatalog (responsible for I/O operations) and a DataValidator (responsible for data quality checks) to provide a unified interface that automatically validates data after loading and before saving.

This design ensures data quality is enforced at catalog boundaries, catching issues early in data pipelines. Validation is transparent to the user: simply call load() and save() as you would with a regular catalog, and validation happens automatically.

The class uses dependency injection to allow flexible catalog and validator implementations. By default, it uses KedroDataCatalog for I/O and GXValidator (Great Expectations) for validation, but these can be swapped for custom implementations or alternative validators like PanderaValidator.

Parameters
  • catalog (DataCatalog): The data catalog instance responsible for loading and saving datasets. Must implement the DataCatalog protocol with load() and save() methods. The default implementation is KedroDataCatalog, which uses Kedro's configuration-driven catalog system with YAML-based dataset definitions.
  • validator (DataValidator): The data validator instance responsible for validating datasets. Must implement the DataValidator protocol with a validate() method. The default implementation is GXValidator (Great Expectations), with PanderaValidator as a fallback. Use NoValidator to disable validation.
Attributes
  • catalog (DataCatalog): The underlying data catalog for I/O operations. Immutable after instantiation.
  • validator (DataValidator): The underlying data validator for quality checks. Immutable after instantiation.
See Also

DataCatalog: Protocol defining the catalog interface.
DataValidator: Protocol defining the validator interface.
adc_toolkit.data.catalogs.kedro.KedroDataCatalog: Default catalog implementation.
adc_toolkit.data.validators.gx.GXValidator: Default validator (Great Expectations).
adc_toolkit.data.validators.pandera.PanderaValidator: Alternative validator.
adc_toolkit.data.validators.no_validator.NoValidator: No-op validator.

Notes

The class uses __slots__ to restrict attributes to only 'catalog' and 'validator', preventing accidental attribute additions and reducing memory overhead.

Attributes are immutable after instantiation. To use a different catalog or validator, create a new ValidatedDataCatalog instance.

Validation happens in the following order:

  • On load: catalog.load() -> validator.validate() -> return validated data
  • On save: validator.validate() -> catalog.save() -> return None

This means invalid data will never be saved, and loaded data is always validated before being returned to the caller.

The factory method in_directory() is the recommended way to instantiate this class for most use cases. Direct instantiation via __init__() is primarily useful for testing or custom configurations.

Examples

Basic usage with default catalog and validator:

>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> df = catalog.load("customer_data")
>>> # Data is automatically validated after loading
>>> processed_df = process_data(df)
>>> catalog.save("processed_customer_data", processed_df)
>>> # Data is automatically validated before saving

Using a custom validator while keeping the default catalog:

>>> from adc_toolkit.data.validators.pandera import PanderaValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
>>> df = catalog.load("sales_data")

Using custom catalog and validator implementations:

>>> from myproject.catalogs import CustomCatalog
>>> from myproject.validators import CustomValidator
>>> catalog = ValidatedDataCatalog.in_directory(
...     "config/data", catalog_class=CustomCatalog, validator_class=CustomValidator
... )

Direct instantiation for testing or advanced use cases:

>>> from unittest.mock import Mock
>>> mock_catalog = Mock(spec=DataCatalog)
>>> mock_validator = Mock(spec=DataValidator)
>>> catalog = ValidatedDataCatalog(catalog=mock_catalog, validator=mock_validator)

Complete data pipeline example:

>>> import pandas as pd
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> # Initialize catalog with validation
>>> catalog = ValidatedDataCatalog.in_directory("config/production")
>>>
>>> # Load and automatically validate raw data
>>> raw_data = catalog.load("raw_sales")
>>>
>>> # Process data
>>> cleaned = raw_data.dropna()
>>> aggregated = cleaned.groupby("region").sum()
>>>
>>> # Save and automatically validate before writing
>>> catalog.save("aggregated_sales", aggregated)
>>>
>>> # Validation errors are raised if data doesn't meet expectations
>>> try:
...     invalid_df = pd.DataFrame({"bad_column": [1, 2, 3]})
...     catalog.save("aggregated_sales", invalid_df)
... except Exception as e:
...     print(f"Validation failed: {e}")

Disabling validation for trusted data sources:

>>> from adc_toolkit.data.validators.no_validator import NoValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
>>> # No validation overhead, useful for performance-critical paths
>>> df = catalog.load("trusted_source")
ValidatedDataCatalog( catalog: adc_toolkit.data.abs.DataCatalog, validator: adc_toolkit.data.abs.DataValidator)
158    def __init__(self, catalog: DataCatalog, validator: DataValidator) -> None:
159        """
160        Initialize a ValidatedDataCatalog with a catalog and validator.
161
162        This constructor is primarily used for testing or advanced use cases
163        where you have already instantiated catalog and validator objects.
164        For most use cases, prefer the in_directory() factory method.
165
166        Parameters
167        ----------
168        catalog : DataCatalog
169            The data catalog instance for I/O operations. Must implement the
170            DataCatalog protocol with load(name) and save(name, data) methods.
171        validator : DataValidator
172            The data validator instance for quality checks. Must implement the
173            DataValidator protocol with a validate(name, data) method.
174
175        See Also
176        --------
177        in_directory : Factory method for creating instances from configuration.
178        load : Load and validate a dataset.
179        save : Validate and save a dataset.
180
181        Notes
182        -----
183        Both catalog and validator are stored as instance attributes and
184        become immutable after initialization due to `__slots__`.
185
186        The constructor performs no validation of the provided objects beyond
187        type annotations. It's the caller's responsibility to ensure the
188        objects implement the required protocols correctly.
189
190        Examples
191        --------
192        Direct instantiation with concrete implementations:
193
194        >>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
195        >>> from adc_toolkit.data.validators.gx import GXValidator
196        >>>
197        >>> catalog_impl = KedroDataCatalog("config/data")
198        >>> validator_impl = GXValidator.in_directory("config/validations")
199        >>> validated_catalog = ValidatedDataCatalog(catalog=catalog_impl, validator=validator_impl)
200
201        Using mock objects for testing:
202
203        >>> from unittest.mock import Mock
204        >>> import pandas as pd
205        >>>
206        >>> mock_catalog = Mock(spec=DataCatalog)
207        >>> mock_validator = Mock(spec=DataValidator)
208        >>>
209        >>> # Setup mock behavior
210        >>> test_df = pd.DataFrame({"a": [1, 2, 3]})
211        >>> mock_catalog.load.return_value = test_df
212        >>> mock_validator.validate.return_value = test_df
213        >>>
214        >>> catalog = ValidatedDataCatalog(mock_catalog, mock_validator)
215        >>> result = catalog.load("test_dataset")
216        >>> mock_catalog.load.assert_called_once_with("test_dataset")
217        """
218        self.catalog = catalog
219        self.validator = validator

Initialize a ValidatedDataCatalog with a catalog and validator.

This constructor is primarily used for testing or advanced use cases where you have already instantiated catalog and validator objects. For most use cases, prefer the in_directory() factory method.

Parameters
  • catalog (DataCatalog): The data catalog instance for I/O operations. Must implement the DataCatalog protocol with load(name) and save(name, data) methods.
  • validator (DataValidator): The data validator instance for quality checks. Must implement the DataValidator protocol with a validate(name, data) method.
See Also

in_directory: Factory method for creating instances from configuration.
load: Load and validate a dataset.
save: Validate and save a dataset.

Notes

Both catalog and validator are stored as instance attributes and become immutable after initialization due to __slots__.

The constructor performs no validation of the provided objects beyond type annotations. It's the caller's responsibility to ensure the objects implement the required protocols correctly.

Examples

Direct instantiation with concrete implementations:

>>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
>>> from adc_toolkit.data.validators.gx import GXValidator
>>>
>>> catalog_impl = KedroDataCatalog("config/data")
>>> validator_impl = GXValidator.in_directory("config/validations")
>>> validated_catalog = ValidatedDataCatalog(catalog=catalog_impl, validator=validator_impl)

Using mock objects for testing:

>>> from unittest.mock import Mock
>>> import pandas as pd
>>>
>>> mock_catalog = Mock(spec=DataCatalog)
>>> mock_validator = Mock(spec=DataValidator)
>>>
>>> # Setup mock behavior
>>> test_df = pd.DataFrame({"a": [1, 2, 3]})
>>> mock_catalog.load.return_value = test_df
>>> mock_validator.validate.return_value = test_df
>>>
>>> catalog = ValidatedDataCatalog(mock_catalog, mock_validator)
>>> result = catalog.load("test_dataset")
>>> mock_catalog.load.assert_called_once_with("test_dataset")
catalog
validator
@classmethod
def in_directory( cls, path: str | pathlib.Path, catalog_class: type[adc_toolkit.data.abs.DataCatalog] | None = None, validator_class: type[adc_toolkit.data.abs.DataValidator] | None = None) -> ValidatedDataCatalog:
221    @classmethod
222    def in_directory(
223        cls,
224        path: str | Path,
225        catalog_class: type[DataCatalog] | None = None,
226        validator_class: type[DataValidator] | None = None,
227    ) -> "ValidatedDataCatalog":
228        """
229        Create a validated catalog from configuration in a directory.
230
231        This is the recommended factory method for creating ValidatedDataCatalog
232        instances in production code. It reads configuration files from the
233        specified directory and instantiates both the catalog and validator
234        using their respective in_directory() factory methods.
235
236        By default, this method uses KedroDataCatalog for I/O operations and
237        GXValidator (Great Expectations) for validation. If Great Expectations
238        is not installed, it falls back to PanderaValidator. Custom
239        implementations can be provided via the catalog_class and
240        validator_class parameters.
241
242        Parameters
243        ----------
244        path : str or pathlib.Path
245            Path to the directory containing catalog and validator configuration
246            files. This directory typically contains:
247            - catalog.yml: Kedro catalog configuration (for KedroDataCatalog)
248            - expectations/: Great Expectations suite definitions (for GXValidator)
249            - Or equivalent configuration for custom implementations.
250            The path can be absolute or relative to the current working directory.
251        catalog_class : type[DataCatalog] or None, optional
252            Custom data catalog class to use instead of the default. Must be a
253            class (not instance) that implements the DataCatalog protocol and
254            provides an in_directory(path) class method. If None (default),
255            uses KedroDataCatalog.
256        validator_class : type[DataValidator] or None, optional
257            Custom data validator class to use instead of the default. Must be
258            a class (not instance) that implements the DataValidator protocol
259            and provides an in_directory(path) class method. If None (default),
260            uses GXValidator if available, otherwise PanderaValidator.
261
262        Returns
263        -------
264        ValidatedDataCatalog
265            A new ValidatedDataCatalog instance with catalog and validator
266            initialized from the configuration directory.
267
268        Raises
269        ------
270        FileNotFoundError
271            If the specified directory does not exist or required configuration
272            files are missing.
273        ImportError
274            If default catalog/validator classes are requested but their
275            required packages are not installed (e.g., kedro, great_expectations,
276            or pandera).
277        ValueError
278            If configuration files are malformed or contain invalid settings.
279
280        See Also
281        --------
282        __init__ : Direct constructor for advanced use cases.
283        load : Load and validate a dataset from the catalog.
284        save : Validate and save a dataset to the catalog.
285        adc_toolkit.data.default_attributes.default_catalog : Function that creates default catalog.
286        adc_toolkit.data.default_attributes.default_validator : Function that creates default validator.
287
288        Notes
289        -----
290        The method calls in_directory() on the catalog and validator classes,
291        which are responsible for reading their respective configuration files.
292        The exact configuration file format depends on the implementations used.
293
294        For KedroDataCatalog (default), the directory should contain:
295        - catalog.yml: Dataset definitions in Kedro format
296        - (optionally) credentials.yml: Credential configurations
297
298        For GXValidator (default), the directory should contain:
299        - great_expectations.yml: GX project configuration
300        - expectations/: Directory with expectation suite JSON files
301        - checkpoints/: Directory with checkpoint configurations
302
303        For PanderaValidator, the directory should contain:
304        - Python files defining Pandera schemas for each dataset
305
306        The directory structure can be organized as needed; implementations
307        are responsible for finding their configuration files within the path.
308
309        This method is thread-safe as long as the underlying catalog and
310        validator implementations are thread-safe.
311
312        Examples
313        --------
314        Basic usage with default catalog and validator:
315
316        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
317        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
318        >>> # Uses KedroDataCatalog and GXValidator (or PanderaValidator)
319
320        Using pathlib.Path:
321
322        >>> from pathlib import Path
323        >>> config_dir = Path("config") / "production" / "data"
324        >>> catalog = ValidatedDataCatalog.in_directory(config_dir)
325
326        Using a custom catalog with default validator:
327
328        >>> from myproject.catalogs import S3DataCatalog
329        >>> catalog = ValidatedDataCatalog.in_directory("config/data", catalog_class=S3DataCatalog)
330        >>> # Uses S3DataCatalog for I/O, GXValidator for validation
331
332        Using a custom validator with default catalog:
333
334        >>> from adc_toolkit.data.validators.pandera import PanderaValidator
335        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
336        >>> # Uses KedroDataCatalog for I/O, PanderaValidator for validation
337
338        Using custom catalog and validator:
339
340        >>> from myproject.catalogs import DatabaseCatalog
341        >>> from myproject.validators import CustomValidator
342        >>> catalog = ValidatedDataCatalog.in_directory(
343        ...     "config/data", catalog_class=DatabaseCatalog, validator_class=CustomValidator
344        ... )
345
346        Disabling validation for performance-critical scenarios:
347
348        >>> from adc_toolkit.data.validators.no_validator import NoValidator
349        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
350        >>> # No validation overhead, useful for trusted data sources
351
352        Different configurations for different environments:
353
354        >>> # Development: use local files with strict validation
355        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
356        >>>
357        >>> # Production: use cloud storage with the same validation
358        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
359        >>>
360        >>> # Testing: use in-memory catalog with no validation
361        >>> from unittest.mock import Mock
362        >>> test_catalog = ValidatedDataCatalog.in_directory("config/test", validator_class=NoValidator)
363
364        Complete workflow example:
365
366        >>> import pandas as pd
367        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
368        >>>
369        >>> # Initialize catalog from configuration
370        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
371        >>>
372        >>> # Load raw data (validated on load)
373        >>> raw = catalog.load("raw_transactions")
374        >>>
375        >>> # Process data
376        >>> cleaned = raw.dropna()
377        >>> features = engineer_features(cleaned)
378        >>>
379        >>> # Save results (validated before save)
380        >>> catalog.save("cleaned_transactions", cleaned)
381        >>> catalog.save("feature_matrix", features)
382        """
383        catalog = catalog_class.in_directory(path) if catalog_class else default_catalog(path)
384        validator = validator_class.in_directory(path) if validator_class else default_validator(path)
385        return cls(catalog, validator)

Create a validated catalog from configuration in a directory.

This is the recommended factory method for creating ValidatedDataCatalog instances in production code. It reads configuration files from the specified directory and instantiates both the catalog and validator using their respective in_directory() factory methods.

By default, this method uses KedroDataCatalog for I/O operations and GXValidator (Great Expectations) for validation. If Great Expectations is not installed, it falls back to PanderaValidator. Custom implementations can be provided via the catalog_class and validator_class parameters.

Parameters
  • path (str or pathlib.Path): Path to the directory containing catalog and validator configuration files. This directory typically contains:
    • catalog.yml: Kedro catalog configuration (for KedroDataCatalog)
    • expectations/: Great Expectations suite definitions (for GXValidator)
    • Or equivalent configuration for custom implementations. The path can be absolute or relative to the current working directory.
  • catalog_class (type[DataCatalog] or None, optional): Custom data catalog class to use instead of the default. Must be a class (not instance) that implements the DataCatalog protocol and provides an in_directory(path) class method. If None (default), uses KedroDataCatalog.
  • validator_class (type[DataValidator] or None, optional): Custom data validator class to use instead of the default. Must be a class (not instance) that implements the DataValidator protocol and provides an in_directory(path) class method. If None (default), uses GXValidator if available, otherwise PanderaValidator.
Returns
  • ValidatedDataCatalog: A new ValidatedDataCatalog instance with catalog and validator initialized from the configuration directory.
Raises
  • FileNotFoundError: If the specified directory does not exist or required configuration files are missing.
  • ImportError: If default catalog/validator classes are requested but their required packages are not installed (e.g., kedro, great_expectations, or pandera).
  • ValueError: If configuration files are malformed or contain invalid settings.
See Also

__init__: Direct constructor for advanced use cases.
load: Load and validate a dataset from the catalog.
save: Validate and save a dataset to the catalog.
adc_toolkit.data.default_attributes.default_catalog: Function that creates default catalog.
adc_toolkit.data.default_attributes.default_validator: Function that creates default validator.

Notes

The method calls in_directory() on the catalog and validator classes, which are responsible for reading their respective configuration files. The exact configuration file format depends on the implementations used.

For KedroDataCatalog (default), the directory should contain:

  • catalog.yml: Dataset definitions in Kedro format
  • (optionally) credentials.yml: Credential configurations

For GXValidator (default), the directory should contain:

  • great_expectations.yml: GX project configuration
  • expectations/: Directory with expectation suite JSON files
  • checkpoints/: Directory with checkpoint configurations

For PanderaValidator, the directory should contain:

  • Python files defining Pandera schemas for each dataset

The directory structure can be organized as needed; implementations are responsible for finding their configuration files within the path.

This method is thread-safe as long as the underlying catalog and validator implementations are thread-safe.

Examples

Basic usage with default catalog and validator:

>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> # Uses KedroDataCatalog and GXValidator (or PanderaValidator)

Using pathlib.Path:

>>> from pathlib import Path
>>> config_dir = Path("config") / "production" / "data"
>>> catalog = ValidatedDataCatalog.in_directory(config_dir)

Using a custom catalog with default validator:

>>> from myproject.catalogs import S3DataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data", catalog_class=S3DataCatalog)
>>> # Uses S3DataCatalog for I/O, GXValidator for validation

Using a custom validator with default catalog:

>>> from adc_toolkit.data.validators.pandera import PanderaValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
>>> # Uses KedroDataCatalog for I/O, PanderaValidator for validation

Using custom catalog and validator:

>>> from myproject.catalogs import DatabaseCatalog
>>> from myproject.validators import CustomValidator
>>> catalog = ValidatedDataCatalog.in_directory(
...     "config/data", catalog_class=DatabaseCatalog, validator_class=CustomValidator
... )

Disabling validation for performance-critical scenarios:

>>> from adc_toolkit.data.validators.no_validator import NoValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
>>> # No validation overhead, useful for trusted data sources

Different configurations for different environments:

>>> # Development: use local files with strict validation
>>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
>>>
>>> # Production: use cloud storage with the same validation
>>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
>>>
>>> # Testing: use in-memory catalog with no validation
>>> from unittest.mock import Mock
>>> test_catalog = ValidatedDataCatalog.in_directory("config/test", validator_class=NoValidator)

Complete workflow example:

>>> import pandas as pd
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> # Initialize catalog from configuration
>>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
>>>
>>> # Load raw data (validated on load)
>>> raw = catalog.load("raw_transactions")
>>>
>>> # Process data
>>> cleaned = raw.dropna()
>>> features = engineer_features(cleaned)
>>>
>>> # Save results (validated before save)
>>> catalog.save("cleaned_transactions", cleaned)
>>> catalog.save("feature_matrix", features)
def load(self, name: str, **kwargs: Any) -> adc_toolkit.data.abs.Data:
387    def load(self, name: str, **kwargs: Any) -> Data:
388        """
389        Load a dataset from the catalog and validate it.
390
391        This method performs a two-step operation:
392        1. Load the dataset using the underlying catalog's load() method
393        2. Validate the loaded dataset using the validator's validate() method
394
395        The validation step ensures that the loaded data meets all configured
396        quality expectations before being returned to the caller. If validation
397        fails, an exception is raised and no data is returned.
398
399        This provides a safety guarantee: any data returned from load() has
400        been validated and can be trusted to meet the configured expectations.
401
402        Parameters
403        ----------
404        name : str
405            The registered name of the dataset to load. This name must be
406            defined in both the catalog configuration (for loading) and the
407            validator configuration (for validation rules). The name serves
408            as the lookup key for both components.
409        **kwargs : Any
410            Additional keyword arguments passed through to the underlying
411            catalog's load() method. The supported arguments depend on the
412            catalog implementation. Common examples include:
413            - version: str - Load a specific version of the dataset
414            - load_args: dict - Override default load arguments
415            - credentials: dict - Override default credentials
416            Consult your catalog implementation's documentation for details.
417
418        Returns
419        -------
420        Data
421            The loaded and validated dataset. The specific type depends on
422            the catalog configuration (e.g., pandas.DataFrame,
423            pyspark.sql.DataFrame). The returned data has passed all
424            validation checks configured for this dataset name.
425
426        Raises
427        ------
428        KeyError
429            If the dataset name is not registered in the catalog or validator
430            configuration.
431        FileNotFoundError
432            If the dataset's source file or location does not exist.
433        ValidationError
434            If the loaded data fails validation. The exception includes details
435            about which validation rules failed, expected values, and observed
436            values. The specific exception type depends on the validator
437            implementation (e.g., great_expectations.exceptions.ValidationError
438            for GXValidator, pandera.errors.SchemaError for PanderaValidator).
439        ValueError
440            If the dataset cannot be loaded due to format errors, parsing
441            failures, or incompatible data types.
442        TypeError
443            If the loaded data type is incompatible with the validator
444            expectations.
445        PermissionError
446            If the dataset source is not readable due to permission issues.
447
448        See Also
449        --------
450        save : Validate and save a dataset to the catalog.
451        in_directory : Factory method for creating catalog instances.
452        DataCatalog.load : Underlying catalog load operation.
453        DataValidator.validate : Underlying validation operation.
454
455        Notes
456        -----
457        The load operation is idempotent: calling it multiple times with the
458        same name and arguments should return equivalent data (assuming the
459        underlying source hasn't changed).
460
461        Validation happens after loading, so the full dataset must be loaded
462        into memory before validation can begin. For very large datasets, this
463        may have performance implications. Some validator implementations
464        support sampling-based validation to mitigate this.
465
466        If validation fails, the loaded data is discarded and not returned.
467        This prevents invalid data from propagating through your pipeline.
468
469        The method does not cache loaded data. Each call performs a fresh
470        load and validation. If caching is needed, implement it at a higher
471        level or use a catalog implementation that supports caching.
472
473        Thread safety depends on the underlying catalog and validator
474        implementations. Consult their documentation if concurrent loading
475        is required.
476
477        Examples
478        --------
479        Basic usage:
480
481        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
482        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
483        >>> df = catalog.load("customer_data")
484        >>> # df is guaranteed to meet all validation rules for "customer_data"
485        >>> print(df.columns)
486        Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')
487
488        Loading with additional arguments:
489
490        >>> # Load a specific version
491        >>> df_v1 = catalog.load("customer_data", version="2024-01-01")
492        >>>
493        >>> # Override load arguments
494        >>> df_custom = catalog.load("sales_data", load_args={"parse_dates": ["transaction_date"]})
495
496        Handling validation failures:
497
498        >>> try:
499        ...     df = catalog.load("strict_dataset")
500        ... except Exception as e:
501        ...     print(f"Validation failed: {e}")
502        ...     # Log the failure, send alert, or handle gracefully
503        ...     # The invalid data is not returned
504
505        Loading multiple datasets in a pipeline:
506
507        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
508        >>>
509        >>> # All datasets are validated on load
510        >>> customers = catalog.load("customers")
511        >>> orders = catalog.load("orders")
512        >>> products = catalog.load("products")
513        >>>
514        >>> # Merge validated datasets with confidence
515        >>> enriched = orders.merge(customers, on="customer_id")
516        >>> enriched = enriched.merge(products, on="product_id")
517
518        Using in a data processing function:
519
520        >>> def process_sales_data(catalog: ValidatedDataCatalog) -> pd.DataFrame:
521        ...     # Load and validate raw sales data
522        ...     sales = catalog.load("raw_sales")
523        ...
524        ...     # Process with confidence that data meets expectations
525        ...     sales["revenue"] = sales["quantity"] * sales["price"]
526        ...     sales = sales.groupby("region").agg({"revenue": "sum"})
527        ...
528        ...     return sales
529
530        Comparing data across environments:
531
532        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
533        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
534        >>>
535        >>> # Same dataset name, different sources, same validation
536        >>> dev_data = dev_catalog.load("training_data")
537        >>> prod_data = prod_catalog.load("training_data")
538        >>>
539        >>> # Both are guaranteed to have the same schema and quality
540        """
541        return self.validator.validate(name, self.catalog.load(name, **kwargs))

Load a dataset from the catalog and validate it.

This method performs a two-step operation:

  1. Load the dataset using the underlying catalog's load() method
  2. Validate the loaded dataset using the validator's validate() method

The validation step ensures that the loaded data meets all configured quality expectations before being returned to the caller. If validation fails, an exception is raised and no data is returned.

This provides a safety guarantee: any data returned from load() has been validated and can be trusted to meet the configured expectations.

Parameters
  • name (str): The registered name of the dataset to load. This name must be defined in both the catalog configuration (for loading) and the validator configuration (for validation rules). The name serves as the lookup key for both components.
  • **kwargs (Any): Additional keyword arguments passed through to the underlying catalog's load() method. The supported arguments depend on the catalog implementation. Common examples include:
    • version: str - Load a specific version of the dataset
    • load_args: dict - Override default load arguments
    • credentials: dict - Override default credentials Consult your catalog implementation's documentation for details.
Returns
  • Data: The loaded and validated dataset. The specific type depends on the catalog configuration (e.g., pandas.DataFrame, pyspark.sql.DataFrame). The returned data has passed all validation checks configured for this dataset name.
Raises
  • KeyError: If the dataset name is not registered in the catalog or validator configuration.
  • FileNotFoundError: If the dataset's source file or location does not exist.
  • ValidationError: If the loaded data fails validation. The exception includes details about which validation rules failed, expected values, and observed values. The specific exception type depends on the validator implementation (e.g., great_expectations.exceptions.ValidationError for GXValidator, pandera.errors.SchemaError for PanderaValidator).
  • ValueError: If the dataset cannot be loaded due to format errors, parsing failures, or incompatible data types.
  • TypeError: If the loaded data type is incompatible with the validator expectations.
  • PermissionError: If the dataset source is not readable due to permission issues.
See Also

save: Validate and save a dataset to the catalog.
in_directory: Factory method for creating catalog instances.
DataCatalog.load: Underlying catalog load operation.
DataValidator.validate: Underlying validation operation.

Notes

The load operation is idempotent: calling it multiple times with the same name and arguments should return equivalent data (assuming the underlying source hasn't changed).

Validation happens after loading, so the full dataset must be loaded into memory before validation can begin. For very large datasets, this may have performance implications. Some validator implementations support sampling-based validation to mitigate this.

If validation fails, the loaded data is discarded and not returned. This prevents invalid data from propagating through your pipeline.

The method does not cache loaded data. Each call performs a fresh load and validation. If caching is needed, implement it at a higher level or use a catalog implementation that supports caching.

Thread safety depends on the underlying catalog and validator implementations. Consult their documentation if concurrent loading is required.

Examples

Basic usage:

>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> df = catalog.load("customer_data")
>>> # df is guaranteed to meet all validation rules for "customer_data"
>>> print(df.columns)
Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')

Loading with additional arguments:

>>> # Load a specific version
>>> df_v1 = catalog.load("customer_data", version="2024-01-01")
>>>
>>> # Override load arguments
>>> df_custom = catalog.load("sales_data", load_args={"parse_dates": ["transaction_date"]})

Handling validation failures:

>>> try:
...     df = catalog.load("strict_dataset")
... except Exception as e:
...     print(f"Validation failed: {e}")
...     # Log the failure, send alert, or handle gracefully
...     # The invalid data is not returned

Loading multiple datasets in a pipeline:

>>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
>>>
>>> # All datasets are validated on load
>>> customers = catalog.load("customers")
>>> orders = catalog.load("orders")
>>> products = catalog.load("products")
>>>
>>> # Merge validated datasets with confidence
>>> enriched = orders.merge(customers, on="customer_id")
>>> enriched = enriched.merge(products, on="product_id")

Using in a data processing function:

>>> def process_sales_data(catalog: ValidatedDataCatalog) -> pd.DataFrame:
...     # Load and validate raw sales data
...     sales = catalog.load("raw_sales")
...
...     # Process with confidence that data meets expectations
...     sales["revenue"] = sales["quantity"] * sales["price"]
...     sales = sales.groupby("region").agg({"revenue": "sum"})
...
...     return sales

Comparing data across environments:

>>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
>>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
>>>
>>> # Same dataset name, different sources, same validation
>>> dev_data = dev_catalog.load("training_data")
>>> prod_data = prod_catalog.load("training_data")
>>>
>>> # Both are guaranteed to have the same schema and quality
def save(self, name: str, data: adc_toolkit.data.abs.Data) -> None:
543    def save(self, name: str, data: Data) -> None:
544        """
545        Validate a dataset and save it to the catalog.
546
547        This method performs a two-step operation:
548        1. Validate the dataset using the validator's validate() method
549        2. Save the validated dataset using the catalog's save() method
550
551        The validation step ensures that only data meeting all configured
552        quality expectations is persisted. If validation fails, an exception
553        is raised and no data is saved.
554
555        This provides a safety guarantee: any data saved via this method has
556        been validated and meets the configured expectations. Invalid data
557        is prevented from entering downstream systems or storage.
558
559        Parameters
560        ----------
561        name : str
562            The registered name of the dataset to save. This name must be
563            defined in both the catalog configuration (for saving location
564            and format) and the validator configuration (for validation rules).
565            The name serves as the lookup key for both components.
566        data : Data
567            The dataset to validate and save. Must be a Data protocol-compatible
568            object (e.g., pandas.DataFrame, pyspark.sql.DataFrame) that has
569            'columns' and 'dtypes' properties. The data must satisfy all
570            validation rules configured for this dataset name.
571
572        Returns
573        -------
574        None
575            This method does not return a value. It performs a side effect
576            (saving data) after successful validation.
577
578        Raises
579        ------
580        KeyError
581            If the dataset name is not registered in the catalog or validator
582            configuration.
583        ValidationError
584            If the data fails validation. The exception includes details about
585            which validation rules failed, expected values, and observed values.
586            No data is saved when validation fails. The specific exception type
587            depends on the validator implementation (e.g.,
588            great_expectations.exceptions.ValidationError for GXValidator,
589            pandera.errors.SchemaError for PanderaValidator).
590        TypeError
591            If the data type is incompatible with the catalog's save operation
592            or the validator's expectations.
593        ValueError
594            If the data cannot be saved due to format errors or serialization
595            failures.
596        PermissionError
597            If the target save location is not writable.
598        OSError
599            If there are filesystem errors during the save operation (e.g.,
600            disk full, path too long).
601
602        See Also
603        --------
604        load : Load and validate a dataset from the catalog.
605        in_directory : Factory method for creating catalog instances.
606        DataCatalog.save : Underlying catalog save operation.
607        DataValidator.validate : Underlying validation operation.
608
609        Notes
610        -----
611        Validation happens before saving, so invalid data is never persisted.
612        This is crucial for maintaining data quality in downstream systems.
613
614        The save operation should be atomic when possible: either the entire
615        dataset is saved successfully, or no partial data is written. Atomicity
616        depends on the catalog implementation and underlying storage system.
617
618        Some catalog implementations support versioning, automatically creating
619        timestamped or numbered versions of saved datasets. Consult your catalog
620        documentation for details.
621
622        If the target file or location already exists, the behavior depends on
623        the catalog configuration. Common options include:
624        - Overwrite: Replace existing data (default for most catalogs)
625        - Append: Add to existing data
626        - Error: Raise an exception if target exists
627        - Version: Create a new version without overwriting
628
629        For very large datasets, validation may have performance implications
630        as the entire dataset must be validated before saving begins. Some
631        validator implementations support sampling-based validation.
632
633        Thread safety depends on the underlying catalog and validator
634        implementations. Consult their documentation if concurrent saving
635        is required.
636
637        The method does not modify the input data object. Validation may
638        internally create temporary copies or views, but the original data
639        parameter is unchanged.
640
641        Examples
642        --------
643        Basic usage:
644
645        >>> import pandas as pd
646        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
647        >>>
648        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
649        >>> df = pd.DataFrame(
650        ...     {
651        ...         "customer_id": [1, 2, 3],
652        ...         "name": ["Alice", "Bob", "Carol"],
653        ...         "email": ["alice@ex.com", "bob@ex.com", "carol@ex.com"],
654        ...     }
655        ... )
656        >>> catalog.save("customer_data", df)
657        >>> # Data is validated before saving; only valid data is persisted
658
659        Handling validation failures:
660
661        >>> invalid_df = pd.DataFrame({"wrong_column": [1, 2, 3]})
662        >>> try:
663        ...     catalog.save("customer_data", invalid_df)
664        ... except Exception as e:
665        ...     print(f"Validation failed: {e}")
666        ...     # Invalid data is not saved; downstream systems protected
667
668        Saving multiple datasets in a pipeline:
669
670        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
671        >>>
672        >>> # Process and save intermediate results
673        >>> raw = catalog.load("raw_transactions")
674        >>> cleaned = raw.dropna()
675        >>> catalog.save("cleaned_transactions", cleaned)
676        >>>
677        >>> # Further processing
678        >>> features = engineer_features(cleaned)
679        >>> catalog.save("feature_matrix", features)
680        >>>
681        >>> # Final output
682        >>> predictions = model.predict(features)
683        >>> catalog.save("predictions", predictions)
684
685        Complete validation workflow:
686
687        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
688        >>>
689        >>> catalog = ValidatedDataCatalog.in_directory("config/production")
690        >>>
691        >>> # Load validated input
692        >>> input_data = catalog.load("input_dataset")
693        >>>
694        >>> # Transform data
695        >>> output_data = transform(input_data)
696        >>>
697        >>> # Save with validation
698        >>> try:
699        ...     catalog.save("output_dataset", output_data)
700        ... except Exception as e:
701        ...     # Validation failed; investigate and fix transformation
702        ...     logger.error(f"Output validation failed: {e}")
703        ...     # Original input_data is still available for debugging
704        ...     raise
705
706        Using save in a reusable processing function:
707
708        >>> def aggregate_sales(catalog: ValidatedDataCatalog, input_name: str, output_name: str) -> None:
709        ...     # Load validated data
710        ...     sales = catalog.load(input_name)
711        ...
712        ...     # Aggregate
713        ...     aggregated = sales.groupby("region").agg({"revenue": "sum", "quantity": "sum"})
714        ...
715        ...     # Save validated output
716        ...     catalog.save(output_name, aggregated)
717        >>>
718        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
719        >>> aggregate_sales(catalog, "daily_sales", "monthly_sales")
720
721        Preventing invalid data from reaching production:
722
723        >>> # Development environment - experimenting with new features
724        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
725        >>> experimental_df = create_new_features(raw_data)
726        >>>
727        >>> try:
728        ...     dev_catalog.save("feature_matrix", experimental_df)
729        ... except Exception as e:
730        ...     print(f"New features don't meet schema: {e}")
731        ...     # Fix the feature engineering before deploying to production
732        >>>
733        >>> # Production environment - same validation rules enforced
734        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
735        >>> prod_catalog.save("feature_matrix", experimental_df)
736        >>> # Only succeeds if data meets production quality standards
737
738        Saving with different formats via catalog configuration:
739
740        >>> # Catalog configuration determines format (CSV, Parquet, etc.)
741        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
742        >>>
743        >>> # Same save() call, different formats based on catalog config
744        >>> catalog.save("csv_output", df)  # Saved as CSV
745        >>> catalog.save("parquet_output", df)  # Saved as Parquet
746        >>> catalog.save("database_table", df)  # Saved to database
747        >>> # All validated before saving regardless of format
748        """
749        self.catalog.save(name, self.validator.validate(name, data))

Validate a dataset and save it to the catalog.

This method performs a two-step operation:

  1. Validate the dataset using the validator's validate() method
  2. Save the validated dataset using the catalog's save() method

The validation step ensures that only data meeting all configured quality expectations is persisted. If validation fails, an exception is raised and no data is saved.

This provides a safety guarantee: any data saved via this method has been validated and meets the configured expectations. Invalid data is prevented from entering downstream systems or storage.

Parameters
  • name (str): The registered name of the dataset to save. This name must be defined in both the catalog configuration (for saving location and format) and the validator configuration (for validation rules). The name serves as the lookup key for both components.
  • data (Data): The dataset to validate and save. Must be a Data protocol-compatible object (e.g., pandas.DataFrame, pyspark.sql.DataFrame) that has 'columns' and 'dtypes' properties. The data must satisfy all validation rules configured for this dataset name.
Returns
  • None: This method does not return a value. It performs a side effect (saving data) after successful validation.
Raises
  • KeyError: If the dataset name is not registered in the catalog or validator configuration.
  • ValidationError: If the data fails validation. The exception includes details about which validation rules failed, expected values, and observed values. No data is saved when validation fails. The specific exception type depends on the validator implementation (e.g., great_expectations.exceptions.ValidationError for GXValidator, pandera.errors.SchemaError for PanderaValidator).
  • TypeError: If the data type is incompatible with the catalog's save operation or the validator's expectations.
  • ValueError: If the data cannot be saved due to format errors or serialization failures.
  • PermissionError: If the target save location is not writable.
  • OSError: If there are filesystem errors during the save operation (e.g., disk full, path too long).
See Also

load: Load and validate a dataset from the catalog.
in_directory: Factory method for creating catalog instances.
DataCatalog.save: Underlying catalog save operation.
DataValidator.validate: Underlying validation operation.

Notes

Validation happens before saving, so invalid data is never persisted. This is crucial for maintaining data quality in downstream systems.

The save operation should be atomic when possible: either the entire dataset is saved successfully, or no partial data is written. Atomicity depends on the catalog implementation and underlying storage system.

Some catalog implementations support versioning, automatically creating timestamped or numbered versions of saved datasets. Consult your catalog documentation for details.

If the target file or location already exists, the behavior depends on the catalog configuration. Common options include:

  • Overwrite: Replace existing data (default for most catalogs)
  • Append: Add to existing data
  • Error: Raise an exception if target exists
  • Version: Create a new version without overwriting

For very large datasets, validation may have performance implications as the entire dataset must be validated before saving begins. Some validator implementations support sampling-based validation.

Thread safety depends on the underlying catalog and validator implementations. Consult their documentation if concurrent saving is required.

The method does not modify the input data object. Validation may internally create temporary copies or views, but the original data parameter is unchanged.

Examples

Basic usage:

>>> import pandas as pd
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> df = pd.DataFrame(
...     {
...         "customer_id": [1, 2, 3],
...         "name": ["Alice", "Bob", "Carol"],
...         "email": ["alice@ex.com", "bob@ex.com", "carol@ex.com"],
...     }
... )
>>> catalog.save("customer_data", df)
>>> # Data is validated before saving; only valid data is persisted

Handling validation failures:

>>> invalid_df = pd.DataFrame({"wrong_column": [1, 2, 3]})
>>> try:
...     catalog.save("customer_data", invalid_df)
... except Exception as e:
...     print(f"Validation failed: {e}")
...     # Invalid data is not saved; downstream systems protected

Saving multiple datasets in a pipeline:

>>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
>>>
>>> # Process and save intermediate results
>>> raw = catalog.load("raw_transactions")
>>> cleaned = raw.dropna()
>>> catalog.save("cleaned_transactions", cleaned)
>>>
>>> # Further processing
>>> features = engineer_features(cleaned)
>>> catalog.save("feature_matrix", features)
>>>
>>> # Final output
>>> predictions = model.predict(features)
>>> catalog.save("predictions", predictions)

Complete validation workflow:

>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> catalog = ValidatedDataCatalog.in_directory("config/production")
>>>
>>> # Load validated input
>>> input_data = catalog.load("input_dataset")
>>>
>>> # Transform data
>>> output_data = transform(input_data)
>>>
>>> # Save with validation
>>> try:
...     catalog.save("output_dataset", output_data)
... except Exception as e:
...     # Validation failed; investigate and fix transformation
...     logger.error(f"Output validation failed: {e}")
...     # Original input_data is still available for debugging
...     raise

Using save in a reusable processing function:

>>> def aggregate_sales(catalog: ValidatedDataCatalog, input_name: str, output_name: str) -> None:
...     # Load validated data
...     sales = catalog.load(input_name)
...
...     # Aggregate
...     aggregated = sales.groupby("region").agg({"revenue": "sum", "quantity": "sum"})
...
...     # Save validated output
...     catalog.save(output_name, aggregated)
>>>
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> aggregate_sales(catalog, "daily_sales", "monthly_sales")

Preventing invalid data from reaching production:

>>> # Development environment - experimenting with new features
>>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
>>> experimental_df = create_new_features(raw_data)
>>>
>>> try:
...     dev_catalog.save("feature_matrix", experimental_df)
... except Exception as e:
...     print(f"New features don't meet schema: {e}")
...     # Fix the feature engineering before deploying to production
>>>
>>> # Production environment - same validation rules enforced
>>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
>>> prod_catalog.save("feature_matrix", experimental_df)
>>> # Only succeeds if data meets production quality standards

Saving with different formats via catalog configuration:

>>> # Catalog configuration determines format (CSV, Parquet, etc.)
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>>
>>> # Same save() call, different formats based on catalog config
>>> catalog.save("csv_output", df)  # Saved as CSV
>>> catalog.save("parquet_output", df)  # Saved as Parquet
>>> catalog.save("database_table", df)  # Saved to database
>>> # All validated before saving regardless of format