adc_toolkit.data.catalog
Default adc_toolkit data catalog.
This module provides the ValidatedDataCatalog class, which is the primary user-facing API for the adc-toolkit data handling system. It combines a data catalog (for I/O operations) with a data validator (for quality checks) to provide automatic validation on all data loading and saving operations.
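The composition described above can be sketched without any of the real backends. The classes below (`InMemoryCatalog`, `RequiredColumnsValidator`, `MiniValidatedCatalog`) are hypothetical stand-ins, and datasets are simplified to plain dicts of column lists. Only the `load()` and `save()` bodies copy the one-line expressions used by ValidatedDataCatalog. Treat it as a sketch of the ordering guarantees, not as the toolkit's implementation.

```python
from typing import Any, Dict, List

# Hypothetical dataset representation: column name -> list of values.
Table = Dict[str, List[Any]]


class InMemoryCatalog:
    """Hypothetical catalog that keeps datasets in a dict instead of on disk."""

    def __init__(self) -> None:
        self._store: Dict[str, Table] = {}

    def load(self, name: str, **kwargs: Any) -> Table:
        return self._store[name]  # raises KeyError for unknown names

    def save(self, name: str, data: Table) -> None:
        self._store[name] = data


class RequiredColumnsValidator:
    """Hypothetical validator: maps each dataset name to its required columns."""

    def __init__(self, rules: Dict[str, List[str]]) -> None:
        self._rules = rules

    def validate(self, name: str, data: Table) -> Table:
        missing = [col for col in self._rules.get(name, []) if col not in data]
        if missing:
            raise ValueError(f"{name}: missing columns {missing}")
        return data


class MiniValidatedCatalog:
    """Same composition as ValidatedDataCatalog, using the stand-ins above."""

    __slots__ = ("catalog", "validator")

    def __init__(self, catalog: InMemoryCatalog, validator: RequiredColumnsValidator) -> None:
        self.catalog = catalog
        self.validator = validator

    def load(self, name: str, **kwargs: Any) -> Table:
        # Load first, then validate before returning to the caller.
        return self.validator.validate(name, self.catalog.load(name, **kwargs))

    def save(self, name: str, data: Table) -> None:
        # Validate first; catalog.save() is not reached if validate() raises.
        self.catalog.save(name, self.validator.validate(name, data))


catalog = MiniValidatedCatalog(
    InMemoryCatalog(),
    RequiredColumnsValidator({"customers": ["customer_id", "email"]}),
)
catalog.save("customers", {"customer_id": [1, 2], "email": ["a@ex.com", "b@ex.com"]})

rejected = None
try:
    catalog.save("customers", {"customer_id": [3]})  # no "email" column
except ValueError as err:
    rejected = str(err)

print(rejected)  # customers: missing columns ['email']
print(catalog.load("customers")["customer_id"])  # [1, 2]
```

Because `save()` calls `validate()` before `catalog.save()`, the rejected dataset never reaches the store, which is why the final `load()` still returns the first, valid version.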
```python
"""
Default adc_toolkit data catalog.

This module provides the ValidatedDataCatalog class, which is the primary
user-facing API for the adc-toolkit data handling system. It combines a
data catalog (for I/O operations) with a data validator (for quality checks)
to provide automatic validation on all data loading and saving operations.
"""

from pathlib import Path
from typing import Any

from adc_toolkit.data.abs import Data, DataCatalog, DataValidator
from adc_toolkit.data.default_attributes import default_catalog, default_validator


class ValidatedDataCatalog:
    """
    Data catalog with automatic validation on load and save operations.

    ValidatedDataCatalog is the main user-facing API for the adc-toolkit data
    handling system. It wraps a DataCatalog (responsible for I/O operations)
    and a DataValidator (responsible for data quality checks) to provide a
    unified interface that automatically validates data after loading and
    before saving.

    This design ensures data quality is enforced at catalog boundaries,
    catching issues early in data pipelines. Validation is transparent to
    the user: simply call load() and save() as you would with a regular
    catalog, and validation happens automatically.

    The class uses dependency injection to allow flexible catalog and
    validator implementations. By default, it uses KedroDataCatalog for
    I/O and GXValidator (Great Expectations) for validation, but these
    can be swapped for custom implementations or alternative validators
    like PanderaValidator.

    Parameters
    ----------
    catalog : DataCatalog
        The data catalog instance responsible for loading and saving datasets.
        Must implement the DataCatalog protocol with load() and save() methods.
        The default implementation is KedroDataCatalog, which uses Kedro's
        configuration-driven catalog system with YAML-based dataset definitions.
    validator : DataValidator
        The data validator instance responsible for validating datasets.
        Must implement the DataValidator protocol with a validate() method.
        The default implementation is GXValidator (Great Expectations), with
        PanderaValidator as a fallback. Use NoValidator to disable validation.

    Attributes
    ----------
    catalog : DataCatalog
        The underlying data catalog for I/O operations. Not meant to be
        reassigned after instantiation (see Notes).
    validator : DataValidator
        The underlying data validator for quality checks. Not meant to be
        reassigned after instantiation (see Notes).

    See Also
    --------
    DataCatalog : Protocol defining the catalog interface.
    DataValidator : Protocol defining the validator interface.
    adc_toolkit.data.catalogs.kedro.KedroDataCatalog : Default catalog implementation.
    adc_toolkit.data.validators.gx.GXValidator : Default validator (Great Expectations).
    adc_toolkit.data.validators.pandera.PanderaValidator : Alternative validator.
    adc_toolkit.data.validators.no_validator.NoValidator : No-op validator.

    Notes
    -----
    The class uses `__slots__` to restrict attributes to only 'catalog' and
    'validator', preventing accidental attribute additions and reducing memory
    overhead.

    `__slots__` does not make these two attributes immutable: they can still
    be reassigned. Treat them as read-only by convention; to use a different
    catalog or validator, create a new ValidatedDataCatalog instance.

    Validation happens in the following order:
    - On load: catalog.load() -> validator.validate() -> return validated data
    - On save: validator.validate() -> catalog.save() -> return None

    This means invalid data will never be saved, and loaded data is always
    validated before being returned to the caller.

    The factory method in_directory() is the recommended way to instantiate
    this class for most use cases. Direct instantiation via __init__() is
    primarily useful for testing or custom configurations.

    Examples
    --------
    Basic usage with default catalog and validator:

    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
    >>> catalog = ValidatedDataCatalog.in_directory("config/data")
    >>> df = catalog.load("customer_data")
    >>> # Data is automatically validated after loading
    >>> processed_df = process_data(df)
    >>> catalog.save("processed_customer_data", processed_df)
    >>> # Data is automatically validated before saving

    Using a custom validator while keeping the default catalog:

    >>> from adc_toolkit.data.validators.pandera import PanderaValidator
    >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
    >>> df = catalog.load("sales_data")

    Using custom catalog and validator implementations:

    >>> from myproject.catalogs import CustomCatalog
    >>> from myproject.validators import CustomValidator
    >>> catalog = ValidatedDataCatalog.in_directory(
    ...     "config/data", catalog_class=CustomCatalog, validator_class=CustomValidator
    ... )

    Direct instantiation for testing or advanced use cases:

    >>> from unittest.mock import Mock
    >>> from adc_toolkit.data.abs import DataCatalog, DataValidator
    >>> mock_catalog = Mock(spec=DataCatalog)
    >>> mock_validator = Mock(spec=DataValidator)
    >>> catalog = ValidatedDataCatalog(catalog=mock_catalog, validator=mock_validator)

    Complete data pipeline example:

    >>> import pandas as pd
    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
    >>>
    >>> # Initialize catalog with validation
    >>> catalog = ValidatedDataCatalog.in_directory("config/production")
    >>>
    >>> # Load and automatically validate raw data
    >>> raw_data = catalog.load("raw_sales")
    >>>
    >>> # Process data
    >>> cleaned = raw_data.dropna()
    >>> aggregated = cleaned.groupby("region").sum()
    >>>
    >>> # Save and automatically validate before writing
    >>> catalog.save("aggregated_sales", aggregated)
    >>>
    >>> # Validation errors are raised if data doesn't meet expectations
    >>> try:
    ...     invalid_df = pd.DataFrame({"bad_column": [1, 2, 3]})
    ...     catalog.save("aggregated_sales", invalid_df)
    ... except Exception as e:
    ...     print(f"Validation failed: {e}")

    Disabling validation for trusted data sources:

    >>> from adc_toolkit.data.validators.no_validator import NoValidator
    >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
    >>> # No validation overhead, useful for performance-critical paths
    >>> df = catalog.load("trusted_source")
    """

    __slots__ = ("catalog", "validator")

    def __init__(self, catalog: DataCatalog, validator: DataValidator) -> None:
        """
        Initialize a ValidatedDataCatalog with a catalog and validator.

        This constructor is primarily used for testing or advanced use cases
        where you have already instantiated catalog and validator objects.
        For most use cases, prefer the in_directory() factory method.

        Parameters
        ----------
        catalog : DataCatalog
            The data catalog instance for I/O operations. Must implement the
            DataCatalog protocol with load(name) and save(name, data) methods.
        validator : DataValidator
            The data validator instance for quality checks. Must implement the
            DataValidator protocol with a validate(name, data) method.

        See Also
        --------
        in_directory : Factory method for creating instances from configuration.
        load : Load and validate a dataset.
        save : Validate and save a dataset.

        Notes
        -----
        Both catalog and validator are stored as instance attributes. Because
        of `__slots__`, no other attributes can be added, but these two can
        still be reassigned; treat them as read-only by convention.

        The constructor does not check the provided objects at runtime; the
        type annotations are only hints for static type checkers. It is the
        caller's responsibility to ensure the objects implement the required
        protocols correctly.

        Examples
        --------
        Direct instantiation with concrete implementations:

        >>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
        >>> from adc_toolkit.data.validators.gx import GXValidator
        >>>
        >>> catalog_impl = KedroDataCatalog("config/data")
        >>> validator_impl = GXValidator.in_directory("config/validations")
        >>> validated_catalog = ValidatedDataCatalog(catalog=catalog_impl, validator=validator_impl)

        Using mock objects for testing:

        >>> from unittest.mock import Mock
        >>> import pandas as pd
        >>> from adc_toolkit.data.abs import DataCatalog, DataValidator
        >>>
        >>> mock_catalog = Mock(spec=DataCatalog)
        >>> mock_validator = Mock(spec=DataValidator)
        >>>
        >>> # Set up mock behavior
        >>> test_df = pd.DataFrame({"a": [1, 2, 3]})
        >>> mock_catalog.load.return_value = test_df
        >>> mock_validator.validate.return_value = test_df
        >>>
        >>> catalog = ValidatedDataCatalog(mock_catalog, mock_validator)
        >>> result = catalog.load("test_dataset")
        >>> mock_catalog.load.assert_called_once_with("test_dataset")
        """
        self.catalog = catalog
        self.validator = validator

    @classmethod
    def in_directory(
        cls,
        path: str | Path,
        catalog_class: type[DataCatalog] | None = None,
        validator_class: type[DataValidator] | None = None,
    ) -> "ValidatedDataCatalog":
        """
        Create a validated catalog from configuration in a directory.

        This is the recommended factory method for creating ValidatedDataCatalog
        instances in production code. It reads configuration files from the
        specified directory and instantiates both the catalog and validator
        from it: custom classes through their in_directory() class methods,
        defaults through default_catalog() and default_validator().

        By default, this method uses KedroDataCatalog for I/O operations and
        GXValidator (Great Expectations) for validation. If Great Expectations
        is not installed, it falls back to PanderaValidator. Custom
        implementations can be provided via the catalog_class and
        validator_class parameters.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the directory containing catalog and validator configuration
            files. This directory typically contains:
            - catalog.yml: Kedro catalog configuration (for KedroDataCatalog)
            - expectations/: Great Expectations suite definitions (for GXValidator)
            - Or equivalent configuration for custom implementations.
            The path can be absolute or relative to the current working directory.
        catalog_class : type[DataCatalog] or None, optional
            Custom data catalog class to use instead of the default. Must be a
            class (not instance) that implements the DataCatalog protocol and
            provides an in_directory(path) class method. If None (default),
            uses KedroDataCatalog.
        validator_class : type[DataValidator] or None, optional
            Custom data validator class to use instead of the default. Must be
            a class (not instance) that implements the DataValidator protocol
            and provides an in_directory(path) class method. If None (default),
            uses GXValidator if available, otherwise PanderaValidator.

        Returns
        -------
        ValidatedDataCatalog
            A new ValidatedDataCatalog instance with catalog and validator
            initialized from the configuration directory.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist or required configuration
            files are missing.
        ImportError
            If default catalog/validator classes are requested but their
            required packages are not installed (e.g., kedro, great_expectations,
            or pandera).
        ValueError
            If configuration files are malformed or contain invalid settings.

        See Also
        --------
        __init__ : Direct constructor for advanced use cases.
        load : Load and validate a dataset from the catalog.
        save : Validate and save a dataset to the catalog.
        adc_toolkit.data.default_attributes.default_catalog : Function that creates default catalog.
        adc_toolkit.data.default_attributes.default_validator : Function that creates default validator.

        Notes
        -----
        When catalog_class or validator_class is given, the method calls its
        in_directory(path); otherwise it calls default_catalog(path) or
        default_validator(path). The resulting objects are responsible for
        reading their respective configuration files, and the exact
        configuration file format depends on the implementations used.

        For KedroDataCatalog (default), the directory should contain:
        - catalog.yml: Dataset definitions in Kedro format
        - (optionally) credentials.yml: Credential configurations

        For GXValidator (default), the directory should contain:
        - great_expectations.yml: GX project configuration
        - expectations/: Directory with expectation suite JSON files
        - checkpoints/: Directory with checkpoint configurations

        For PanderaValidator, the directory should contain:
        - Python files defining Pandera schemas for each dataset

        The directory structure can be organized as needed; implementations
        are responsible for finding their configuration files within the path.

        This method is thread-safe as long as the underlying catalog and
        validator implementations are thread-safe.

        Examples
        --------
        Basic usage with default catalog and validator:

        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
        >>> # Uses KedroDataCatalog and GXValidator (or PanderaValidator)

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> config_dir = Path("config") / "production" / "data"
        >>> catalog = ValidatedDataCatalog.in_directory(config_dir)

        Using a custom catalog with default validator:

        >>> from myproject.catalogs import S3DataCatalog
        >>> catalog = ValidatedDataCatalog.in_directory("config/data", catalog_class=S3DataCatalog)
        >>> # Uses S3DataCatalog for I/O, GXValidator for validation

        Using a custom validator with default catalog:

        >>> from adc_toolkit.data.validators.pandera import PanderaValidator
        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
        >>> # Uses KedroDataCatalog for I/O, PanderaValidator for validation

        Using custom catalog and validator:

        >>> from myproject.catalogs import DatabaseCatalog
        >>> from myproject.validators import CustomValidator
        >>> catalog = ValidatedDataCatalog.in_directory(
        ...     "config/data", catalog_class=DatabaseCatalog, validator_class=CustomValidator
        ... )

        Disabling validation for performance-critical scenarios:

        >>> from adc_toolkit.data.validators.no_validator import NoValidator
        >>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
        >>> # No validation overhead, useful for trusted data sources

        Different configurations for different environments:

        >>> # Development: use local files with strict validation
        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
        >>>
        >>> # Production: use cloud storage with the same validation
        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
        >>>
        >>> # Testing: use a test configuration with validation disabled
        >>> test_catalog = ValidatedDataCatalog.in_directory("config/test", validator_class=NoValidator)

        Complete workflow example:

        >>> import pandas as pd
        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
        >>>
        >>> # Initialize catalog from configuration
        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
        >>>
        >>> # Load raw data (validated on load)
        >>> raw = catalog.load("raw_transactions")
        >>>
        >>> # Process data
        >>> cleaned = raw.dropna()
        >>> features = engineer_features(cleaned)
        >>>
        >>> # Save results (validated before save)
        >>> catalog.save("cleaned_transactions", cleaned)
        >>> catalog.save("feature_matrix", features)
        """
        catalog = catalog_class.in_directory(path) if catalog_class else default_catalog(path)
        validator = validator_class.in_directory(path) if validator_class else default_validator(path)
        return cls(catalog, validator)

    def load(self, name: str, **kwargs: Any) -> Data:
        """
        Load a dataset from the catalog and validate it.

        This method performs a two-step operation:
        1. Load the dataset using the underlying catalog's load() method
        2. Validate the loaded dataset using the validator's validate() method

        The validation step ensures that the loaded data meets all configured
        quality expectations before being returned to the caller. If validation
        fails, an exception is raised and no data is returned.

        This provides a safety guarantee: any data returned from load() has
        been validated and can be trusted to meet the configured expectations.

        Parameters
        ----------
        name : str
            The registered name of the dataset to load. This name must be
            defined in both the catalog configuration (for loading) and the
            validator configuration (for validation rules). The name serves
            as the lookup key for both components.
        **kwargs : Any
            Additional keyword arguments passed through to the underlying
            catalog's load() method. The supported arguments depend on the
            catalog implementation. Common examples include:
            - version: str - Load a specific version of the dataset
            - load_args: dict - Override default load arguments
            - credentials: dict - Override default credentials
            Consult your catalog implementation's documentation for details.

        Returns
        -------
        Data
            The loaded and validated dataset. The specific type depends on
            the catalog configuration (e.g., pandas.DataFrame,
            pyspark.sql.DataFrame). The returned data has passed all
            validation checks configured for this dataset name.

        Raises
        ------
        KeyError
            If the dataset name is not registered in the catalog or validator
            configuration.
        FileNotFoundError
            If the dataset's source file or location does not exist.
        ValidationError
            If the loaded data fails validation. The exception includes details
            about which validation rules failed, expected values, and observed
            values. The specific exception type depends on the validator
            implementation (e.g., great_expectations.exceptions.ValidationError
            for GXValidator, pandera.errors.SchemaError for PanderaValidator).
        ValueError
            If the dataset cannot be loaded due to format errors, parsing
            failures, or incompatible data types.
        TypeError
            If the loaded data type is incompatible with the validator
            expectations.
        PermissionError
            If the dataset source is not readable due to permission issues.

        See Also
        --------
        save : Validate and save a dataset to the catalog.
        in_directory : Factory method for creating catalog instances.
        DataCatalog.load : Underlying catalog load operation.
        DataValidator.validate : Underlying validation operation.

        Notes
        -----
        The load operation is idempotent: calling it multiple times with the
        same name and arguments should return equivalent data (assuming the
        underlying source hasn't changed).

        Validation happens after loading, so for eager data types such as
        pandas.DataFrame the full dataset is in memory before validation
        begins. For very large datasets, this may have performance
        implications. Some validator implementations support sampling-based
        validation to mitigate this.

        If validation fails, the loaded data is discarded and not returned.
        This prevents invalid data from propagating through your pipeline.

        This method adds no caching of its own: each call invokes
        catalog.load() and validator.validate() again. If caching is needed,
        implement it at a higher level or use a catalog implementation that
        supports caching.

        Thread safety depends on the underlying catalog and validator
        implementations. Consult their documentation if concurrent loading
        is required.

        Examples
        --------
        Basic usage:

        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
        >>> df = catalog.load("customer_data")
        >>> # df is guaranteed to meet all validation rules for "customer_data"
        >>> print(df.columns)
        Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')

        Loading with additional arguments:

        >>> # Load a specific version
        >>> df_v1 = catalog.load("customer_data", version="2024-01-01")
        >>>
        >>> # Override load arguments
        >>> df_custom = catalog.load("sales_data", load_args={"parse_dates": ["transaction_date"]})

        Handling validation failures:

        >>> try:
        ...     df = catalog.load("strict_dataset")
        ... except Exception as e:
        ...     print(f"Validation failed: {e}")
        ...     # Log the failure, send alert, or handle gracefully
        ...     # The invalid data is not returned

        Loading multiple datasets in a pipeline:

        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
        >>>
        >>> # All datasets are validated on load
        >>> customers = catalog.load("customers")
        >>> orders = catalog.load("orders")
        >>> products = catalog.load("products")
        >>>
        >>> # Merge validated datasets with confidence
        >>> enriched = orders.merge(customers, on="customer_id")
        >>> enriched = enriched.merge(products, on="product_id")

        Using in a data processing function:

        >>> import pandas as pd
        >>> def process_sales_data(catalog: ValidatedDataCatalog) -> pd.DataFrame:
        ...     # Load and validate raw sales data
        ...     sales = catalog.load("raw_sales")
        ...
        ...     # Process with confidence that data meets expectations
        ...     sales["revenue"] = sales["quantity"] * sales["price"]
        ...     sales = sales.groupby("region").agg({"revenue": "sum"})
        ...
        ...     return sales

        Comparing data across environments:

        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
        >>>
        >>> # Same dataset name, different sources, same validation
        >>> dev_data = dev_catalog.load("training_data")
        >>> prod_data = prod_catalog.load("training_data")
        >>>
        >>> # If both directories define the same rules for "training_data",
        >>> # both datasets have passed the same schema and quality checks
        """
        return self.validator.validate(name, self.catalog.load(name, **kwargs))

    def save(self, name: str, data: Data) -> None:
        """
        Validate a dataset and save it to the catalog.

        This method performs a two-step operation:
        1. Validate the dataset using the validator's validate() method
        2. Save the validated dataset using the catalog's save() method

        The validation step ensures that only data meeting all configured
        quality expectations is persisted. If validation fails, an exception
        is raised and no data is saved.

        This provides a safety guarantee: any data saved via this method has
        been validated and meets the configured expectations. Invalid data
        is prevented from entering downstream systems or storage.

        Parameters
        ----------
        name : str
            The registered name of the dataset to save. This name must be
            defined in both the catalog configuration (for saving location
            and format) and the validator configuration (for validation rules).
            The name serves as the lookup key for both components.
        data : Data
            The dataset to validate and save. Must be a Data protocol-compatible
            object (e.g., pandas.DataFrame, pyspark.sql.DataFrame) that has
            'columns' and 'dtypes' properties. The data must satisfy all
            validation rules configured for this dataset name.

        Returns
        -------
        None
            This method does not return a value. It performs a side effect
            (saving data) after successful validation.

        Raises
        ------
        KeyError
            If the dataset name is not registered in the catalog or validator
            configuration.
        ValidationError
            If the data fails validation. The exception includes details about
            which validation rules failed, expected values, and observed values.
            No data is saved when validation fails. The specific exception type
            depends on the validator implementation (e.g.,
            great_expectations.exceptions.ValidationError for GXValidator,
            pandera.errors.SchemaError for PanderaValidator).
        TypeError
            If the data type is incompatible with the catalog's save operation
            or the validator's expectations.
        ValueError
            If the data cannot be saved due to format errors or serialization
            failures.
        PermissionError
            If the target save location is not writable.
        OSError
            If there are filesystem errors during the save operation (e.g.,
            disk full, path too long).

        See Also
        --------
        load : Load and validate a dataset from the catalog.
        in_directory : Factory method for creating catalog instances.
        DataCatalog.save : Underlying catalog save operation.
        DataValidator.validate : Underlying validation operation.

        Notes
        -----
        Validation happens before saving, so invalid data is never persisted.
        This is crucial for maintaining data quality in downstream systems.

        The save operation should be atomic when possible: either the entire
        dataset is saved successfully, or no partial data is written. Atomicity
        depends on the catalog implementation and underlying storage system.

        Some catalog implementations support versioning, automatically creating
        timestamped or numbered versions of saved datasets. Consult your catalog
        documentation for details.

        If the target file or location already exists, the behavior depends on
        the catalog configuration. Common options include:
        - Overwrite: Replace existing data (default for most catalogs)
        - Append: Add to existing data
        - Error: Raise an exception if target exists
        - Version: Create a new version without overwriting

        For very large datasets, validation may have performance implications
        as the entire dataset must be validated before saving begins. Some
        validator implementations support sampling-based validation.

        Thread safety depends on the underlying catalog and validator
        implementations. Consult their documentation if concurrent saving
        is required.

        This method itself does not modify the input data object; it saves
        whatever validator.validate() returns. Whether the original object is
        left untouched during validation depends on the validator
        implementation.

        Examples
        --------
        Basic usage:

        >>> import pandas as pd
        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
        >>>
        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
        >>> df = pd.DataFrame(
        ...     {
        ...         "customer_id": [1, 2, 3],
        ...         "name": ["Alice", "Bob", "Carol"],
        ...         "email": ["alice@ex.com", "bob@ex.com", "carol@ex.com"],
        ...     }
        ... )
        >>> catalog.save("customer_data", df)
        >>> # Data is validated before saving; only valid data is persisted

        Handling validation failures:

        >>> invalid_df = pd.DataFrame({"wrong_column": [1, 2, 3]})
        >>> try:
        ...     catalog.save("customer_data", invalid_df)
        ... except Exception as e:
        ...     print(f"Validation failed: {e}")
        ...     # Invalid data is not saved; downstream systems protected

        Saving multiple datasets in a pipeline:

        >>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
        >>>
        >>> # Process and save intermediate results
        >>> raw = catalog.load("raw_transactions")
        >>> cleaned = raw.dropna()
        >>> catalog.save("cleaned_transactions", cleaned)
        >>>
        >>> # Further processing
        >>> features = engineer_features(cleaned)
        >>> catalog.save("feature_matrix", features)
        >>>
        >>> # Final output
        >>> predictions = model.predict(features)
        >>> catalog.save("predictions", predictions)

        Complete validation workflow:

        >>> import logging
        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
        >>>
        >>> logger = logging.getLogger(__name__)
        >>> catalog = ValidatedDataCatalog.in_directory("config/production")
        >>>
        >>> # Load validated input
        >>> input_data = catalog.load("input_dataset")
        >>>
        >>> # Transform data
        >>> output_data = transform(input_data)
        >>>
        >>> # Save with validation
        >>> try:
        ...     catalog.save("output_dataset", output_data)
        ... except Exception as e:
        ...     # Validation failed; investigate and fix transformation
        ...     logger.error(f"Output validation failed: {e}")
        ...     # Original input_data is still available for debugging
        ...     raise

        Using save in a reusable processing function:

        >>> def aggregate_sales(catalog: ValidatedDataCatalog, input_name: str, output_name: str) -> None:
        ...     # Load validated data
        ...     sales = catalog.load(input_name)
        ...
        ...     # Aggregate
        ...     aggregated = sales.groupby("region").agg({"revenue": "sum", "quantity": "sum"})
        ...
        ...     # Save validated output
        ...     catalog.save(output_name, aggregated)
        >>>
        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
        >>> aggregate_sales(catalog, "daily_sales", "monthly_sales")

        Preventing invalid data from reaching production:

        >>> # Development environment - experimenting with new features
        >>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
        >>> experimental_df = create_new_features(raw_data)
        >>>
        >>> try:
        ...     dev_catalog.save("feature_matrix", experimental_df)
        ... except Exception as e:
        ...     print(f"New features don't meet schema: {e}")
        ...     # Fix the feature engineering before deploying to production
        >>>
        >>> # Production environment - same validation rules enforced
        >>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
        >>> prod_catalog.save("feature_matrix", experimental_df)
        >>> # Only succeeds if data meets production quality standards

        Saving with different formats via catalog configuration:

        >>> # Catalog configuration determines format (CSV, Parquet, etc.)
        >>> catalog = ValidatedDataCatalog.in_directory("config/data")
        >>>
        >>> # Same save() call, different formats based on catalog config
        >>> catalog.save("csv_output", df)  # Saved as CSV
        >>> catalog.save("parquet_output", df)  # Saved as Parquet
        >>> catalog.save("database_table", df)  # Saved to database
        >>> # All validated before saving regardless of format
        """
        self.catalog.save(name, self.validator.validate(name, data))
```
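Two details of the source are easy to misread: how `in_directory()` chooses between a supplied class and the default factory, and what `__slots__` actually guarantees. The sketch below reproduces both with hypothetical names (`DirCatalog`, `fake_default_catalog`, `SlottedWrapper`); the real `default_catalog()` from adc_toolkit.data.default_attributes is not reproduced here. It relies only on standard Python semantics: `__slots__` rejects undeclared attributes but still lets a declared slot be reassigned.

```python
from pathlib import Path
from typing import Optional, Type, Union


class DirCatalog:
    """Hypothetical catalog that only records where it was configured from."""

    def __init__(self, path: Path, source: str) -> None:
        self.path = path
        self.source = source

    @classmethod
    def in_directory(cls, path: Union[str, Path]) -> "DirCatalog":
        return cls(Path(path), source="custom")


def fake_default_catalog(path: Union[str, Path]) -> DirCatalog:
    # Hypothetical stand-in for adc_toolkit's default_catalog().
    return DirCatalog(Path(path), source="default")


class SlottedWrapper:
    __slots__ = ("catalog",)

    def __init__(self, catalog: DirCatalog) -> None:
        self.catalog = catalog

    @classmethod
    def in_directory(
        cls, path: Union[str, Path], catalog_class: Optional[Type[DirCatalog]] = None
    ) -> "SlottedWrapper":
        # Same conditional expression as ValidatedDataCatalog.in_directory():
        # a supplied class is used, otherwise the default factory is called.
        catalog = catalog_class.in_directory(path) if catalog_class else fake_default_catalog(path)
        return cls(catalog)


w_default = SlottedWrapper.in_directory("config/data")
w_custom = SlottedWrapper.in_directory("config/data", catalog_class=DirCatalog)
print(w_default.catalog.source, w_custom.catalog.source)  # default custom

# __slots__ rejects attributes that were not declared...
added = True
try:
    w_default.cache = {}
except AttributeError:
    added = False
print(added)  # False

# ...but a declared slot can still be reassigned.
w_default.catalog = w_custom.catalog
print(w_default.catalog.source)  # custom
```

Treating `catalog` and `validator` as read-only is therefore a convention; code that needs real immutability would have to enforce it separately, for example with read-only properties.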
Data catalog with automatic validation on load and save operations.
ValidatedDataCatalog is the main user-facing API for the adc-toolkit data handling system. It wraps a DataCatalog (responsible for I/O operations) and a DataValidator (responsible for data quality checks) to provide a unified interface that automatically validates data after loading and before saving.
This design ensures data quality is enforced at catalog boundaries, catching issues early in data pipelines. Validation is transparent to the user: simply call load() and save() as you would with a regular catalog, and validation happens automatically.
The class uses dependency injection to allow flexible catalog and validator implementations. By default, it uses KedroDataCatalog for I/O and GXValidator (Great Expectations) for validation, but these can be swapped for custom implementations or alternative validators like PanderaValidator.
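The wiring described above can be sketched in a few lines. This is a minimal, self-contained sketch, not the library's code: `InMemoryCatalog`, `NonEmptyValidator`, and `SketchValidatedCatalog` are hypothetical stand-ins. The two method bodies follow the ordering this page describes (load, then validate; validate, then save).

```python
from typing import Any


class InMemoryCatalog:
    """Hypothetical stand-in for a DataCatalog: stores datasets in a dict."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    def load(self, name: str, **kwargs: Any) -> Any:
        return self._store[name]

    def save(self, name: str, data: Any) -> None:
        self._store[name] = data


class NonEmptyValidator:
    """Hypothetical stand-in for a DataValidator: rejects empty datasets."""

    def validate(self, name: str, data: Any) -> Any:
        if len(data) == 0:
            raise ValueError(f"{name}: dataset is empty")
        return data


class SketchValidatedCatalog:
    """Hypothetical mirror of the catalog/validator composition."""

    __slots__ = ("catalog", "validator")

    def __init__(self, catalog: Any, validator: Any) -> None:
        # Both collaborators are injected; nothing is constructed here.
        self.catalog = catalog
        self.validator = validator

    def load(self, name: str, **kwargs: Any) -> Any:
        # Load first, then validate before handing data back.
        return self.validator.validate(name, self.catalog.load(name, **kwargs))

    def save(self, name: str, data: Any) -> None:
        # Validate first; only validated data reaches the catalog.
        self.catalog.save(name, self.validator.validate(name, data))


catalog = SketchValidatedCatalog(InMemoryCatalog(), NonEmptyValidator())
catalog.save("orders", [1, 2, 3])
print(catalog.load("orders"))  # [1, 2, 3]
```

Swapping either collaborator only changes the objects passed to the constructor; the wrapper itself never needs to know which storage or validation library is behind them.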
Parameters
- catalog (DataCatalog): The data catalog instance responsible for loading and saving datasets. Must implement the DataCatalog protocol with load() and save() methods. The default implementation is KedroDataCatalog, which uses Kedro's configuration-driven catalog system with YAML-based dataset definitions.
- validator (DataValidator): The data validator instance responsible for validating datasets. Must implement the DataValidator protocol with a validate() method. The default implementation is GXValidator (Great Expectations); PanderaValidator is used as a fallback when Great Expectations is not installed. Use NoValidator to disable validation.
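Because each parameter only has to provide load()/save() or validate(), any object with those methods can be injected. The sketch below expresses that structural contract with `typing.Protocol`, assuming the signatures load(name, **kwargs), save(name, data), and validate(name, data). `CatalogLike`, `ValidatorLike`, `DictCatalog`, and `PassThrough` are hypothetical names; the real protocols in adc_toolkit.data.abs may be defined differently.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CatalogLike(Protocol):
    """Hypothetical catalog contract: load by name, save by name."""

    def load(self, name: str, **kwargs: Any) -> Any: ...
    def save(self, name: str, data: Any) -> None: ...


@runtime_checkable
class ValidatorLike(Protocol):
    """Hypothetical validator contract: return data or raise."""

    def validate(self, name: str, data: Any) -> Any: ...


class DictCatalog:
    # No inheritance needed: matching method names is enough.
    def __init__(self) -> None:
        self.store: dict[str, Any] = {}

    def load(self, name: str, **kwargs: Any) -> Any:
        return self.store[name]

    def save(self, name: str, data: Any) -> None:
        self.store[name] = data


class PassThrough:
    def validate(self, name: str, data: Any) -> Any:
        return data


print(isinstance(DictCatalog(), CatalogLike))  # True
print(isinstance(PassThrough(), ValidatorLike))  # True
print(isinstance(PassThrough(), CatalogLike))  # False
```

Note that a `runtime_checkable` isinstance check only verifies that the methods exist, not that their signatures match; a static type checker is needed for that.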
Attributes
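- catalog (DataCatalog): The underlying data catalog for I/O operations. Intended to be treated as read-only after instantiation; this is a convention and is not enforced.
- validator (DataValidator): The underlying data validator for quality checks. Intended to be treated as read-only after instantiation; this is a convention and is not enforced.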
See Also
DataCatalog: Protocol defining the catalog interface.
DataValidator: Protocol defining the validator interface.
adc_toolkit.data.catalogs.kedro.KedroDataCatalog: Default catalog implementation.
adc_toolkit.data.validators.gx.GXValidator: Default validator (Great Expectations).
adc_toolkit.data.validators.pandera.PanderaValidator: Alternative validator.
adc_toolkit.data.validators.no_validator.NoValidator: No-op validator.
Notes
The class uses __slots__ to restrict instance attributes to 'catalog' and 'validator', which prevents accidental attribute additions and avoids a per-instance __dict__, reducing memory overhead.
__slots__ does not make these two attributes read-only: they can still be reassigned. Treat them as fixed after instantiation; to use a different catalog or validator, create a new ValidatedDataCatalog instance.
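What __slots__ does and does not enforce can be checked directly. This sketch uses a hypothetical `Slotted` class with the same two slot names: unknown attributes are rejected, the slot attributes can still be reassigned, and no per-instance __dict__ is created.

```python
class Slotted:
    """Hypothetical class using the same __slots__ as ValidatedDataCatalog."""

    __slots__ = ("catalog", "validator")

    def __init__(self, catalog, validator):
        self.catalog = catalog
        self.validator = validator


obj = Slotted("catalog-a", "validator-a")

# Adding an attribute that is not listed in __slots__ fails.
try:
    obj.cache = {}
except AttributeError as exc:
    print("rejected:", exc)

# Reassigning a slot attribute is still allowed.
obj.catalog = "catalog-b"
print(obj.catalog)  # catalog-b

# Instances have no __dict__, which is where the memory saving comes from.
print(hasattr(obj, "__dict__"))  # False
```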
Validation happens in the following order:
- On load: catalog.load() -> validator.validate() -> return validated data
- On save: validator.validate() -> catalog.save() -> return None
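As a result, data that fails validation is never passed to catalog.save(), and loaded data is always validated before it is returned to the caller. Both guarantees depend on the validator raising an exception on failure; a no-op validator such as NoValidator accepts all data.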
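The call order and the "nothing is saved on failure" behaviour can be traced with recording stand-ins. `RecordingCatalog` and `RejectNegatives` are hypothetical; the two call expressions reproduce the load and save orderings listed above. Because Python evaluates call arguments before the call itself, a validator that raises stops `catalog.save()` from ever running.

```python
from typing import Any

calls: list[str] = []


class RecordingCatalog:
    """Hypothetical catalog that records calls and stores data in a dict."""

    def __init__(self) -> None:
        self.store: dict[str, Any] = {"raw": [3, 1, 2]}

    def load(self, name: str, **kwargs: Any) -> Any:
        calls.append("catalog.load")
        return self.store[name]

    def save(self, name: str, data: Any) -> None:
        calls.append("catalog.save")
        self.store[name] = data


class RejectNegatives:
    """Hypothetical validator that records calls and rejects negative values."""

    def validate(self, name: str, data: list[int]) -> list[int]:
        calls.append("validator.validate")
        if any(x < 0 for x in data):
            raise ValueError(f"{name}: contains negative values")
        return data


catalog, validator = RecordingCatalog(), RejectNegatives()

# Load order: catalog.load() -> validator.validate()
loaded = validator.validate("raw", catalog.load("raw"))
print(calls)  # ['catalog.load', 'validator.validate']

# Save order: validator.validate() -> catalog.save()
calls.clear()
catalog.save("sorted", validator.validate("sorted", sorted(loaded)))
print(calls)  # ['validator.validate', 'catalog.save']

# A failing validation raises before catalog.save() runs.
calls.clear()
try:
    catalog.save("bad", validator.validate("bad", [1, -5]))
except ValueError as exc:
    print(exc)  # bad: contains negative values
print(calls)  # ['validator.validate']
print("bad" in catalog.store)  # False
```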
The factory method in_directory() is the recommended way to instantiate this class for most use cases. Direct instantiation via __init__() is primarily useful for testing or custom configurations.
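in_directory() chooses each component independently: an explicitly passed class is built with its own in_directory(path) class method, otherwise a default factory is called with the same path. The sketch below reproduces that dispatch with hypothetical stand-ins (`StubCatalog`, `StubValidator`, `default_catalog`, `default_validator`, `build`) in place of KedroDataCatalog, GXValidator, and the library's default_attributes helpers.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any


class StubCatalog:
    """Hypothetical catalog class that records how it was built."""

    def __init__(self, source: str, path: Path) -> None:
        self.source, self.path = source, path

    @classmethod
    def in_directory(cls, path: str | Path) -> StubCatalog:
        return cls("custom", Path(path))


class StubValidator:
    """Hypothetical validator class that records how it was built."""

    def __init__(self, source: str, path: Path) -> None:
        self.source, self.path = source, path

    @classmethod
    def in_directory(cls, path: str | Path) -> StubValidator:
        return cls("custom", Path(path))


def default_catalog(path: str | Path) -> StubCatalog:
    # Hypothetical stand-in for the library's default catalog factory.
    return StubCatalog("default", Path(path))


def default_validator(path: str | Path) -> StubValidator:
    # Hypothetical stand-in for the library's default validator factory.
    return StubValidator("default", Path(path))


def build(path, catalog_class=None, validator_class=None) -> tuple[Any, Any]:
    # Each component falls back to its default independently of the other.
    catalog = catalog_class.in_directory(path) if catalog_class else default_catalog(path)
    validator = validator_class.in_directory(path) if validator_class else default_validator(path)
    return catalog, validator


cat, val = build("config/data")
print(cat.source, val.source)  # default default

cat, val = build("config/data", validator_class=StubValidator)
print(cat.source, val.source)  # default custom
```

Both components receive the same directory, so their configuration files have to live side by side under one path.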
Examples
Basic usage with default catalog and validator:
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> df = catalog.load("customer_data")
>>> # Data is automatically validated after loading
>>> processed_df = process_data(df)
>>> catalog.save("processed_customer_data", processed_df)
>>> # Data is automatically validated before saving
Using a custom validator while keeping the default catalog:
>>> from adc_toolkit.data.validators.pandera import PanderaValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
>>> df = catalog.load("sales_data")
Using custom catalog and validator implementations:
>>> from myproject.catalogs import CustomCatalog
>>> from myproject.validators import CustomValidator
>>> catalog = ValidatedDataCatalog.in_directory(
... "config/data", catalog_class=CustomCatalog, validator_class=CustomValidator
... )
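A class passed as validator_class must be a class (not an instance) that provides an in_directory(path) class method and a validate(name, data) method. The sketch below shows one possible shape; `RequiredColumnsValidator`, the `required_columns.json` file name, and the dict-of-columns data model are all hypothetical choices for illustration.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


class RequiredColumnsValidator:
    """Hypothetical custom validator for use as validator_class=...

    Reads required_columns.json (a hypothetical file) from the config
    directory: {dataset name: [required column names]}. Datasets are
    modelled as dicts of column name -> list of values.
    """

    def __init__(self, rules: dict[str, list[str]]) -> None:
        self.rules = rules

    @classmethod
    def in_directory(cls, path: str | Path) -> RequiredColumnsValidator:
        # Invoked as validator_class.in_directory(path) by the factory.
        rules_file = Path(path) / "required_columns.json"
        return cls(json.loads(rules_file.read_text()))

    def validate(self, name: str, data: dict[str, Any]) -> dict[str, Any]:
        # Datasets without rules pass through unchanged.
        missing = [col for col in self.rules.get(name, []) if col not in data]
        if missing:
            raise ValueError(f"{name}: missing columns {missing}")
        return data


# Usage: write a throwaway config directory and build the validator from it.
with tempfile.TemporaryDirectory() as tmp:
    rules = {"sales_data": ["region", "revenue"]}
    (Path(tmp) / "required_columns.json").write_text(json.dumps(rules))
    validator = RequiredColumnsValidator.in_directory(tmp)

print(validator.validate("sales_data", {"region": ["EU"], "revenue": [10]}))
try:
    validator.validate("sales_data", {"region": ["EU"]})
except ValueError as exc:
    print(exc)  # sales_data: missing columns ['revenue']
```

Letting unknown dataset names pass through is a design choice of this sketch; a stricter validator could raise KeyError instead.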
Direct instantiation for testing or advanced use cases:
>>> from unittest.mock import Mock
>>> from adc_toolkit.data.abs import DataCatalog, DataValidator
>>> mock_catalog = Mock(spec=DataCatalog)
>>> mock_validator = Mock(spec=DataValidator)
>>> catalog = ValidatedDataCatalog(catalog=mock_catalog, validator=mock_validator)
Complete data pipeline example:
>>> import pandas as pd
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> # Initialize catalog with validation
>>> catalog = ValidatedDataCatalog.in_directory("config/production")
>>>
>>> # Load and automatically validate raw data
>>> raw_data = catalog.load("raw_sales")
>>>
>>> # Process data
>>> cleaned = raw_data.dropna()
>>> aggregated = cleaned.groupby("region").sum()
>>>
>>> # Save and automatically validate before writing
>>> catalog.save("aggregated_sales", aggregated)
>>>
>>> # Validation errors are raised if data doesn't meet expectations
>>> try:
...     invalid_df = pd.DataFrame({"bad_column": [1, 2, 3]})
...     catalog.save("aggregated_sales", invalid_df)
... except Exception as e:
...     print(f"Validation failed: {e}")
Disabling validation for trusted data sources:
>>> from adc_toolkit.data.validators.no_validator import NoValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
>>> # No validation overhead, useful for performance-critical paths
>>> df = catalog.load("trusted_source")
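A validator that skips checks only has to return the data it receives. The sketch below is hypothetical. It follows the in_directory(path) and validate(name, data) shape described on this page, but NoValidator's actual source may differ, so consult that source for the real behavior.

```python
from typing import Any


class PassThroughValidator:
    """Hypothetical no-op validator: accepts every dataset unchanged."""

    @classmethod
    def in_directory(cls, path: str) -> "PassThroughValidator":
        # Nothing to configure; the path is accepted only to match the factory signature.
        return cls()

    def validate(self, name: str, data: Any) -> Any:
        # Return the same object, so no copy or check is made.
        return data


validator = PassThroughValidator.in_directory("config/data")
payload = {"any_column": [1, 2, 3]}
result = validator.validate("trusted_source", payload)
```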
def __init__(self, catalog: DataCatalog, validator: DataValidator) -> None:
    self.catalog = catalog
    self.validator = validator
Initialize a ValidatedDataCatalog with a catalog and validator.
This constructor is primarily used for testing or advanced use cases where you have already instantiated catalog and validator objects. For most use cases, prefer the in_directory() factory method.
Parameters
- catalog (DataCatalog): The data catalog instance for I/O operations. Must implement the DataCatalog protocol with load(name) and save(name, data) methods.
- validator (DataValidator): The data validator instance for quality checks. Must implement the DataValidator protocol with a validate(name, data) method.
See Also
in_directory: Factory method for creating instances from configuration.
load: Load and validate a dataset.
save: Validate and save a dataset.
Notes
Both catalog and validator are stored as instance attributes. Because the class uses __slots__, no other attributes can be added. These two can still be reassigned, so treat them as fixed after initialization.
The constructor does not check the provided objects, and type annotations are not enforced at runtime. The caller is responsible for ensuring that the objects implement the required protocols correctly.
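If an early failure is preferred over a later AttributeError, a caller could check the components before constructing the catalog. This is a sketch under stated assumptions: CatalogLike and ValidatorLike are hypothetical protocols that mirror the methods named above, and they are not the toolkit's DataCatalog and DataValidator. isinstance() against a runtime_checkable Protocol only confirms that the methods exist, not their signatures.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CatalogLike(Protocol):
    """Hypothetical mirror of the DataCatalog protocol's load/save methods."""

    def load(self, name: str, **kwargs: Any) -> Any: ...

    def save(self, name: str, data: Any) -> None: ...


@runtime_checkable
class ValidatorLike(Protocol):
    """Hypothetical mirror of the DataValidator protocol's validate method."""

    def validate(self, name: str, data: Any) -> Any: ...


def check_components(catalog: object, validator: object) -> None:
    """Raise TypeError early if a component lacks the required methods."""
    if not isinstance(catalog, CatalogLike):
        raise TypeError(f"{type(catalog).__name__} does not provide load() and save()")
    if not isinstance(validator, ValidatorLike):
        raise TypeError(f"{type(validator).__name__} does not provide validate()")


class DictCatalog:
    def __init__(self) -> None:
        self.store: dict[str, Any] = {}

    def load(self, name: str, **kwargs: Any) -> Any:
        return self.store[name]

    def save(self, name: str, data: Any) -> None:
        self.store[name] = data


class EchoValidator:
    def validate(self, name: str, data: Any) -> Any:
        return data


class NotAValidator:
    pass


check_components(DictCatalog(), EchoValidator())  # passes silently

try:
    check_components(DictCatalog(), NotAValidator())
    rejected_bad_validator = False
except TypeError:
    rejected_bad_validator = True
```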
Examples
Direct instantiation with concrete implementations:
>>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
>>> from adc_toolkit.data.validators.gx import GXValidator
>>>
>>> catalog_impl = KedroDataCatalog("config/data")
>>> validator_impl = GXValidator.in_directory("config/validations")
>>> validated_catalog = ValidatedDataCatalog(catalog=catalog_impl, validator=validator_impl)
Using mock objects for testing:
>>> from unittest.mock import Mock
>>> from adc_toolkit.data.abs import DataCatalog, DataValidator
>>> import pandas as pd
>>>
>>> mock_catalog = Mock(spec=DataCatalog)
>>> mock_validator = Mock(spec=DataValidator)
>>>
>>> # Setup mock behavior
>>> test_df = pd.DataFrame({"a": [1, 2, 3]})
>>> mock_catalog.load.return_value = test_df
>>> mock_validator.validate.return_value = test_df
>>>
>>> catalog = ValidatedDataCatalog(mock_catalog, mock_validator)
>>> result = catalog.load("test_dataset")
>>> mock_catalog.load.assert_called_once_with("test_dataset")
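Mocks can also verify the order of calls, for example that validation runs before saving. To keep the sketch runnable without adc_toolkit installed, ValidatingWrapper is a hypothetical stand-in with the load/save composition described on this page. With the real package, you would pass the mocks to ValidatedDataCatalog instead. attach_mock() records the calls from both child mocks in one ordered list.

```python
from unittest.mock import Mock, call


class ValidatingWrapper:
    """Hypothetical stand-in with the load/save composition described on this page."""

    def __init__(self, catalog, validator) -> None:
        self.catalog = catalog
        self.validator = validator

    def load(self, name, **kwargs):
        return self.validator.validate(name, self.catalog.load(name, **kwargs))

    def save(self, name, data):
        self.catalog.save(name, self.validator.validate(name, data))


# A parent mock records calls from both children in a single, ordered list.
manager = Mock()
catalog, validator = Mock(), Mock()
manager.attach_mock(catalog, "catalog")
manager.attach_mock(validator, "validator")
validator.validate.side_effect = lambda name, data: data

ValidatingWrapper(catalog, validator).save("test_dataset", [1, 2, 3])
recorded = list(manager.mock_calls)  # copy: mock_calls keeps growing

# A failing validation must not reach catalog.save().
validator.validate.side_effect = ValueError("invalid")
try:
    ValidatingWrapper(catalog, validator).save("test_dataset", [])
except ValueError:
    pass
save_calls_after_failure = catalog.save.call_count
```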
@classmethod
def in_directory(
    cls,
    path: str | Path,
    catalog_class: type[DataCatalog] | None = None,
    validator_class: type[DataValidator] | None = None,
) -> "ValidatedDataCatalog":
    catalog = catalog_class.in_directory(path) if catalog_class else default_catalog(path)
    validator = validator_class.in_directory(path) if validator_class else default_validator(path)
    return cls(catalog, validator)
Create a validated catalog from configuration in a directory.
This is the recommended factory method for creating ValidatedDataCatalog instances in production code. It reads configuration files from the specified directory and instantiates both the catalog and validator using their respective in_directory() factory methods.
By default, this method uses KedroDataCatalog for I/O operations and GXValidator (Great Expectations) for validation. If Great Expectations is not installed, it falls back to PanderaValidator. Custom implementations can be provided via the catalog_class and validator_class parameters.
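One way to implement the documented fallback is to check whether a package can be imported. The sketch below is hypothetical and only returns a class name. It is not default_validator's actual code, which may instead catch ImportError or use different logic.

```python
import importlib.util


def pick_default_validator_name() -> str:
    """Hypothetical sketch of the fallback: prefer Great Expectations, else Pandera."""
    # find_spec returns None when a top-level package is not importable.
    if importlib.util.find_spec("great_expectations") is not None:
        return "GXValidator"
    return "PanderaValidator"


chosen = pick_default_validator_name()
```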
Parameters
- path (str or pathlib.Path): Path to the directory containing catalog and validator configuration files. This directory typically contains:
  - catalog.yml: Kedro catalog configuration (for KedroDataCatalog)
  - expectations/: Great Expectations suite definitions (for GXValidator)
  - or equivalent configuration for custom implementations.
  The path can be absolute or relative to the current working directory.
- catalog_class (type[DataCatalog] or None, optional): Custom data catalog class to use instead of the default. Must be a class (not instance) that implements the DataCatalog protocol and provides an in_directory(path) class method. If None (default), uses KedroDataCatalog.
- validator_class (type[DataValidator] or None, optional): Custom data validator class to use instead of the default. Must be a class (not instance) that implements the DataValidator protocol and provides an in_directory(path) class method. If None (default), uses GXValidator if available, otherwise PanderaValidator.
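A custom catalog_class only needs an in_directory(path) class method plus load() and save(). The JSON-per-dataset layout below is a hypothetical illustration of that factory contract. Plain dicts do not have the columns and dtypes properties that the Data protocol expects, so this is not a drop-in production catalog. Passed as catalog_class, the class would be constructed via JsonDirectoryCatalog.in_directory(path).

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Union


class JsonDirectoryCatalog:
    """Hypothetical catalog: one JSON file per dataset inside a directory."""

    def __init__(self, root: Path) -> None:
        self.root = root

    @classmethod
    def in_directory(cls, path: Union[str, Path]) -> "JsonDirectoryCatalog":
        # The factory receives the same path that ValidatedDataCatalog.in_directory() was given.
        root = Path(path)
        root.mkdir(parents=True, exist_ok=True)
        return cls(root)

    def load(self, name: str, **kwargs: Any) -> Any:
        return json.loads((self.root / f"{name}.json").read_text())

    def save(self, name: str, data: Any) -> None:
        (self.root / f"{name}.json").write_text(json.dumps(data))


with tempfile.TemporaryDirectory() as tmp:
    catalog = JsonDirectoryCatalog.in_directory(tmp)
    catalog.save("customer_data", {"customer_id": [1, 2, 3]})
    reloaded = catalog.load("customer_data")
```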
Returns
- ValidatedDataCatalog: A new ValidatedDataCatalog instance with catalog and validator initialized from the configuration directory.
Raises
- FileNotFoundError: If the specified directory does not exist or required configuration files are missing.
- ImportError: If default catalog/validator classes are requested but their required packages are not installed (e.g., kedro, great_expectations, or pandera).
- ValueError: If configuration files are malformed or contain invalid settings.
See Also
__init__: Direct constructor for advanced use cases.
load: Load and validate a dataset from the catalog.
save: Validate and save a dataset to the catalog.
adc_toolkit.data.default_attributes.default_catalog: Function that creates default catalog.
adc_toolkit.data.default_attributes.default_validator: Function that creates default validator.
Notes
The method calls in_directory() on the catalog and validator classes, which are responsible for reading their respective configuration files. The exact configuration file format depends on the implementations used.
For KedroDataCatalog (default), the directory should contain:
- catalog.yml: Dataset definitions in Kedro format
- (optionally) credentials.yml: Credential configurations
For GXValidator (default), the directory should contain:
- great_expectations.yml: GX project configuration
- expectations/: Directory with expectation suite JSON files
- checkpoints/: Directory with checkpoint configurations
For PanderaValidator, the directory should contain:
- Python files defining Pandera schemas for each dataset
The directory structure can be organized as needed; implementations are responsible for finding their configuration files within the path.
This method is thread-safe as long as the underlying catalog and validator implementations are thread-safe.
Examples
Basic usage with default catalog and validator:
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> # Uses KedroDataCatalog and GXValidator (or PanderaValidator)
Using pathlib.Path:
>>> from pathlib import Path
>>> config_dir = Path("config") / "production" / "data"
>>> catalog = ValidatedDataCatalog.in_directory(config_dir)
Using a custom catalog with default validator:
>>> from myproject.catalogs import S3DataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data", catalog_class=S3DataCatalog)
>>> # Uses S3DataCatalog for I/O and the default validator (GXValidator, or PanderaValidator if Great Expectations is not installed)
Using a custom validator with default catalog:
>>> from adc_toolkit.data.validators.pandera import PanderaValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=PanderaValidator)
>>> # Uses KedroDataCatalog for I/O, PanderaValidator for validation
Using custom catalog and validator:
>>> from myproject.catalogs import DatabaseCatalog
>>> from myproject.validators import CustomValidator
>>> catalog = ValidatedDataCatalog.in_directory(
... "config/data", catalog_class=DatabaseCatalog, validator_class=CustomValidator
... )
Disabling validation for performance-critical scenarios:
>>> from adc_toolkit.data.validators.no_validator import NoValidator
>>> catalog = ValidatedDataCatalog.in_directory("config/data", validator_class=NoValidator)
>>> # No validation overhead, useful for trusted data sources
Different configurations for different environments:
>>> # Development: local files, validated with the rules in config/dev
>>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
>>>
>>> # Production: e.g. cloud storage; the same rules apply only if config/prod defines them
>>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
>>>
>>> # Testing: skip validation entirely
>>> from adc_toolkit.data.validators.no_validator import NoValidator
>>> test_catalog = ValidatedDataCatalog.in_directory("config/test", validator_class=NoValidator)
Complete workflow example:
>>> import pandas as pd
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> # Initialize catalog from configuration
>>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
>>>
>>> # Load raw data (validated on load)
>>> raw = catalog.load("raw_transactions")
>>>
>>> # Process data
>>> cleaned = raw.dropna()
>>> features = engineer_features(cleaned)  # engineer_features: your own (hypothetical) function
>>>
>>> # Save results (validated before save)
>>> catalog.save("cleaned_transactions", cleaned)
>>> catalog.save("feature_matrix", features)
def load(self, name: str, **kwargs: Any) -> Data:
    return self.validator.validate(name, self.catalog.load(name, **kwargs))
Load a dataset from the catalog and validate it.
This method performs a two-step operation:
- Load the dataset using the underlying catalog's load() method
- Validate the loaded dataset using the validator's validate() method
The validation step ensures that the loaded data meets all configured quality expectations before being returned to the caller. If validation fails, an exception is raised and no data is returned.
This provides a safety guarantee: any data returned from load() has been validated and can be trusted to meet the configured expectations.
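The two steps can be reduced to a function call wrapped around a lookup. This is a sketch with hypothetical names (NoMissingValuesValidator, validated_load), and a dict stands in for the catalog. If the second step raises, the caller never receives the loaded object.

```python
from typing import Any


class NoMissingValuesValidator:
    """Hypothetical validator that rejects datasets containing None."""

    def validate(self, name: str, data: list[Any]) -> list[Any]:
        if any(value is None for value in data):
            raise ValueError(f"{name}: contains missing values")
        return data


def validated_load(catalog: dict[str, list[Any]], validator: NoMissingValuesValidator, name: str) -> list[Any]:
    # Step 1: load; step 2: validate. If step 2 raises, nothing is returned.
    return validator.validate(name, catalog[name])


source = {"clean": [1, 2, 3], "dirty": [1, None, 3]}
loaded = validated_load(source, NoMissingValuesValidator(), "clean")

try:
    validated_load(source, NoMissingValuesValidator(), "dirty")
    dirty_returned = True
except ValueError:
    dirty_returned = False
```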
Parameters
- name (str): The registered name of the dataset to load. This name must be defined in both the catalog configuration (for loading) and the validator configuration (for validation rules). The name serves as the lookup key for both components.
- **kwargs (Any): Additional keyword arguments passed through unchanged to the underlying catalog's load() method. The supported arguments depend on the catalog implementation. Examples, where the implementation supports them, include:
  - version: str - Load a specific version of the dataset
  - load_args: dict - Override default load arguments
  - credentials: dict - Override default credentials
  Consult your catalog implementation's documentation for details.
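The pass-through behavior means that keyword support is decided entirely by the catalog. In this sketch, VersionedCatalog, AcceptAll and load_with_kwargs are hypothetical. The catalog accepts only version, so an unsupported keyword such as load_args raises TypeError from the catalog's own signature.

```python
from typing import Any, Optional


class VersionedCatalog:
    """Hypothetical catalog whose load() accepts a `version` keyword only."""

    def __init__(self) -> None:
        self.versions = {"customer_data": {"v1": [1, 2], "v2": [1, 2, 3]}}

    def load(self, name: str, version: Optional[str] = None) -> list[int]:
        history = self.versions[name]
        # Without an explicit version, use the lexicographically largest key as "latest".
        return history[version] if version is not None else history[max(history)]


class AcceptAll:
    def validate(self, name: str, data: Any) -> Any:
        return data


def load_with_kwargs(catalog: VersionedCatalog, validator: AcceptAll, name: str, **kwargs: Any) -> Any:
    # **kwargs reach catalog.load() untouched; the validator only sees name and data.
    return validator.validate(name, catalog.load(name, **kwargs))


latest = load_with_kwargs(VersionedCatalog(), AcceptAll(), "customer_data")
pinned = load_with_kwargs(VersionedCatalog(), AcceptAll(), "customer_data", version="v1")

try:
    load_with_kwargs(VersionedCatalog(), AcceptAll(), "customer_data", load_args={})
    unsupported_accepted = True
except TypeError:
    unsupported_accepted = False
```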
Returns
- Data: The loaded and validated dataset. The specific type depends on the catalog configuration (e.g., pandas.DataFrame, pyspark.sql.DataFrame). The returned data has passed all validation checks configured for this dataset name.
Raises
- KeyError: If the dataset name is not registered in the catalog or validator configuration.
- FileNotFoundError: If the dataset's source file or location does not exist.
- ValidationError: If the loaded data fails validation. The exception includes details about which validation rules failed, expected values, and observed values. The specific exception type depends on the validator implementation (e.g., great_expectations.exceptions.ValidationError for GXValidator, pandera.errors.SchemaError for PanderaValidator).
- ValueError: If the dataset cannot be loaded due to format errors, parsing failures, or incompatible data types.
- TypeError: If the loaded data type is incompatible with the validator expectations.
- PermissionError: If the dataset source is not readable due to permission issues.
See Also
save: Validate and save a dataset to the catalog.
in_directory: Factory method for creating catalog instances.
DataCatalog.load: Underlying catalog load operation.
DataValidator.validate: Underlying validation operation.
Notes
Loading has no side effects on the stored data. Calling load() multiple times with the same name and arguments returns equivalent data, as long as the underlying source has not changed.
Validation runs only after loading completes. For eagerly evaluated data such as a pandas DataFrame, this means the full dataset is in memory before any check starts, which can affect performance for very large datasets. Some validator implementations support sampling-based validation to mitigate this.
If validation fails, the loaded data is discarded and not returned. This prevents invalid data from propagating through your pipeline.
The method does not cache loaded data. Each call performs a fresh load and validation. If caching is needed, implement it at a higher level or use a catalog implementation that supports caching.
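A higher-level cache can wrap any load(name, **kwargs) callable, such as a ValidatedDataCatalog's bound load method. CachingLoader below is a hypothetical sketch, not part of the toolkit. It keys on the name plus sorted keyword arguments, so keyword values must be hashable. It returns the same cached object on every hit, so callers that mutate the result also change the cached copy. Entries should be invalidated after saving to the same name.

```python
from typing import Any, Callable


class CachingLoader:
    """Hypothetical higher-level cache around any load(name, **kwargs) callable."""

    def __init__(self, load: Callable[..., Any]) -> None:
        self._load = load
        self._cache: dict[tuple, Any] = {}

    def load(self, name: str, **kwargs: Any) -> Any:
        # Key on the name plus sorted keyword arguments (values must be hashable).
        key = (name, tuple(sorted(kwargs.items())))
        if key not in self._cache:
            self._cache[key] = self._load(name, **kwargs)
        return self._cache[key]

    def invalidate(self, name: str) -> None:
        # Drop every cached entry for this dataset, e.g. after a save().
        self._cache = {k: v for k, v in self._cache.items() if k[0] != name}


calls: list[str] = []


def fake_validated_load(name: str, **kwargs: Any) -> str:
    # Stand-in for catalog.load; records each real load.
    calls.append(name)
    return f"data for {name}"


loader = CachingLoader(fake_validated_load)
first = loader.load("customers")
second = loader.load("customers")
```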
Thread safety depends on the underlying catalog and validator implementations. Consult their documentation if concurrent loading is required.
Examples
Basic usage:
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> df = catalog.load("customer_data")
>>> # df is guaranteed to meet all validation rules for "customer_data"
>>> print(df.columns)
Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')
Loading with additional arguments:
>>> # Load a specific version
>>> df_v1 = catalog.load("customer_data", version="2024-01-01")
>>>
>>> # Override load arguments
>>> df_custom = catalog.load("sales_data", load_args={"parse_dates": ["transaction_date"]})
Handling validation failures:
>>> try:
...     df = catalog.load("strict_dataset")
... except Exception as e:
...     print(f"Validation failed: {e}")
...     # Log the failure, send alert, or handle gracefully
...     # The invalid data is not returned
Loading multiple datasets in a pipeline:
>>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
>>>
>>> # All datasets are validated on load
>>> customers = catalog.load("customers")
>>> orders = catalog.load("orders")
>>> products = catalog.load("products")
>>>
>>> # Merge validated datasets with confidence
>>> enriched = orders.merge(customers, on="customer_id")
>>> enriched = enriched.merge(products, on="product_id")
Using in a data processing function:
>>> import pandas as pd
>>>
>>> def process_sales_data(catalog: ValidatedDataCatalog) -> pd.DataFrame:
...     # Load and validate raw sales data
...     sales = catalog.load("raw_sales")
...
...     # Process with confidence that data meets expectations
...     sales["revenue"] = sales["quantity"] * sales["price"]
...     sales = sales.groupby("region").agg({"revenue": "sum"})
...
...     return sales
Comparing data across environments:
>>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
>>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
>>>
>>> # Same dataset name, different sources; rules come from each config directory
>>> dev_data = dev_catalog.load("training_data")
>>> prod_data = prod_catalog.load("training_data")
>>>
>>> # Both share a schema and quality level only if both directories define the same rules
def save(self, name: str, data: Data) -> None:
    self.catalog.save(name, self.validator.validate(name, data))
Validate a dataset and save it to the catalog.
This method performs a two-step operation:
- Validate the dataset using the validator's validate() method
- Save the validated dataset using the catalog's save() method
The validation step ensures that only data meeting all configured quality expectations is persisted. If validation fails, an exception is raised and no data is saved.
This provides a safety guarantee: any data saved via this method has been validated and meets the configured expectations. Invalid data is prevented from entering downstream systems or storage.
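The two steps compose into a single expression, `self.catalog.save(name, self.validator.validate(name, data))`, so the catalog is only reached if validation returns normally. The sketch below reproduces that composition with hypothetical in-memory stand-ins (`InMemoryCatalog`, `RequiredColumnsValidator`, `SketchValidatedCatalog`) in place of KedroDataCatalog and GXValidator. It is an illustration under those assumptions, not the library's implementation.

```python
from typing import Any, Dict, List


class InMemoryCatalog:
    """Hypothetical catalog stand-in that stores datasets in a dict."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}

    def save(self, name: str, data: Any) -> None:
        self.store[name] = data


class RequiredColumnsValidator:
    """Hypothetical validator: each dataset name maps to its required columns."""

    def __init__(self, rules: Dict[str, List[str]]) -> None:
        self.rules = rules

    def validate(self, name: str, data: Dict[str, list]) -> Dict[str, list]:
        required = self.rules[name]  # KeyError if the name has no rules
        missing = [col for col in required if col not in data]
        if missing:
            raise ValueError(f"{name}: missing columns {missing}")
        return data


class SketchValidatedCatalog:
    """Validate first, then save whatever the validator returns."""

    def __init__(self, catalog: InMemoryCatalog, validator: RequiredColumnsValidator) -> None:
        self.catalog = catalog
        self.validator = validator

    def save(self, name: str, data: Dict[str, list]) -> None:
        # If validate() raises, catalog.save() is never called.
        self.catalog.save(name, self.validator.validate(name, data))


store = InMemoryCatalog()
catalog = SketchValidatedCatalog(store, RequiredColumnsValidator({"customer_data": ["customer_id", "name"]}))

catalog.save("customer_data", {"customer_id": [1, 2], "name": ["Alice", "Bob"]})
try:
    catalog.save("customer_data", {"wrong_column": [1, 2]})
except ValueError as exc:
    print(f"Validation failed: {exc}")
print(store.store)  # still holds the earlier valid dataset
```

The failed second call leaves the previously saved, valid dataset in place, because the write is never attempted.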
Parameters
- name (str): The registered name of the dataset to save. This name must be defined in both the catalog configuration (for saving location and format) and the validator configuration (for validation rules). The name serves as the lookup key for both components.
- data (Data): The dataset to validate and save. Must be a Data protocol-compatible object (e.g., pandas.DataFrame, pyspark.sql.DataFrame) that has 'columns' and 'dtypes' properties. The data must satisfy all validation rules configured for this dataset name.
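The `columns`/`dtypes` requirement is structural: any object exposing both attributes qualifies, whatever its class. The sketch below expresses that with `typing.Protocol`. `DataLike` and `TinyFrame` are hypothetical stand-ins; the real `Data` protocol lives in `adc_toolkit.data.abs` and may be declared differently (for instance, it may not be `runtime_checkable`).

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical stand-in for the Data protocol: anything exposing columns and dtypes."""

    @property
    def columns(self) -> Any: ...

    @property
    def dtypes(self) -> Any: ...


class TinyFrame:
    """Hypothetical minimal table that satisfies DataLike structurally (no inheritance)."""

    def __init__(self, data: Dict[str, List[Any]]) -> None:
        self._data = data

    @property
    def columns(self) -> List[str]:
        return list(self._data)

    @property
    def dtypes(self) -> Dict[str, str]:
        # Infer a type name per column from its first value.
        return {
            name: type(values[0]).__name__ if values else "object"
            for name, values in self._data.items()
        }


frame = TinyFrame({"customer_id": [1, 2], "name": ["Alice", "Bob"]})
print(isinstance(frame, DataLike))      # True: both attributes are present
print(isinstance([1, 2, 3], DataLike))  # False: a list has neither attribute
print(frame.dtypes)                     # {'customer_id': 'int', 'name': 'str'}
```

By the same reasoning, a bare NumPy array does not qualify: it exposes `dtype` but neither `columns` nor `dtypes`.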
Returns
- None: This method does not return a value. It performs a side effect (saving data) after successful validation.
Raises
- KeyError: If the dataset name is not registered in the catalog or validator configuration.
- ValidationError: If the data fails validation; no data is saved in that case. ValidationError is a placeholder rather than a single class: the concrete exception type depends on the validator implementation (e.g., great_expectations.exceptions.ValidationError for GXValidator, pandera.errors.SchemaError for PanderaValidator). The exception typically reports which validation rules failed, along with expected and observed values.
- TypeError: If the data type is incompatible with the catalog's save operation or the validator's expectations.
- ValueError: If the data cannot be saved due to format errors or serialization failures.
- PermissionError: If the target save location is not writable.
- OSError: If there are filesystem errors during the save operation (e.g., disk full, path too long).
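Because `save` wraps neither call in its own error handling, exceptions from the validator and the catalog reach the caller unchanged. Callers that want to react differently to each failure mode therefore catch the concrete types. In the sketch below, `SchemaViolation`, `StrictValidator`, `ReadOnlyCatalog`, and `NullCatalog` are hypothetical stand-ins. With a real validator, substitute its own exception class.

```python
from typing import Any, Dict, List


class SchemaViolation(Exception):
    """Hypothetical validator-specific exception (a real validator raises its own type)."""


class StrictValidator:
    """Hypothetical validator: dataset name -> list of required columns."""

    def __init__(self, rules: Dict[str, List[str]]) -> None:
        self.rules = rules

    def validate(self, name: str, data: Dict[str, list]) -> Dict[str, list]:
        if name not in self.rules:
            raise KeyError(name)  # dataset not registered with the validator
        missing = [col for col in self.rules[name] if col not in data]
        if missing:
            raise SchemaViolation(f"{name}: missing columns {missing}")
        return data


class ReadOnlyCatalog:
    """Hypothetical catalog whose storage location is not writable."""

    def save(self, name: str, data: Any) -> None:
        raise PermissionError(f"cannot write {name!r}")


class NullCatalog:
    """Hypothetical catalog that accepts every write and discards it."""

    def save(self, name: str, data: Any) -> None:
        pass


def classify_save_error(catalog: Any, validator: StrictValidator, name: str, data: Dict[str, list]) -> str:
    """Run validate-then-save and report which stage, if any, failed."""
    try:
        catalog.save(name, validator.validate(name, data))
    except KeyError:
        return "unregistered dataset"
    except SchemaViolation:
        return "validation failed"
    except OSError:  # PermissionError is a subclass of OSError
        return "storage error"
    return "saved"


validator = StrictValidator({"customer_data": ["customer_id"]})
print(classify_save_error(ReadOnlyCatalog(), validator, "unknown", {}))                         # unregistered dataset
print(classify_save_error(ReadOnlyCatalog(), validator, "customer_data", {"x": [1]}))           # validation failed
print(classify_save_error(ReadOnlyCatalog(), validator, "customer_data", {"customer_id": [1]})) # storage error
print(classify_save_error(NullCatalog(), validator, "customer_data", {"customer_id": [1]}))     # saved
```

The second call never reaches the read-only storage: validation runs first, so a validation failure is reported even when the write would also have failed.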
See Also
load: Load and validate a dataset from the catalog.
in_directory: Factory method for creating catalog instances.
DataCatalog.save: Underlying catalog save operation.
DataValidator.validate: Underlying validation operation.
Notes
Validation happens before saving, so invalid data is never persisted. This is crucial for maintaining data quality in downstream systems.
ValidatedDataCatalog adds no atomicity of its own. Ideally the save is atomic, so that either the entire dataset is written or nothing is. Whether a failed write can leave partial data behind depends on the catalog implementation and the underlying storage system.
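One common way for a file-backed catalog to provide atomic saves is to write to a temporary file and then rename it over the target. The sketch below uses `os.replace`, which replaces the destination atomically on POSIX systems when source and destination are on the same filesystem. `atomic_write_text` is a hypothetical helper, not part of adc_toolkit or Kedro.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path: Path, text: str) -> None:
    """Write text so that readers see either the old file or the complete new one."""
    path = Path(path)
    # Create the temporary file in the destination directory so the final
    # rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(text)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_name, path)  # atomic on POSIX within one filesystem
    except BaseException:
        # On failure, remove the partial temporary file; the destination is untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as workdir:
    target = Path(workdir) / "customer_data.csv"
    atomic_write_text(target, "customer_id,name\n1,Alice\n")
    print(target.read_text(encoding="utf-8"))
```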
Some catalog implementations support versioning, automatically creating timestamped or numbered versions of saved datasets. Consult your catalog documentation for details.
If the target file or location already exists, the behavior depends on the catalog configuration. Common options include:
- Overwrite: Replace existing data (default for most catalogs)
- Append: Add to existing data
- Error: Raise an exception if target exists
- Version: Create a new version without overwriting
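The four options differ only in how an existing target is treated. The sketch below shows how a file-backed catalog might dispatch on such a setting. The `SaveMode` names and the `save_lines` helper are hypothetical and do not correspond to a specific adc_toolkit or Kedro configuration key.

```python
import tempfile
from enum import Enum
from pathlib import Path
from typing import List


class SaveMode(Enum):
    """Hypothetical names for the four behaviors listed above."""

    OVERWRITE = "overwrite"
    APPEND = "append"
    ERROR = "error"
    VERSION = "version"


def save_lines(target: Path, lines: List[str], mode: SaveMode) -> Path:
    """Write lines according to mode and return the file actually written."""
    if mode is SaveMode.VERSION:
        # Treat target as a directory of numbered versions; never overwrite one.
        target.mkdir(parents=True, exist_ok=True)
        number = len(list(target.glob("v*.txt"))) + 1
        out = target / f"v{number:04d}.txt"
        out.write_text("".join(lines), encoding="utf-8")
        return out
    if mode is SaveMode.ERROR and target.exists():
        raise FileExistsError(f"{target} already exists")
    # OVERWRITE truncates, APPEND adds to the end, ERROR on a new path writes normally.
    with target.open("a" if mode is SaveMode.APPEND else "w", encoding="utf-8") as fh:
        fh.writelines(lines)
    return target


with tempfile.TemporaryDirectory() as workdir:
    path = Path(workdir) / "out.txt"
    save_lines(path, ["a\n"], SaveMode.OVERWRITE)
    save_lines(path, ["b\n"], SaveMode.APPEND)
    print(path.read_text(encoding="utf-8"), end="")  # prints "a" then "b"
    try:
        save_lines(path, ["c\n"], SaveMode.ERROR)
    except FileExistsError:
        print("target exists; nothing written")
    history = Path(workdir) / "history"
    save_lines(history, ["first\n"], SaveMode.VERSION)
    print(save_lines(history, ["second\n"], SaveMode.VERSION).name)  # v0002.txt
```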
For very large datasets, validation may have performance implications as the entire dataset must be validated before saving begins. Some validator implementations support sampling-based validation.
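Sampling trades completeness for speed. Only a subset of rows is checked, so a violation confined to unsampled rows goes undetected. The sketch below illustrates the idea with the standard library's `random.sample`. `validate_sample` and the row-level rule are hypothetical and are not the API of GXValidator or PanderaValidator; pandera, for instance, offers its own `head`, `tail`, and `sample` arguments on `DataFrameSchema.validate`.

```python
import random
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]


def validate_sample(
    rows: List[Row],
    rule: Callable[[Row], bool],
    sample_size: int,
    seed: Optional[int] = None,
) -> List[Row]:
    """Check `rule` on at most `sample_size` randomly chosen rows; raise on the first failure."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    for row in rng.sample(rows, min(sample_size, len(rows))):
        if not rule(row):
            raise ValueError(f"rule failed for row {row}")
    return rows  # the full dataset is passed on unchanged


def has_email(row: Row) -> bool:
    return "@" in str(row["email"])


rows = [{"customer_id": i, "email": f"user{i}@ex.com"} for i in range(10_000)]
validate_sample(rows, has_email, sample_size=100, seed=0)  # checks 100 of 10,000 rows

rows[42]["email"] = "missing"
# A 100-row sample misses row 42 about 99% of the time; checking every row always finds it.
try:
    validate_sample(rows, has_email, sample_size=len(rows))
except ValueError as exc:
    print(exc)
```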
Thread safety depends on the underlying catalog and validator implementations. Consult their documentation if concurrent saving is required.
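If the underlying catalog or validator is not safe for concurrent use, one conservative option is to serialize calls on the caller's side. The sketch below assumes a hypothetical `LockedSaver` wrapper around any object with a `save(name, data)` method. It gives up parallelism in exchange for safety, and ValidatedDataCatalog does not do this itself.

```python
import threading
from typing import Any, Dict


class CountingCatalog:
    """Hypothetical catalog whose read-modify-write of `saves` is not thread-safe on its own."""

    def __init__(self) -> None:
        self.saves: Dict[str, int] = {}

    def save(self, name: str, data: Any) -> None:
        self.saves[name] = self.saves.get(name, 0) + 1


class LockedSaver:
    """Serialize save() calls on a shared catalog with a single lock."""

    def __init__(self, catalog: Any) -> None:
        self._catalog = catalog
        self._lock = threading.Lock()

    def save(self, name: str, data: Any) -> None:
        with self._lock:  # only one thread inside catalog.save() at a time
            self._catalog.save(name, data)


inner = CountingCatalog()
saver = LockedSaver(inner)
threads = [threading.Thread(target=saver.save, args=("feature_matrix", [i])) for i in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(inner.saves)  # {'feature_matrix': 50}
```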
The method does not modify the input data object. Validation may internally create temporary copies or views, but the original data parameter is unchanged.
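Because this method persists the return value of `validator.validate(name, data)`, the stored object can differ from the argument even when the argument itself is left untouched. This happens, for example, with a validator that returns a type-coerced copy. The sketch below uses a hypothetical `CoercingValidator` and a plain dict in place of the catalog.

```python
import copy
from typing import Dict


class CoercingValidator:
    """Hypothetical validator that returns a copy with customer_id converted to int."""

    def validate(self, name: str, data: Dict[str, list]) -> Dict[str, list]:
        result = copy.deepcopy(data)  # never touch the caller's object
        result["customer_id"] = [int(value) for value in result["customer_id"]]
        return result


persisted: Dict[str, Dict[str, list]] = {}
validator = CoercingValidator()
original = {"customer_id": ["1", "2"], "name": ["Alice", "Bob"]}

# Mirror save(): the catalog receives whatever validate() returns.
persisted["customer_data"] = validator.validate("customer_data", original)

print(original["customer_id"])                   # ['1', '2']  (caller's object unchanged)
print(persisted["customer_data"]["customer_id"])  # [1, 2]  (the coerced copy was saved)
```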
Examples
Basic usage:
>>> import pandas as pd
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> df = pd.DataFrame(
...     {
...         "customer_id": [1, 2, 3],
...         "name": ["Alice", "Bob", "Carol"],
...         "email": ["alice@ex.com", "bob@ex.com", "carol@ex.com"],
...     }
... )
>>> catalog.save("customer_data", df)
>>> # Data is validated before saving; only valid data is persisted
Handling validation failures:
>>> invalid_df = pd.DataFrame({"wrong_column": [1, 2, 3]})
>>> try:
...     catalog.save("customer_data", invalid_df)
... except Exception as e:
...     print(f"Validation failed: {e}")
...     # Invalid data is not saved; downstream systems are protected
Saving multiple datasets in a pipeline:
>>> catalog = ValidatedDataCatalog.in_directory("config/pipeline")
>>>
>>> # Process and save intermediate results
>>> raw = catalog.load("raw_transactions")
>>> cleaned = raw.dropna()
>>> catalog.save("cleaned_transactions", cleaned)
>>>
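>>> # Further processing (engineer_features is a user-defined function)
>>> features = engineer_features(cleaned)
>>> catalog.save("feature_matrix", features)
>>>
>>> # Final output: model is a fitted estimator whose predict() returns an
>>> # array, so wrap it in a DataFrame to provide columns and dtypes
>>> predictions = pd.DataFrame({"prediction": model.predict(features)}, index=features.index)
>>> catalog.save("predictions", predictions)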
Complete validation workflow:
>>> import logging
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>>
>>> logger = logging.getLogger(__name__)
>>> catalog = ValidatedDataCatalog.in_directory("config/production")
>>>
>>> # Load validated input
>>> input_data = catalog.load("input_dataset")
>>>
>>> # Transform data (transform is a user-defined function)
>>> output_data = transform(input_data)
>>>
>>> # Save with validation
>>> try:
...     catalog.save("output_dataset", output_data)
... except Exception as e:
...     # Validation or the write failed; investigate and fix the transformation
...     logger.error("Output validation failed: %s", e)
...     # Original input_data is still available for debugging
...     raise
Using save in a reusable processing function:
>>> def aggregate_sales(catalog: ValidatedDataCatalog, input_name: str, output_name: str) -> None:
...     # Load validated data
...     sales = catalog.load(input_name)
...
...     # Aggregate per region; as_index=False keeps "region" as a column
...     aggregated = sales.groupby("region", as_index=False).agg({"revenue": "sum", "quantity": "sum"})
...
...     # Save validated output
...     catalog.save(output_name, aggregated)
>>>
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>> aggregate_sales(catalog, "daily_sales", "regional_sales")
Preventing invalid data from reaching production:
>>> # Development environment - experimenting with new features
>>> # (create_new_features and raw_data are user-defined)
>>> dev_catalog = ValidatedDataCatalog.in_directory("config/dev")
>>> experimental_df = create_new_features(raw_data)
>>>
>>> try:
...     dev_catalog.save("feature_matrix", experimental_df)
... except Exception as e:
...     print(f"New features don't meet schema: {e}")
...     # Fix the feature engineering before deploying to production
>>>
>>> # Production environment - enforces the rules defined in config/prod,
>>> # which should match config/dev for the development check to be meaningful
>>> prod_catalog = ValidatedDataCatalog.in_directory("config/prod")
>>> prod_catalog.save("feature_matrix", experimental_df)
>>> # Only succeeds if data meets production quality standards
Saving with different formats via catalog configuration:
>>> # Catalog configuration determines format (CSV, Parquet, etc.)
>>> catalog = ValidatedDataCatalog.in_directory("config/data")
>>>
>>> # Same save() call, different formats based on catalog config
>>> catalog.save("csv_output", df) # Saved as CSV
>>> catalog.save("parquet_output", df) # Saved as Parquet
>>> catalog.save("database_table", df) # Saved to database
>>> # All validated before saving regardless of format