adc_toolkit.data.abs
Protocol definitions for the data module.
This module defines the foundational protocols used throughout the adc-toolkit data handling system. These protocols establish contracts for data objects, data catalogs, and data validators, enabling flexible implementations while maintaining type safety and consistent interfaces.
The protocols support dependency injection and the strategy pattern, allowing users to swap implementations (e.g., Kedro vs. custom catalogs, GX vs. Pandera validators) without changing downstream code.
Examples
Implementing a custom data object:
>>> class MyDataFrame:
...     def __init__(self, data):
...         self._data = data
...
...     @property
...     def columns(self):
...         return self._data.columns
...
...     @property
...     def dtypes(self):
...         return self._data.dtypes
Using the protocols for type hints:
>>> def process_data(catalog: DataCatalog, validator: DataValidator) -> None:
...     data = catalog.load("my_dataset")
...     validated = validator.validate("my_dataset", data)
...     catalog.save("processed_dataset", validated)
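The swap described above can be sketched with stand-in implementations. Everything except `process_data` is hypothetical and written only for illustration: `Table`, `InMemoryCatalog`, `PassThroughValidator`, `RequiredColumnsValidator`, and the local `ValidationError` are not part of adc-toolkit. The point is that `process_data` stays unchanged while the validator strategy changes.

```python
from pathlib import Path
from typing import Any, Union


class ValidationError(Exception):
    """Hypothetical exception type used only in this sketch."""


class Table:
    """Tiny stand-in data object exposing columns and dtypes."""

    def __init__(self, rows: dict) -> None:
        self.rows = rows  # maps column name -> list of values

    @property
    def columns(self) -> list:
        return list(self.rows)

    @property
    def dtypes(self) -> list:
        return [type(v[0]).__name__ if v else "unknown" for v in self.rows.values()]


class InMemoryCatalog:
    """Hypothetical catalog that keeps datasets in a dict instead of on disk."""

    def __init__(self, datasets: Any = None) -> None:
        self._datasets = dict(datasets or {})

    @classmethod
    def in_directory(cls, path: Union[str, Path]) -> "InMemoryCatalog":
        # A real catalog would read configuration from `path`; this one ignores it.
        return cls()

    def load(self, name: str) -> Table:
        return self._datasets[name]  # raises KeyError for unknown names

    def save(self, name: str, data: Table) -> None:
        self._datasets[name] = data


class PassThroughValidator:
    """Returns the data unchanged, similar in spirit to a no-op validator."""

    @classmethod
    def in_directory(cls, path: Union[str, Path]) -> "PassThroughValidator":
        return cls()

    def validate(self, name: str, data: Table) -> Table:
        return data


class RequiredColumnsValidator:
    """Checks that each named dataset contains a set of required columns."""

    def __init__(self, required: dict) -> None:
        self._required = required  # maps dataset name -> set of column names

    @classmethod
    def in_directory(cls, path: Union[str, Path]) -> "RequiredColumnsValidator":
        return cls({})  # a real validator would parse rules found under `path`

    def validate(self, name: str, data: Table) -> Table:
        missing = set(self._required.get(name, set())) - set(data.columns)
        if missing:
            raise ValidationError(f"{name}: missing columns {sorted(missing)}")
        return data


def process_data(catalog, validator) -> None:
    # Same body as the module example; it depends only on the protocol methods.
    data = catalog.load("my_dataset")
    validated = validator.validate("my_dataset", data)
    catalog.save("processed_dataset", validated)


for validator in (PassThroughValidator(), RequiredColumnsValidator({"my_dataset": {"id"}})):
    catalog = InMemoryCatalog({"my_dataset": Table({"id": [1, 2], "value": [10.0, 20.0]})})
    process_data(catalog, validator)
    print(type(validator).__name__, catalog.load("processed_dataset").columns)
```

Because neither stand-in inherits from the protocols, a static type checker would accept them for `DataCatalog` and `DataValidator` parameters purely on the basis of their method signatures.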
"""
Protocol definitions for the data module.

This module defines the foundational protocols used throughout the adc-toolkit
data handling system. These protocols establish contracts for data objects,
data catalogs, and data validators, enabling flexible implementations while
maintaining type safety and consistent interfaces.

The protocols support dependency injection and the strategy pattern, allowing
users to swap implementations (e.g., Kedro vs. custom catalogs, GX vs. Pandera
validators) without changing downstream code.

Examples
--------
Implementing a custom data object:

>>> class MyDataFrame:
...     def __init__(self, data):
...         self._data = data
...
...     @property
...     def columns(self):
...         return self._data.columns
...
...     @property
...     def dtypes(self):
...         return self._data.dtypes

Using the protocols for type hints:

>>> def process_data(catalog: DataCatalog, validator: DataValidator) -> None:
...     data = catalog.load("my_dataset")
...     validated = validator.validate("my_dataset", data)
...     catalog.save("processed_dataset", validated)
"""

from pathlib import Path
from typing import Protocol


class Data(Protocol):
    """
    Protocol for data objects in the toolkit.

    This protocol defines the minimal interface that any data object must
    implement to be compatible with the adc-toolkit data handling system.
    Data objects represent structured datasets such as pandas DataFrames,
    Spark DataFrames, or other tabular data structures.

    The protocol requires column metadata and data type information, enabling
    validators and catalogs to inspect data structure without depending on
    specific implementations.

    Attributes
    ----------
    columns : property
        Property that returns the column names or labels of the dataset.
        The exact return type depends on the implementation (e.g.,
        pandas.Index for pandas DataFrames, list of strings for Spark).
    dtypes : property
        Property that returns the data types of each column in the dataset.
        The exact return type depends on the implementation (e.g.,
        pandas.Series for pandas DataFrames, a list of (name, type)
        tuples for Spark).

    See Also
    --------
    DataCatalog : Protocol for loading and saving data objects.
    DataValidator : Protocol for validating data objects.

    Notes
    -----
    This is a Protocol class, not an abstract base class. Classes do not need
    to explicitly inherit from Data to be considered compatible. Any class
    that implements the required attributes will satisfy this protocol through
    structural subtyping (PEP 544).

    Common implementations include:
    - pandas.DataFrame: Provides columns and dtypes properties
    - pyspark.sql.DataFrame: Provides columns and dtypes properties
    - Custom data containers with appropriate metadata

    Examples
    --------
    A pandas DataFrame naturally satisfies this protocol:

    >>> import pandas as pd
    >>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    >>> df.columns
    Index(['a', 'b'], dtype='object')
    >>> df.dtypes
    a    int64
    b    int64
    dtype: object

    A custom class implementing the protocol:

    >>> class CustomData:
    ...     def __init__(self, col_names, col_types):
    ...         self._columns = col_names
    ...         self._dtypes = col_types
    ...
    ...     @property
    ...     def columns(self):
    ...         return self._columns
    ...
    ...     @property
    ...     def dtypes(self):
    ...         return self._dtypes
    >>> data = CustomData(["x", "y"], ["int", "float"])
    >>> data.columns
    ['x', 'y']
    """

    columns: property
    dtypes: property


class DataCatalog(Protocol):
    """
    Protocol for data catalog implementations.

    This protocol defines the interface for data catalogs, which handle loading
    and saving datasets. Data catalogs abstract away the details of data storage,
    file formats, and I/O operations, providing a simple name-based API for data
    access.

    Implementations typically use configuration files (e.g., YAML) to map dataset
    names to storage locations, file formats, and load/save parameters. This
    enables declarative data management and facilitates reproducible data
    pipelines.

    Methods
    -------
    in_directory(path)
        Create a new catalog instance from configuration in a directory.
    load(name)
        Load a dataset by name from the catalog.
    save(name, data)
        Save a dataset by name to the catalog.

    See Also
    --------
    Data : Protocol for data objects handled by the catalog.
    DataValidator : Protocol for validating data from catalogs.
    adc_toolkit.data.catalogs.kedro.KedroDataCatalog : Kedro-based implementation.

    Notes
    -----
    This is a Protocol class using structural subtyping (PEP 544). Implementations
    do not need to explicitly inherit from DataCatalog but must provide all
    required methods with compatible signatures.

    The catalog pattern provides several benefits:
    - Separation of concerns: data access logic separate from business logic
    - Configuration-driven: datasets defined in config files, not hardcoded
    - Testability: easy to mock or swap catalogs for testing
    - Reproducibility: consistent data loading across environments

    Thread safety and caching behavior are implementation-specific and should
    be documented in concrete implementations.

    Examples
    --------
    Using a catalog to load and save data:

    >>> catalog = SomeCatalog.in_directory("path/to/config")
    >>> df = catalog.load("training_data")
    >>> processed_df = preprocess(df)
    >>> catalog.save("processed_data", processed_df)

    Catalogs enable clean separation between data access and processing:

    >>> def pipeline(catalog: DataCatalog) -> None:
    ...     raw = catalog.load("raw_data")
    ...     cleaned = clean_data(raw)
    ...     catalog.save("cleaned_data", cleaned)
    ...     features = engineer_features(cleaned)
    ...     catalog.save("features", features)
    """

    @classmethod
    def in_directory(cls, path: str | Path) -> "DataCatalog":
        """
        Create a catalog instance from configuration in a directory.

        This factory method instantiates a catalog by reading configuration
        files from the specified directory. The configuration typically defines
        dataset names, file paths, formats, and load/save parameters.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the directory containing catalog configuration files.
            The directory should contain YAML or other configuration files
            that define the datasets available in this catalog.

        Returns
        -------
        DataCatalog
            A new catalog instance configured with datasets from the directory.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist.
        ValueError
            If the configuration files are invalid or cannot be parsed.

        See Also
        --------
        load : Load a dataset from the catalog.
        save : Save a dataset to the catalog.

        Notes
        -----
        The exact configuration file format and structure depend on the
        implementation. For example, Kedro-based catalogs expect a
        `catalog.yml` file with dataset definitions.

        Configuration files should not be committed with credentials or
        sensitive information. Use environment variables or separate
        credential files.

        Examples
        --------
        Create a catalog from a configuration directory:

        >>> catalog = MyCatalog.in_directory("/path/to/config")
        >>> catalog.load("my_dataset")
        <Data object>

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> config_dir = Path("configs") / "production"
        >>> catalog = MyCatalog.in_directory(config_dir)
        """
        ...

    def load(self, name: str) -> Data:
        """
        Load a dataset by name from the catalog.

        Retrieve a dataset using its registered name. The catalog handles
        all I/O operations, file format parsing, and type conversions based
        on the configuration for this dataset.

        Parameters
        ----------
        name : str
            The registered name of the dataset to load. This name should
            match a dataset definition in the catalog's configuration.

        Returns
        -------
        Data
            The loaded dataset as a Data protocol-compatible object. The
            specific type depends on the catalog configuration (e.g.,
            pandas DataFrame, Spark DataFrame).

        Raises
        ------
        KeyError
            If no dataset with the given name is registered in the catalog.
        FileNotFoundError
            If the dataset's source file does not exist.
        ValueError
            If the dataset cannot be loaded due to format or parsing errors.

        See Also
        --------
        save : Save a dataset to the catalog.
        in_directory : Create a catalog from configuration.

        Notes
        -----
        The load operation may involve:
        - Reading from local files, cloud storage, or databases
        - Parsing specific file formats (CSV, Parquet, JSON, etc.)
        - Applying transformations defined in the catalog configuration
        - Caching for performance (implementation-dependent)

        Load operations should be idempotent: calling load multiple times
        with the same name should return equivalent data.

        Examples
        --------
        Load a dataset by name:

        >>> catalog = MyCatalog.in_directory("config/")
        >>> df = catalog.load("customer_data")
        >>> df.columns
        Index(['customer_id', 'name', 'email'], dtype='object')

        Load multiple datasets:

        >>> train = catalog.load("training_data")
        >>> test = catalog.load("test_data")
        >>> model = catalog.load("trained_model")
        """
        ...

    def save(self, name: str, data: Data) -> None:
        """
        Save a dataset by name to the catalog.

        Store a dataset using its registered name. The catalog handles all
        I/O operations, file format serialization, and storage operations
        based on the configuration for this dataset.

        Parameters
        ----------
        name : str
            The registered name of the dataset to save. This name should
            match a dataset definition in the catalog's configuration.
        data : Data
            The dataset to save. Must be a Data protocol-compatible object
            (e.g., pandas DataFrame, Spark DataFrame).

        Returns
        -------
        None

        Raises
        ------
        KeyError
            If no dataset with the given name is registered in the catalog.
        TypeError
            If the data type is incompatible with the dataset configuration.
        ValueError
            If the dataset cannot be saved due to validation or format errors.
        PermissionError
            If the target location is not writable.

        See Also
        --------
        load : Load a dataset from the catalog.
        in_directory : Create a catalog from configuration.

        Notes
        -----
        The save operation may involve:
        - Writing to local files, cloud storage, or databases
        - Serializing to specific file formats (CSV, Parquet, JSON, etc.)
        - Creating directories if they don't exist
        - Overwriting existing files (configuration-dependent)
        - Applying transformations before saving

        Save operations should be atomic when possible: either the entire
        dataset is saved successfully, or no partial data is written.

        Some implementations may support versioning, creating timestamped
        or numbered versions of saved datasets.

        Examples
        --------
        Save a processed dataset:

        >>> catalog = MyCatalog.in_directory("config/")
        >>> processed_df = process_data(raw_df)
        >>> catalog.save("processed_data", processed_df)

        Save multiple datasets in a pipeline:

        >>> catalog.save("cleaned_data", cleaned)
        >>> catalog.save("features", features)
        >>> catalog.save("predictions", predictions)
        """
        ...


class DataValidator(Protocol):
    """
    Protocol for data validator implementations.

    This protocol defines the interface for data validators, which verify that
    datasets meet specified quality, schema, and business rule requirements.
    Validators execute validation rules (expectations, schemas, constraints)
    and either return validated data or raise exceptions on validation failures.

    Implementations typically use configuration files to define validation
    rules separate from code. This enables declarative data validation and
    facilitates maintaining data quality in production pipelines.

    Methods
    -------
    validate(name, data)
        Validate a dataset against configured validation rules.
    in_directory(path)
        Create a new validator instance from configuration in a directory.

    See Also
    --------
    Data : Protocol for data objects being validated.
    DataCatalog : Protocol for loading data to validate.
    adc_toolkit.data.validators.gx.GXValidator : Great Expectations implementation.
    adc_toolkit.data.validators.pandera.PanderaValidator : Pandera implementation.
    adc_toolkit.data.validators.no_validator.NoValidator : No-op implementation.

    Notes
    -----
    This is a Protocol class using structural subtyping (PEP 544). Implementations
    do not need to explicitly inherit from DataValidator but must provide all
    required methods with compatible signatures.

    Validators serve multiple purposes:
    - Data quality assurance: catch schema drift and data corruption early
    - Contract enforcement: ensure data meets expectations between pipeline stages
    - Documentation: validation rules document expected data characteristics
    - Monitoring: track validation results over time to detect degradation

    Different implementations offer different trade-offs:
    - Great Expectations: Rich ecosystem, profiling, data docs, cloud support
    - Pandera: Lightweight, tight pandas integration, statistical validation
    - NoValidator: No validation overhead for trusted data sources

    Validation can be expensive on large datasets. Implementations may support
    sampling or lazy validation strategies.

    Examples
    --------
    Using a validator in a data pipeline:

    >>> validator = SomeValidator.in_directory("config/validations")
    >>> raw_data = load_data()
    >>> validated_data = validator.validate("raw_data", raw_data)

    Combining validators with catalogs:

    >>> catalog = MyCatalog.in_directory("config/")
    >>> validator = MyValidator.in_directory("config/validations")
    >>> data = catalog.load("customer_data")
    >>> validated = validator.validate("customer_data", data)
    >>> catalog.save("validated_customer_data", validated)
    """

    def validate(self, name: str, data: Data) -> Data:
        """
        Validate a dataset against configured validation rules.

        Execute all validation rules associated with the named dataset. If
        validation succeeds, return the data (potentially with validation
        metadata attached). If validation fails, raise an exception with
        details about which rules failed.

        Parameters
        ----------
        name : str
            The name identifying which validation rules to apply. This should
            correspond to a validation configuration (expectation suite, schema,
            etc.) defined in the validator's configuration.
        data : Data
            The dataset to validate. Must be a Data protocol-compatible object
            (e.g., pandas DataFrame, Spark DataFrame).

        Returns
        -------
        Data
            The validated dataset. This is typically the same object as the
            input data parameter, but implementations may attach validation
            metadata or perform transformations during validation.

        Raises
        ------
        KeyError
            If no validation rules are configured for the given name.
        ValidationError
            If the data fails validation. The exception should include details
            about which validation rules failed and the observed values.
        TypeError
            If the data type is incompatible with the validation rules.

        See Also
        --------
        in_directory : Create a validator from configuration.

        Notes
        -----
        Validation typically checks:
        - Schema: column names, data types, nullability
        - Constraints: value ranges, uniqueness, referential integrity
        - Statistical properties: distributions, correlations, outliers
        - Business rules: domain-specific requirements

        The behavior on validation failure is implementation-specific:
        - Some validators raise immediately on first failure
        - Others collect all failures and raise with complete results
        - Some support warning-level validations that log but don't raise

        Validation may modify data in some implementations:
        - Type coercion to match schema
        - Null filling or imputation
        - Outlier capping or filtering

        For large datasets, implementations may support sampling-based
        validation to reduce computational cost while maintaining statistical
        confidence.

        Examples
        --------
        Validate a dataset:

        >>> validator = MyValidator.in_directory("config/validations")
        >>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        >>> validated = validator.validate("my_dataset", df)

        Handle validation failures:

        >>> try:
        ...     validated = validator.validate("strict_dataset", df)
        ... except ValidationError as e:
        ...     print(f"Validation failed: {e}")
        ...     # Log failure, send alert, or handle gracefully

        Use in a data pipeline:

        >>> def pipeline(validator: DataValidator) -> None:
        ...     raw = load_raw_data()
        ...     validated_raw = validator.validate("raw_schema", raw)
        ...     processed = process(validated_raw)
        ...     validated_processed = validator.validate("processed_schema", processed)
        ...     save_results(validated_processed)
        """
        ...

    @classmethod
    def in_directory(cls, path: str | Path) -> "DataValidator":
        """
        Create a validator instance from configuration in a directory.

        This factory method instantiates a validator by reading validation
        configurations from the specified directory. The configuration defines
        validation rules (expectations, schemas, constraints) for named datasets.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the directory containing validator configuration files.
            The directory should contain validation rule definitions in a
            format appropriate for the implementation (e.g., Great Expectations
            checkpoints, Pandera schemas).

        Returns
        -------
        DataValidator
            A new validator instance configured with rules from the directory.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist.
        ValueError
            If the configuration files are invalid or cannot be parsed.

        See Also
        --------
        validate : Validate a dataset using this validator.

        Notes
        -----
        The exact configuration file format and structure depend on the
        implementation:
        - GXValidator expects Great Expectations project structure
          (expectations/, checkpoints/, great_expectations.yml)
        - PanderaValidator expects Python files defining Pandera schemas
        - Custom validators may use JSON, YAML, or other formats

        Configuration should be version controlled to track changes to
        validation rules over time.

        Some implementations support multiple configuration directories,
        allowing validation rules to be composed from multiple sources.

        Examples
        --------
        Create a validator from a configuration directory:

        >>> validator = MyValidator.in_directory("/path/to/validations")
        >>> validator.validate("dataset_name", data)
        <validated Data object>

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> validation_dir = Path("config") / "data_quality"
        >>> validator = MyValidator.in_directory(validation_dir)

        Separate validators for different environments:

        >>> dev_validator = MyValidator.in_directory("config/validations/dev")
        >>> prod_validator = MyValidator.in_directory("config/validations/prod")
        """
        ...
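The `DataCatalog` contract in the listing above (a directory-based factory, name-based `load` and `save`, and the suggested exceptions) can be illustrated with a small file-backed catalog. This is a minimal sketch under stated assumptions, not how the Kedro-based catalog works: the `catalog.json` file name, the `JsonCsvCatalog` and `CsvTable` classes, and the CSV-only storage are all hypothetical. The save path writes to a temporary file and renames it, which is one common way to approach the atomic-save behaviour the docstring recommends.

```python
import csv
import json
import os
import tempfile
from pathlib import Path
from typing import Union


class CsvTable:
    """Minimal data object: a header row plus string-valued rows."""

    def __init__(self, columns: list, rows: list) -> None:
        self._columns = columns
        self.rows = rows

    @property
    def columns(self) -> list:
        return self._columns

    @property
    def dtypes(self) -> list:
        # The csv module yields strings only, so every column is reported as "str".
        return ["str"] * len(self._columns)


class JsonCsvCatalog:
    """Hypothetical catalog: catalog.json maps dataset names to CSV paths."""

    def __init__(self, root: Path, entries: dict) -> None:
        self._root = root
        self._entries = entries

    @classmethod
    def in_directory(cls, path: Union[str, Path]) -> "JsonCsvCatalog":
        root = Path(path)
        if not root.is_dir():
            raise FileNotFoundError(f"No such configuration directory: {root}")
        try:
            entries = json.loads((root / "catalog.json").read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid catalog configuration: {exc}") from exc
        return cls(root, entries)

    def _file_for(self, name: str) -> Path:
        # Unregistered names raise KeyError, as the protocol docstring suggests.
        return self._root / self._entries[name]

    def load(self, name: str) -> CsvTable:
        with self._file_for(name).open(newline="", encoding="utf-8") as handle:
            header, *rows = list(csv.reader(handle))
        return CsvTable(header, rows)

    def save(self, name: str, data: CsvTable) -> None:
        target = self._file_for(name)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temporary file first, then rename it over the target,
        # so readers never observe a partially written dataset.
        fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as handle:
            writer = csv.writer(handle)
            writer.writerow(data.columns)
            writer.writerows(data.rows)
        os.replace(tmp_name, target)


with tempfile.TemporaryDirectory() as config_dir:
    config = {"raw": "raw.csv", "clean": "out/clean.csv"}
    Path(config_dir, "catalog.json").write_text(json.dumps(config), encoding="utf-8")
    Path(config_dir, "raw.csv").write_text("id,value\n1,10\n2,20\n", encoding="utf-8")

    catalog = JsonCsvCatalog.in_directory(config_dir)
    raw = catalog.load("raw")
    catalog.save("clean", raw)
    reloaded = catalog.load("clean")
    print(reloaded.columns, reloaded.rows)
```

Keeping the name-to-path mapping in configuration means pipeline code refers only to dataset names such as `"raw"` and `"clean"`, which is the separation of concerns the catalog pattern is meant to provide.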
Protocol for data objects in the toolkit.
This protocol defines the minimal interface that any data object must implement to be compatible with the adc-toolkit data handling system. Data objects represent structured datasets such as pandas DataFrames, Spark DataFrames, or other tabular data structures.
The protocol requires column metadata and data type information, enabling validators and catalogs to inspect data structure without depending on specific implementations.
Attributes
- columns (property): Property that returns the column names or labels of the dataset. The exact return type depends on the implementation (e.g., pandas.Index for pandas DataFrames, list of strings for Spark).
- dtypes (property): Property that returns the data types of each column in the dataset. The exact return type depends on the implementation (e.g., pandas.Series for pandas DataFrames, a list of (name, type) tuples for Spark).
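Because the return types of `columns` and `dtypes` vary by implementation, code that inspects a data object usually normalizes them first. The sketch below shows one way to do that with two hypothetical containers, `ListBackedData` and `PairBackedData`, and a hypothetical helper `schema_summary`. None of these names come from adc-toolkit.

```python
from typing import Any


class ListBackedData:
    """Hypothetical container: columns and dtypes are parallel lists."""

    def __init__(self, col_names: list, col_types: list) -> None:
        self._columns = col_names
        self._dtypes = col_types

    @property
    def columns(self) -> list:
        return self._columns

    @property
    def dtypes(self) -> list:
        return self._dtypes


class PairBackedData:
    """Hypothetical container: dtypes is a list of (name, type) pairs."""

    def __init__(self, schema: list) -> None:
        self._schema = schema

    @property
    def columns(self) -> list:
        return [name for name, _ in self._schema]

    @property
    def dtypes(self) -> list:
        return list(self._schema)


def schema_summary(data: Any) -> dict:
    """Map each column name to a string describing its type."""
    names = [str(name) for name in data.columns]
    summary = {}
    for name, dtype in zip(names, data.dtypes):
        # Pair-style entries repeat the column name; keep only the type part.
        if isinstance(dtype, tuple):
            dtype = dtype[1]
        summary[name] = str(dtype)
    return summary


list_backed = ListBackedData(["x", "y"], ["int", "float"])
pair_backed = PairBackedData([("x", "int"), ("y", "float")])
print(schema_summary(list_backed))
print(schema_summary(pair_backed))
```

Both calls produce the same name-to-type mapping, so downstream checks can be written once against the normalized form.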
See Also
DataCatalog: Protocol for loading and saving data objects.
DataValidator: Protocol for validating data objects.
Notes
This is a Protocol class, not an abstract base class. Classes do not need to explicitly inherit from Data to be considered compatible. Any class that implements the required attributes will satisfy this protocol through structural subtyping (PEP 544).
Common implementations include:
- pandas.DataFrame: Provides columns and dtypes properties
- pyspark.sql.DataFrame: Provides columns and dtypes properties
- Custom data containers with appropriate metadata
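Structural subtyping can also be observed at runtime. The sketch below uses a stand-in protocol, `SupportsSchema`, decorated with `typing.runtime_checkable`. This is an assumption made for illustration: the `Data` protocol shown in this module is not declared runtime-checkable, so `isinstance` checks against `Data` itself would raise `TypeError`.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsSchema(Protocol):
    """Illustrative stand-in for Data that permits isinstance checks."""

    @property
    def columns(self) -> Any: ...

    @property
    def dtypes(self) -> Any: ...


class CustomData:
    # No inheritance from SupportsSchema: compatibility is purely structural.
    def __init__(self, col_names, col_types):
        self._columns = col_names
        self._dtypes = col_types

    @property
    def columns(self):
        return self._columns

    @property
    def dtypes(self):
        return self._dtypes


class NotData:
    columns = ["x"]  # provides columns but no dtypes


print(isinstance(CustomData(["x"], ["int"]), SupportsSchema))  # True
print(isinstance(NotData(), SupportsSchema))  # False
```

A runtime check of this kind only confirms that the attributes exist. It does not verify their types, which remains the job of a static type checker.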
Examples
A pandas DataFrame naturally satisfies this protocol:
>>> import pandas as pd
>>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> df.columns
Index(['a', 'b'], dtype='object')
>>> df.dtypes
a    int64
b    int64
dtype: object
A custom class implementing the protocol:
>>> class CustomData:
...     def __init__(self, col_names, col_types):
...         self._columns = col_names
...         self._dtypes = col_types
...
...     @property
...     def columns(self):
...         return self._columns
...
...     @property
...     def dtypes(self):
...         return self._dtypes
>>> data = CustomData(["x", "y"], ["int", "float"])
>>> data.columns
['x', 'y']
Protocol for data catalog implementations.
This protocol defines the interface for data catalogs, which handle loading and saving datasets. Data catalogs abstract away the details of data storage, file formats, and I/O operations, providing a simple name-based API for data access.
Implementations typically use configuration files (e.g., YAML) to map dataset names to storage locations, file formats, and load/save parameters. This enables declarative data management and facilitates reproducible data pipelines.
Methods
- in_directory(path): Create a new catalog instance from configuration in a directory.
- load(name): Load a dataset by name from the catalog.
- save(name, data): Save a dataset by name to the catalog.
See Also
Data: Protocol for data objects handled by the catalog.
DataValidator: Protocol for validating data from catalogs.
adc_toolkit.data.catalogs.kedro.KedroDataCatalog: Kedro-based implementation.
Notes
This is a Protocol class using structural subtyping (PEP 544). Implementations do not need to explicitly inherit from DataCatalog but must provide all required methods with compatible signatures.
The catalog pattern provides several benefits:
- Separation of concerns: data access logic separate from business logic
- Configuration-driven: datasets defined in config files, not hardcoded
- Testability: easy to mock or swap catalogs for testing
- Reproducibility: consistent data loading across environments
Thread safety and caching behavior are implementation-specific and should be documented in concrete implementations.
Examples
Using a catalog to load and save data:
>>> catalog = SomeCatalog.in_directory("path/to/config")
>>> df = catalog.load("training_data")
>>> processed_df = preprocess(df)
>>> catalog.save("processed_data", processed_df)
Catalogs enable clean separation between data access and processing:
>>> def pipeline(catalog: DataCatalog) -> None:
...     raw = catalog.load("raw_data")
...     cleaned = clean_data(raw)
...     catalog.save("cleaned_data", cleaned)
...     features = engineer_features(cleaned)
...     catalog.save("features", features)
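The structural-subtyping note above can be illustrated with a small sketch. `CatalogLike` and `InMemoryCatalog` are hypothetical names invented for this example; `CatalogLike` only mirrors the three method names of `DataCatalog` and is not the toolkit's own class. The point is that the concrete class never inherits from the protocol.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CatalogLike(Protocol):
    """Stand-in mirroring the DataCatalog method names (hypothetical)."""

    @classmethod
    def in_directory(cls, path: str | Path) -> CatalogLike: ...

    def load(self, name: str) -> Any: ...

    def save(self, name: str, data: Any) -> None: ...


class InMemoryCatalog:
    """Hypothetical catalog that keeps datasets in a dict; no inheritance."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    @classmethod
    def in_directory(cls, path: str | Path) -> InMemoryCatalog:
        # The path is ignored here; a real catalog would read configuration.
        return cls()

    def load(self, name: str) -> Any:
        return self._store[name]  # raises KeyError for unknown names

    def save(self, name: str, data: Any) -> None:
        self._store[name] = data


def copy_dataset(catalog: CatalogLike, source: str, target: str) -> None:
    """Depend only on the protocol, not on a concrete catalog class."""
    catalog.save(target, catalog.load(source))


catalog = InMemoryCatalog.in_directory("unused")
catalog.save("raw_data", [1, 2, 3])
copy_dataset(catalog, "raw_data", "raw_copy")
```

An in-memory catalog of this kind is also a convenient test double for pipeline functions that accept a catalog argument.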
@classmethod
def in_directory(cls, path: str | Path) -> "DataCatalog": ...
Create a catalog instance from configuration in a directory.
This factory method instantiates a catalog by reading configuration files from the specified directory. The configuration typically defines dataset names, file paths, formats, and load/save parameters.
Parameters
- path (str or pathlib.Path): Path to the directory containing catalog configuration files. The directory should contain YAML or other configuration files that define the datasets available in this catalog.
Returns
- DataCatalog: A new catalog instance configured with datasets from the directory.
Raises
- FileNotFoundError: If the specified directory does not exist.
- ValueError: If the configuration files are invalid or cannot be parsed.
See Also
load: Load a dataset from the catalog.
save: Save a dataset to the catalog.
Notes
The exact configuration file format and structure depend on the implementation. For example, Kedro-based catalogs expect a catalog.yml file with dataset definitions.
Configuration files should not be committed with credentials or sensitive information. Use environment variables or separate credential files.
Examples
Create a catalog from a configuration directory:
>>> catalog = MyCatalog.in_directory("/path/to/config")
>>> catalog.load("my_dataset")
<Data object>
Using pathlib.Path:
>>> from pathlib import Path
>>> config_dir = Path("configs") / "production"
>>> catalog = MyCatalog.in_directory(config_dir)
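As a rough sketch of how a factory method might honor the documented `FileNotFoundError` and `ValueError` contract, consider the following. `JsonConfigCatalog` and its `catalog.json` file are assumptions made for this illustration only (JSON keeps the example within the standard library); they are not part of adc-toolkit.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


class JsonConfigCatalog:
    """Hypothetical catalog configured by a `catalog.json` file (assumed format)."""

    CONFIG_NAME = "catalog.json"

    def __init__(self, datasets: dict[str, str]) -> None:
        self.datasets = datasets  # dataset name -> file path

    @classmethod
    def in_directory(cls, path: str | Path) -> JsonConfigCatalog:
        directory = Path(path)
        if not directory.is_dir():
            raise FileNotFoundError(f"Configuration directory not found: {directory}")
        try:
            # read_text() itself raises FileNotFoundError if the file is missing.
            config = json.loads((directory / cls.CONFIG_NAME).read_text())
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid catalog configuration: {exc}") from exc
        if not isinstance(config, dict):
            raise ValueError("Catalog configuration must map names to paths")
        return cls(config)


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "catalog.json").write_text('{"my_dataset": "data/my_dataset.csv"}')
    catalog = JsonConfigCatalog.in_directory(tmp)
```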
def load(self, name: str) -> Data: ...
Load a dataset by name from the catalog.
Retrieve a dataset using its registered name. The catalog handles all I/O operations, file format parsing, and type conversions based on the configuration for this dataset.
Parameters
- name (str): The registered name of the dataset to load. This name should match a dataset definition in the catalog's configuration.
Returns
- Data: The loaded dataset as a Data protocol-compatible object. The specific type depends on the catalog configuration (e.g., pandas DataFrame, Spark DataFrame).
Raises
- KeyError: If no dataset with the given name is registered in the catalog.
- FileNotFoundError: If the dataset's source file does not exist.
- ValueError: If the dataset cannot be loaded due to format or parsing errors.
See Also
save: Save a dataset to the catalog.
in_directory: Create a catalog from configuration.
Notes
The load operation may involve:
- Reading from local files, cloud storage, or databases
- Parsing specific file formats (CSV, Parquet, JSON, etc.)
- Applying transformations defined in the catalog configuration
- Caching for performance (implementation-dependent)
Load operations should be idempotent: calling load multiple times with the same name should return equivalent data.
Examples
Load a dataset by name:
>>> catalog = MyCatalog.in_directory("config/")
>>> df = catalog.load("customer_data")
>>> df.columns
Index(['customer_id', 'name', 'email'], dtype='object')
Load multiple datasets:
>>> train = catalog.load("training_data")
>>> test = catalog.load("test_data")
>>> model = catalog.load("trained_model")
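The name-based lookup and the idempotency expectation can be sketched as follows. `CsvCatalog` is a hypothetical class, and for brevity it returns rows as a list of dicts rather than a `Data`-compatible object; treat it as an illustration of the error contract only.

```python
from __future__ import annotations

import csv
import tempfile
from pathlib import Path


class CsvCatalog:
    """Hypothetical catalog mapping dataset names to CSV files."""

    def __init__(self, datasets: dict[str, Path]) -> None:
        self._datasets = datasets

    def load(self, name: str) -> list[dict[str, str]]:
        if name not in self._datasets:
            raise KeyError(f"No dataset registered under {name!r}")
        # open() raises FileNotFoundError if the source file is missing.
        with self._datasets[name].open(newline="") as handle:
            return list(csv.DictReader(handle))


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "customers.csv"
    source.write_text("customer_id,name\n1,Ada\n2,Grace\n")
    catalog = CsvCatalog({"customer_data": source})
    # Loading twice reads the same file and yields equivalent data.
    first = catalog.load("customer_data")
    second = catalog.load("customer_data")
```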
def save(self, name: str, data: Data) -> None: ...
Save a dataset by name to the catalog.
Store a dataset using its registered name. The catalog handles all I/O operations, file format serialization, and storage operations based on the configuration for this dataset.
Parameters
- name (str): The registered name of the dataset to save. This name should match a dataset definition in the catalog's configuration.
- data (Data): The dataset to save. Must be a Data protocol-compatible object (e.g., pandas DataFrame, Spark DataFrame).
Returns
- None
Raises
- KeyError: If no dataset with the given name is registered in the catalog.
- TypeError: If the data type is incompatible with the dataset configuration.
- ValueError: If the dataset cannot be saved due to validation or format errors.
- PermissionError: If the target location is not writable.
See Also
load: Load a dataset from the catalog.
in_directory: Create a catalog from configuration.
Notes
The save operation may involve:
- Writing to local files, cloud storage, or databases
- Serializing to specific file formats (CSV, Parquet, JSON, etc.)
- Creating directories if they don't exist
- Overwriting existing files (configuration-dependent)
- Applying transformations before saving
Save operations should be atomic when possible: either the entire dataset is saved successfully, or no partial data is written.
Some implementations may support versioning, creating timestamped or numbered versions of saved datasets.
Examples
Save a processed dataset:
>>> catalog = MyCatalog.in_directory("config/")
>>> processed_df = process_data(raw_df)
>>> catalog.save("processed_data", processed_df)
Save multiple datasets in a pipeline:
>>> catalog.save("cleaned_data", cleaned)
>>> catalog.save("features", features)
>>> catalog.save("predictions", predictions)
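One common way to approximate the atomic-save behavior described in the notes is to write to a temporary file in the target directory and then rename it into place. The helper below, `atomic_write_text`, is a hypothetical name for this sketch and is not part of the toolkit; it handles only local text files.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(target: Path, text: str) -> None:
    """Write text so that readers see either the old file or the new one."""
    # Create missing parent directories before writing.
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temporary file lives in the same directory so the final rename
    # stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
        os.replace(tmp_name, target)  # overwrites an existing target
    except BaseException:
        # Remove the partial temporary file, then re-raise the error.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Cloud storage and database backends need different mechanisms, so this applies to local files only.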
class DataValidator(Protocol):
    """
    Protocol for data validator implementations.

    This protocol defines the interface for data validators, which verify that
    datasets meet specified quality, schema, and business rule requirements.
    Validators execute validation rules (expectations, schemas, constraints)
    and either return validated data or raise exceptions on validation failures.

    Implementations typically use configuration files to define validation
    rules separate from code. This enables declarative data validation and
    facilitates maintaining data quality in production pipelines.

    Methods
    -------
    validate(name, data)
        Validate a dataset against configured validation rules.
    in_directory(path)
        Create a new validator instance from configuration in a directory.

    See Also
    --------
    Data : Protocol for data objects being validated.
    DataCatalog : Protocol for loading data to validate.
    adc_toolkit.data.validators.gx.GXValidator : Great Expectations implementation.
    adc_toolkit.data.validators.pandera.PanderaValidator : Pandera implementation.
    adc_toolkit.data.validators.no_validator.NoValidator : No-op implementation.

    Notes
    -----
    This is a Protocol class using structural subtyping (PEP 544). Implementations
    do not need to explicitly inherit from DataValidator but must provide all
    required methods with compatible signatures.

    Validators serve multiple purposes:
    - Data quality assurance: catch schema drift and data corruption early
    - Contract enforcement: ensure data meets expectations between pipeline stages
    - Documentation: validation rules document expected data characteristics
    - Monitoring: track validation results over time to detect degradation

    Different implementations offer different trade-offs:
    - Great Expectations: Rich ecosystem, profiling, data docs, cloud support
    - Pandera: Lightweight, tight pandas integration, statistical validation
    - NoValidator: No validation overhead for trusted data sources

    Validation can be expensive on large datasets. Implementations may support
    sampling or lazy validation strategies.

    Examples
    --------
    Using a validator in a data pipeline:

    >>> validator = SomeValidator.in_directory("config/validations")
    >>> raw_data = load_data()
    >>> validated_data = validator.validate("raw_data", raw_data)

    Combining validators with catalogs:

    >>> catalog = MyCatalog.in_directory("config/")
    >>> validator = MyValidator.in_directory("config/validations")
    >>> data = catalog.load("customer_data")
    >>> validated = validator.validate("customer_data", data)
    >>> catalog.save("validated_customer_data", validated)
    """

    def validate(self, name: str, data: Data) -> Data:
        """
        Validate a dataset against configured validation rules.

        Execute all validation rules associated with the named dataset. If
        validation succeeds, return the data (potentially with validation
        metadata attached). If validation fails, raise an exception with
        details about which rules failed.

        Parameters
        ----------
        name : str
            The name identifying which validation rules to apply. This should
            correspond to a validation configuration (expectation suite, schema,
            etc.) defined in the validator's configuration.
        data : Data
            The dataset to validate. Must be a Data protocol-compatible object
            (e.g., pandas DataFrame, Spark DataFrame).

        Returns
        -------
        Data
            The validated dataset. This is typically the same object as the
            input data parameter, but implementations may attach validation
            metadata or perform transformations during validation.

        Raises
        ------
        KeyError
            If no validation rules are configured for the given name.
        ValidationError
            If the data fails validation. The exception should include details
            about which validation rules failed and the observed values.
        TypeError
            If the data type is incompatible with the validation rules.

        See Also
        --------
        in_directory : Create a validator from configuration.

        Notes
        -----
        Validation typically checks:
        - Schema: column names, data types, nullability
        - Constraints: value ranges, uniqueness, referential integrity
        - Statistical properties: distributions, correlations, outliers
        - Business rules: domain-specific requirements

        The behavior on validation failure is implementation-specific:
        - Some validators raise immediately on first failure
        - Others collect all failures and raise with complete results
        - Some support warning-level validations that log but don't raise

        Validation may modify data in some implementations:
        - Type coercion to match schema
        - Null filling or imputation
        - Outlier capping or filtering

        For large datasets, implementations may support sampling-based
        validation to reduce computational cost while maintaining statistical
        confidence.

        Examples
        --------
        Validate a dataset:

        >>> validator = MyValidator.in_directory("config/validations")
        >>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        >>> validated = validator.validate("my_dataset", df)

        Handle validation failures:

        >>> try:
        ...     validated = validator.validate("strict_dataset", df)
        ... except ValidationError as e:
        ...     print(f"Validation failed: {e}")
        ...     # Log failure, send alert, or handle gracefully

        Use in a data pipeline:

        >>> def pipeline(validator: DataValidator) -> None:
        ...     raw = load_raw_data()
        ...     validated_raw = validator.validate("raw_schema", raw)
        ...     processed = process(validated_raw)
        ...     validated_processed = validator.validate("processed_schema", processed)
        ...     save_results(validated_processed)
        """
        ...

    @classmethod
    def in_directory(cls, path: str | Path) -> "DataValidator":
        """
        Create a validator instance from configuration in a directory.

        This factory method instantiates a validator by reading validation
        configurations from the specified directory. The configuration defines
        validation rules (expectations, schemas, constraints) for named datasets.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the directory containing validator configuration files.
            The directory should contain validation rule definitions in a
            format appropriate for the implementation (e.g., Great Expectations
            checkpoints, Pandera schemas).

        Returns
        -------
        DataValidator
            A new validator instance configured with rules from the directory.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist.
        ValueError
            If the configuration files are invalid or cannot be parsed.

        See Also
        --------
        validate : Validate a dataset using this validator.

        Notes
        -----
        The exact configuration file format and structure depend on the
        implementation:
        - GXValidator expects Great Expectations project structure
          (expectations/, checkpoints/, great_expectations.yml)
        - PanderaValidator expects Python files defining Pandera schemas
        - Custom validators may use JSON, YAML, or other formats

        Configuration should be version controlled to track changes to
        validation rules over time.

        Some implementations support multiple configuration directories,
        allowing validation rules to be composed from multiple sources.

        Examples
        --------
        Create a validator from a configuration directory:

        >>> validator = MyValidator.in_directory("/path/to/validations")
        >>> validator.validate("dataset_name", data)
        <validated Data object>

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> validation_dir = Path("config") / "data_quality"
        >>> validator = MyValidator.in_directory(validation_dir)

        Separate validators for different environments:

        >>> dev_validator = MyValidator.in_directory("config/validations/dev")
        >>> prod_validator = MyValidator.in_directory("config/validations/prod")
        """
        ...
Protocol for data validator implementations.
This protocol defines the interface for data validators, which verify that datasets meet specified quality, schema, and business rule requirements. Validators execute validation rules (expectations, schemas, constraints) and either return validated data or raise exceptions on validation failures.
Implementations typically use configuration files to define validation rules separate from code. This enables declarative data validation and facilitates maintaining data quality in production pipelines.
Methods
- validate(name, data): Validate a dataset against configured validation rules.
- in_directory(path): Create a new validator instance from configuration in a directory.
See Also
Data: Protocol for data objects being validated.
DataCatalog: Protocol for loading data to validate.
adc_toolkit.data.validators.gx.GXValidator: Great Expectations implementation.
adc_toolkit.data.validators.pandera.PanderaValidator: Pandera implementation.
adc_toolkit.data.validators.no_validator.NoValidator: No-op implementation.
Notes
This is a Protocol class using structural subtyping (PEP 544). Implementations do not need to explicitly inherit from DataValidator but must provide all required methods with compatible signatures.
Validators serve multiple purposes:
- Data quality assurance: catch schema drift and data corruption early
- Contract enforcement: ensure data meets expectations between pipeline stages
- Documentation: validation rules document expected data characteristics
- Monitoring: track validation results over time to detect degradation
Different implementations offer different trade-offs:
- Great Expectations: Rich ecosystem, profiling, data docs, cloud support
- Pandera: Lightweight, tight pandas integration, statistical validation
- NoValidator: No validation overhead for trusted data sources
Validation can be expensive on large datasets. Implementations may support sampling or lazy validation strategies.
Examples
Using a validator in a data pipeline:
>>> validator = SomeValidator.in_directory("config/validations")
>>> raw_data = load_data()
>>> validated_data = validator.validate("raw_data", raw_data)
Combining validators with catalogs:
>>> catalog = MyCatalog.in_directory("config/")
>>> validator = MyValidator.in_directory("config/validations")
>>> data = catalog.load("customer_data")
>>> validated = validator.validate("customer_data", data)
>>> catalog.save("validated_customer_data", validated)
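A minimal sketch of a class that satisfies the validator interface structurally might look like this. `Table` and `RequiredColumnsValidator` are hypothetical names for this example, the rules are hard-coded rather than read from configuration, and a plain `ValueError` stands in for whatever exception a real validator raises.

```python
from __future__ import annotations

from pathlib import Path


class Table:
    """Tiny hypothetical data object exposing `columns` and `dtypes`."""

    def __init__(self, rows: dict[str, list]) -> None:
        self._rows = rows

    @property
    def columns(self) -> list[str]:
        return list(self._rows)

    @property
    def dtypes(self) -> dict[str, str]:
        # Report the type of the first value in each column, if any.
        return {
            name: type(values[0]).__name__ if values else "object"
            for name, values in self._rows.items()
        }


class RequiredColumnsValidator:
    """Hypothetical validator checking that required columns are present."""

    def __init__(self, required: dict[str, set[str]]) -> None:
        self._required = required

    @classmethod
    def in_directory(cls, path: str | Path) -> RequiredColumnsValidator:
        # A real implementation would read rules from `path`; hard-coded here.
        return cls({"customer_data": {"customer_id", "email"}})

    def validate(self, name: str, data: Table) -> Table:
        missing = self._required[name] - set(data.columns)  # KeyError if unknown
        if missing:
            raise ValueError(f"{name}: missing columns {sorted(missing)}")
        return data


validator = RequiredColumnsValidator.in_directory("config/validations")
table = Table({"customer_id": [1, 2], "email": ["a@example.com", "b@example.com"]})
validated = validator.validate("customer_data", table)
```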
def validate(self, name: str, data: Data) -> Data: ...
Validate a dataset against configured validation rules.
Execute all validation rules associated with the named dataset. If validation succeeds, return the data (potentially with validation metadata attached). If validation fails, raise an exception with details about which rules failed.
Parameters
- name (str): The name identifying which validation rules to apply. This should correspond to a validation configuration (expectation suite, schema, etc.) defined in the validator's configuration.
- data (Data): The dataset to validate. Must be a Data protocol-compatible object (e.g., pandas DataFrame, Spark DataFrame).
Returns
- Data: The validated dataset. This is typically the same object as the input data parameter, but implementations may attach validation metadata or perform transformations during validation.
Raises
- KeyError: If no validation rules are configured for the given name.
- ValidationError: If the data fails validation. The exception should include details about which validation rules failed and the observed values.
- TypeError: If the data type is incompatible with the validation rules.
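The three exceptions above can be illustrated with a small stand-alone sketch. Everything in it is hypothetical: `ValidationError` is defined locally because the source does not say where that exception lives, `Table` is a minimal stand-in for a Data-compatible object, and `RequiredColumnsValidator` is an illustrative implementation, not one shipped with the toolkit.

```python
from dataclasses import dataclass


class ValidationError(Exception):
    """Hypothetical exception type; defined here only for the sketch."""


@dataclass
class Table:
    """Minimal stand-in for a Data-compatible object (columns and dtypes)."""

    columns: list[str]
    dtypes: dict[str, str]


class RequiredColumnsValidator:
    """Illustrative validator: each dataset name maps to required columns."""

    def __init__(self, rules: dict[str, set[str]]) -> None:
        self._rules = rules

    def validate(self, name: str, data: Table) -> Table:
        # Unknown dataset name: no rules are configured for it.
        if name not in self._rules:
            raise KeyError(f"no validation rules configured for {name!r}")
        # Object does not expose the attributes the rules need.
        if not hasattr(data, "columns") or not hasattr(data, "dtypes"):
            raise TypeError(f"{type(data).__name__} does not expose columns/dtypes")
        # Rule failure: report which columns are missing.
        missing = self._rules[name] - set(data.columns)
        if missing:
            raise ValidationError(f"{name}: missing columns {sorted(missing)}")
        return data  # this sketch returns the input object unchanged


validator = RequiredColumnsValidator({"my_dataset": {"id", "value"}})
good = Table(columns=["id", "value"], dtypes={"id": "int64", "value": "int64"})
result = validator.validate("my_dataset", good)
```

Here `result` is the same object as `good`; an implementation that coerces or annotates data could return a different object instead.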
See Also
in_directory: Create a validator from configuration.
Notes
Validation typically checks:
- Schema: column names, data types, nullability
- Constraints: value ranges, uniqueness, referential integrity
- Statistical properties: distributions, correlations, outliers
- Business rules: domain-specific requirements
The behavior on validation failure is implementation-specific:
- Some validators raise immediately on first failure
- Others collect all failures and raise with complete results
- Some support warning-level validations that log but don't raise
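The three failure behaviors listed above can be sketched in one small rule runner. This is an illustration under stated assumptions, not the behavior of any particular validator: `Rule`, `run_rules`, the `fail_fast` flag, and the locally defined `ValidationError` are all hypothetical names, and the data is a plain dict of column lists.

```python
import logging
from collections.abc import Callable
from dataclasses import dataclass

logger = logging.getLogger("validation_sketch")


class ValidationError(Exception):
    """Hypothetical exception carrying the labels of the failed rules."""

    def __init__(self, failures: list[str]) -> None:
        super().__init__("failed rules: " + ", ".join(failures))
        self.failures = failures


@dataclass(frozen=True)
class Rule:
    label: str
    check: Callable[[dict], bool]  # returns True when the data passes
    warn_only: bool = False        # log the failure instead of raising


def run_rules(data: dict, rules: list[Rule], fail_fast: bool = False) -> list[str]:
    """Apply rules; return labels of warn-only failures, raise on hard failures."""
    failures: list[str] = []
    warned: list[str] = []
    for rule in rules:
        if rule.check(data):
            continue
        if rule.warn_only:
            logger.warning("rule failed (warning only): %s", rule.label)
            warned.append(rule.label)
        elif fail_fast:
            raise ValidationError([rule.label])  # stop at the first hard failure
        else:
            failures.append(rule.label)  # keep going and report everything
    if failures:
        raise ValidationError(failures)
    return warned


rules = [
    Rule("id is unique", lambda d: len(set(d["id"])) == len(d["id"])),
    Rule("value is non-negative", lambda d: all(v >= 0 for v in d["value"])),
    Rule("at least 5 rows", lambda d: len(d["id"]) >= 5, warn_only=True),
]
data = {"id": [1, 1, 3], "value": [10, -5, 30]}
```

With `fail_fast=True` the exception names only the first failed rule; with the default it names both hard failures, which is usually more useful when fixing a dataset in one pass.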
Validation may modify data in some implementations:
- Type coercion to match schema
- Null filling or imputation
- Outlier capping or filtering
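Because validation may transform data, callers should work with the returned object rather than the input. The sketch below shows type coercion only, on a plain dict of column lists; `coerce_columns` and the schema format are assumptions made for illustration.

```python
def coerce_columns(data: dict[str, list], schema: dict[str, type]) -> dict[str, list]:
    """Return a new mapping with each column's values cast to the schema type."""
    coerced: dict[str, list] = {}
    for column, values in data.items():
        target = schema.get(column)
        # Columns without a schema entry are copied as-is.
        coerced[column] = list(values) if target is None else [target(v) for v in values]
    return coerced


raw = {"id": ["1", "2", "3"], "value": [10, 20, 30]}
clean = coerce_columns(raw, {"id": int, "value": float})
```

The input `raw` is left untouched, so code that keeps using it after validation would silently miss the coerced types.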
For large datasets, implementations may support sampling-based validation to reduce computational cost while maintaining statistical confidence.
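A minimal sketch of sampling-based validation, assuming rows are plain dicts and a fixed seed is acceptable for reproducibility. `sample_rows` is a hypothetical helper, not part of the toolkit. A sample can miss rare violations, so this trades completeness for speed.

```python
import random


def sample_rows(rows: list[dict], fraction: float, seed: int = 0) -> list[dict]:
    """Return a reproducible random subset containing roughly `fraction` of rows."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    if not rows:
        return []
    k = max(1, round(len(rows) * fraction))
    # A dedicated Random instance keeps the global random state untouched.
    return random.Random(seed).sample(rows, k)


rows = [{"value": i} for i in range(1000)]
sample = sample_rows(rows, fraction=0.1, seed=42)

# Run the (cheap) check on the sample instead of on every row.
sample_is_valid = all(row["value"] >= 0 for row in sample)
```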
Examples
Validate a dataset:
>>> validator = MyValidator.in_directory("config/validations")
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
>>> validated = validator.validate("my_dataset", df)
Handle validation failures:
>>> try:
...     validated = validator.validate("strict_dataset", df)
... except ValidationError as e:
...     print(f"Validation failed: {e}")
...     # Log failure, send alert, or handle gracefully
Use in a data pipeline:
>>> def pipeline(validator: DataValidator) -> None:
...     raw = load_raw_data()
...     validated_raw = validator.validate("raw_schema", raw)
...     processed = process(validated_raw)
...     validated_processed = validator.validate("processed_schema", processed)
...     save_results(validated_processed)
    @classmethod
    def in_directory(cls, path: str | Path) -> "DataValidator":
        """
        Create a validator instance from configuration in a directory.

        This factory method instantiates a validator by reading validation
        configurations from the specified directory. The configuration defines
        validation rules (expectations, schemas, constraints) for named datasets.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the directory containing validator configuration files.
            The directory should contain validation rule definitions in a
            format appropriate for the implementation (e.g., Great Expectations
            checkpoints, Pandera schemas).

        Returns
        -------
        DataValidator
            A new validator instance configured with rules from the directory.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist.
        ValueError
            If the configuration files are invalid or cannot be parsed.

        See Also
        --------
        validate : Validate a dataset using this validator.

        Notes
        -----
        The exact configuration file format and structure depend on the
        implementation:
        - GXValidator expects Great Expectations project structure
          (expectations/, checkpoints/, great_expectations.yml)
        - PanderaValidator expects Python files defining Pandera schemas
        - Custom validators may use JSON, YAML, or other formats

        Configuration should be version controlled to track changes to
        validation rules over time.

        Some implementations support multiple configuration directories,
        allowing validation rules to be composed from multiple sources.

        Examples
        --------
        Create a validator from a configuration directory:

        >>> validator = MyValidator.in_directory("/path/to/validations")
        >>> validator.validate("dataset_name", data)
        <validated Data object>

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> validation_dir = Path("config") / "data_quality"
        >>> validator = MyValidator.in_directory(validation_dir)

        Separate validators for different environments:

        >>> dev_validator = MyValidator.in_directory("config/validations/dev")
        >>> prod_validator = MyValidator.in_directory("config/validations/prod")
        """
        ...
Create a validator instance from configuration in a directory.
This factory method instantiates a validator by reading validation configurations from the specified directory. The configuration defines validation rules (expectations, schemas, constraints) for named datasets.
Parameters
- path (str or pathlib.Path): Path to the directory containing validator configuration files. The directory should contain validation rule definitions in a format appropriate for the implementation (e.g., Great Expectations checkpoints, Pandera schemas).
Returns
- DataValidator: A new validator instance configured with rules from the directory.
Raises
- FileNotFoundError: If the specified directory does not exist.
- ValueError: If the configuration files are invalid or cannot be parsed.
See Also
validate: Validate a dataset using this validator.
Notes
The exact configuration file format and structure depend on the implementation:
- GXValidator expects Great Expectations project structure (expectations/, checkpoints/, great_expectations.yml)
- PanderaValidator expects Python files defining Pandera schemas
- Custom validators may use JSON, YAML, or other formats
Configuration should be version controlled to track changes to validation rules over time.
Some implementations support multiple configuration directories, allowing validation rules to be composed from multiple sources.
Examples
Create a validator from a configuration directory:
>>> validator = MyValidator.in_directory("/path/to/validations")
>>> validator.validate("dataset_name", data)
<validated Data object>
Using pathlib.Path:
>>> from pathlib import Path
>>> validation_dir = Path("config") / "data_quality"
>>> validator = MyValidator.in_directory(validation_dir)
Separate validators for different environments:
>>> dev_validator = MyValidator.in_directory("config/validations/dev")
>>> prod_validator = MyValidator.in_directory("config/validations/prod")