adc_toolkit.data.validators.gx
Great Expectations validator implementation for adc-toolkit.
This module integrates Great Expectations (GX) with the adc-toolkit data validation system. It implements the DataValidator protocol on top of GX's expectation framework and exposes expectation suites, checkpoints, batch management, data profiling, and automatic data documentation.
The module orchestrates the complete GX validation workflow through a flexible, strategy-based architecture that supports multiple storage backends (local filesystem, AWS S3, Google Cloud Storage, Azure Blob Storage), pluggable validation strategies, and automatic schema freezing for rapid prototyping.
Classes
GXValidator: Main validator class implementing the DataValidator protocol. Provides high-level validation interface with automatic expectation suite creation, schema freezing, and comprehensive error reporting.
BatchManager: Manages creation and configuration of Great Expectations batch objects from pandas or PySpark DataFrames. Handles batch request generation and batch execution for validation checkpoints.
ConfigurationBasedExpectationAddition: Strategy for adding expectations to suites based on configuration files. Enables declarative expectation management through YAML or JSON configuration.
ValidatorBasedExpectationAddition: Strategy for adding expectations using GX Validator objects. Provides programmatic expectation addition with full access to GX's validation API.
Functions
None. This module exports only classes. Validation orchestration is handled by GXValidator.validate(), and supporting functionality is encapsulated in strategy and manager classes.
See Also
adc_toolkit.data.validators.gx.validator: GXValidator implementation details.
adc_toolkit.data.validators.gx.batch_managers: Batch management components.
adc_toolkit.data.validators.gx.data_context: Data context implementations.
adc_toolkit.data.abs.DataValidator: Protocol defining validator interface.
adc_toolkit.data.ValidatedDataCatalog: Data catalog with integrated validation.
adc_toolkit.data.validators.pandera: Alternative lightweight validator.
adc_toolkit.data.validators.no_validator: No-op validator for testing.
Notes
Great Expectations Overview
Great Expectations (https://greatexpectations.io/) is an open-source Python library for data quality, testing, profiling, and documentation. This integration provides a bridge between adc-toolkit's validation abstraction and GX's rich ecosystem, enabling:
- Declarative data quality rules: Define expectations in configuration
- Automatic profiling: Generate expectations from sample data
- Data documentation: Generate comprehensive data docs websites
- Multiple backends: Store validation artifacts in cloud storage
- Extensive expectation library: 50+ built-in expectations plus custom
- Version control: Track changes to expectations over time
- Integration: Compatible with Jupyter, Airflow, dbt, and other tools
Architecture and Design Patterns
The GX validator implements several design patterns:
Strategy Pattern: Pluggable strategies for expectation suite lookup and expectation addition enable flexible validation workflows without modifying core logic.
- ``AutoExpectationSuiteCreation``: Auto-creates missing suites
- ``CustomExpectationSuiteStrategy``: Requires pre-defined suites
- ``SchemaExpectationAddition``: Automatically adds schema expectations
- ``SkipExpectationAddition``: Skips automatic expectation addition
Facade Pattern: GXValidator simplifies GX's complex API by providing a clean, high-level interface (validate()) that orchestrates multiple underlying operations.
Dependency Injection: Data context and strategies are injected via constructor, enabling testability, configuration flexibility, and easy mocking in unit tests.
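The interplay of the three patterns can be illustrated with a small, self-contained sketch. Everything in it is hypothetical: `SuiteLookupStrategy`, `AutoCreate`, `RequireExisting`, and `ToyValidator` are stand-ins invented for this illustration and are not the toolkit's classes; plain dictionaries replace GX objects.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol


class SuiteLookupStrategy(Protocol):
    """Hypothetical strategy interface for resolving a suite by name."""

    def lookup(self, suites: dict[str, list[str]], name: str) -> list[str]: ...


class AutoCreate:
    """Toy analogue of an auto-creating strategy: make the suite if absent."""

    def lookup(self, suites: dict[str, list[str]], name: str) -> list[str]:
        return suites.setdefault(name, [])


class RequireExisting:
    """Toy analogue of a strict strategy: a missing suite is an error."""

    def lookup(self, suites: dict[str, list[str]], name: str) -> list[str]:
        if name not in suites:
            raise KeyError(f"expectation suite {name!r} is not defined")
        return suites[name]


@dataclass
class ToyValidator:
    """Facade: one validate() call; behaviour is set by the injected strategy."""

    lookup_strategy: SuiteLookupStrategy
    suites: dict[str, list[str]] = field(default_factory=dict)

    def validate(self, name: str, data: object) -> object:
        self.lookup_strategy.lookup(self.suites, f"{name}_suite")
        return data  # a real validator would run the suite against the data


lenient = ToyValidator(AutoCreate())
strict = ToyValidator(RequireExisting())
lenient.validate("sales", [1, 2, 3])  # creates "sales_suite" on the fly
```

Because the strategy is passed in rather than hard-coded, a unit test can substitute either implementation (or a mock) without touching `ToyValidator`.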
Validation Workflow
The complete validation sequence when calling GXValidator.validate():
1. Suite Lookup: Check if expectation suite exists for dataset
2. Suite Creation: Create suite if missing (based on lookup strategy)
3. Batch Creation: Convert data to GX Batch using BatchManager
4. Expectation Addition: Add expectations based on addition strategy
5. Checkpoint Creation: Create or update checkpoint for dataset
6. Checkpoint Execution: Execute checkpoint to validate batch
7. Result Evaluation: Analyze results, raise ValidationError on failure
8. Data Return: Return original data if validation succeeds
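The sequence above can be sketched with plain Python objects standing in for GX suites, batches, and checkpoints. This is an illustration of the control flow only: the `validate` function, the local `ValidationError` class, and the `expectations_for` callback are hypothetical, and the assumption that expectations are added only when a suite is newly created is ours, not something this module's documentation states.

```python
class ValidationError(Exception):
    """Stand-in for the toolkit's validation exception."""


def validate(name, data, suites, expectations_for):
    """Walk the eight steps with dictionaries standing in for GX objects."""
    suite_name = f"{name}_suite"
    is_new = suite_name not in suites                       # 1. suite lookup
    if is_new:
        suites[suite_name] = []                             # 2. suite creation
    batch = {"asset": name, "rows": data}                   # 3. batch creation
    if is_new:
        suites[suite_name].extend(expectations_for(data))   # 4. expectation addition
    checkpoint = {"suite": suite_name, "batch": batch}      # 5. checkpoint creation
    failures = [                                            # 6. checkpoint execution
        check.__name__
        for check in suites[checkpoint["suite"]]
        if not check(checkpoint["batch"]["rows"])
    ]
    if failures:                                            # 7. result evaluation
        raise ValidationError(f"{name}: failed {failures}")
    return data                                             # 8. data return


def non_empty(rows):
    """Toy expectation: the batch must contain at least one row."""
    return len(rows) > 0


suites = {}
validate("raw", [1, 2], suites, lambda rows: [non_empty])
```

A later call such as `validate("raw", [], suites, ...)` reuses the stored suite and raises `ValidationError`, which mirrors steps 6 and 7.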
Storage Backends
The module supports multiple data context backends through the
adc_toolkit.data.validators.gx.data_context submodule:
- RepoDataContext: Filesystem-based (default)
- S3DataContext: AWS S3 storage
- GCPDataContext: Google Cloud Storage
- AzureDataContext: Azure Blob Storage
- EphemeralDataContext: In-memory (testing)
Backend selection is transparent to application code, configured through the
in_directory() factory method or by passing a pre-configured data context
to the GXValidator constructor.
Schema Freezing
With default strategies (AutoExpectationSuiteCreation +
SchemaExpectationAddition), the validator automatically "freezes" schemas
on first validation:
1. First validation inspects DataFrame structure (columns, types)
2. Schema expectations are generated and stored in expectation suite
3. Subsequent validations enforce frozen schema
4. Schema drift is detected and reported as validation failure
This provides automatic protection against schema changes while allowing manual customization of expectation suites when needed.
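The freeze-then-enforce idea can be shown without GX at all. The sketch below is a simplified model, not the toolkit's implementation: `infer_schema` and `SchemaFreezer` are hypothetical names, a table is modelled as a dictionary of non-empty column lists, and a column's type is taken from its first value.

```python
from __future__ import annotations


def infer_schema(table: dict[str, list]) -> dict[str, str]:
    """Map each column to the type name of its first value (toy inference)."""
    return {col: type(values[0]).__name__ for col, values in table.items()}


class SchemaFreezer:
    """Remember the first schema seen per dataset and report later drift."""

    def __init__(self) -> None:
        self._frozen: dict[str, dict[str, str]] = {}

    def check(self, name: str, table: dict[str, list]) -> list[str]:
        observed = infer_schema(table)
        # First call stores the observed schema; later calls return the stored one.
        frozen = self._frozen.setdefault(name, observed)
        problems = []
        for col in frozen.keys() - observed.keys():
            problems.append(f"missing column {col!r}")
        for col in observed.keys() - frozen.keys():
            problems.append(f"unexpected column {col!r}")
        for col in frozen.keys() & observed.keys():
            if frozen[col] != observed[col]:
                problems.append(f"column {col!r}: {frozen[col]} -> {observed[col]}")
        return sorted(problems)


freezer = SchemaFreezer()
first = freezer.check("users", {"id": [1, 2], "name": ["Alice", "Bob"]})
drift = freezer.check("users", {"id": [3], "age": [30]})
```

Here `first` is empty because the first call only freezes the schema, while `drift` lists the missing `name` column and the unexpected `age` column.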
Performance Considerations
- First validation overhead: Suite creation and checkpoint setup add latency to first validation. Subsequent validations are faster (suite reuse).
- Schema inspection cost: Schema freezing inspects the DataFrame's column names and types, so its cost scales with the number of columns, not rows.
- Expectation complexity: Simple schema checks are fast; statistical expectations (distributions, correlations) can be expensive on large datasets.
- Backend I/O: Cloud backends (S3, GCS, Azure Blob Storage) add network latency compared to the local filesystem.
- Sampling strategies: For large datasets, consider validating samples rather than complete data.
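As a minimal sketch of the sampling suggestion, the hypothetical helper below caps the number of rows handed to a validator and uses a fixed seed so repeated runs check the same rows. It operates on a plain list; whether a sample is representative enough for a given expectation is a judgment the caller has to make.

```python
import random


def sample_rows(rows: list, max_rows: int, seed: int = 0) -> list:
    """Return at most max_rows rows, chosen reproducibly."""
    if len(rows) <= max_rows:
        return list(rows)
    return random.Random(seed).sample(rows, max_rows)


subset = sample_rows(list(range(1000)), max_rows=100)
```

For pandas data the equivalent call is `DataFrame.sample(n=..., random_state=...)`.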
Thread Safety
GXValidator instances are not thread-safe. The underlying Great Expectations data context performs file/network I/O and maintains internal state. For concurrent validation scenarios, create separate validator instances (with separate data contexts) per thread or implement external locking.
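One way to follow the one-instance-per-thread advice is thread-local storage. The sketch below is an assumption-laden illustration: `make_validator` is a hypothetical factory that returns a placeholder object where real code would build a validator together with its own data context.

```python
import threading

_local = threading.local()


def make_validator():
    """Hypothetical factory; stands in for building a validator and its context."""
    return object()


def validator_for_current_thread():
    """Return this thread's validator, creating it on first use."""
    if not hasattr(_local, "validator"):
        _local.validator = make_validator()
    return _local.validator


seen = []


def worker():
    # Each thread receives its own instance, so no state is shared.
    seen.append(validator_for_current_thread())


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The alternative mentioned above, external locking, would instead wrap every `validate()` call on a shared instance in a single `threading.Lock`, trading parallelism for lower setup cost.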
Version Control Best Practices
When using filesystem-based data contexts (RepoDataContext), follow these version control guidelines:
Commit to git:
- expectations/: Expectation suite JSON files
- checkpoints/: Checkpoint YAML configurations
- great_expectations.yml: Main configuration
- plugins/: Custom expectation implementations
Add to .gitignore:
- uncommitted/: Validation results and data docs
- uncommitted/validations/: Validation result artifacts
- uncommitted/data_docs/: Generated documentation websites
This approach version controls validation rules while excluding environment-specific results and generated documentation.
Examples
Basic usage with automatic suite creation and schema freezing:
>>> from adc_toolkit.data.validators.gx import GXValidator
>>> import pandas as pd
>>> validator = GXValidator.in_directory("config/gx")
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
>>> validated = validator.validate("sales_data", df)
>>> # First validation: auto-creates suite, freezes schema
>>> # Subsequent validations: enforces frozen schema
Using custom strategies for strict validation:
>>> from adc_toolkit.data.validators.gx import GXValidator
>>> from adc_toolkit.data.validators.gx.batch_managers import (
... CustomExpectationSuiteStrategy,
... SkipExpectationAddition,
... )
>>> from great_expectations.data_context import EphemeralDataContext
>>> context = EphemeralDataContext()
>>> validator = GXValidator(
... data_context=context,
... expectation_suite_lookup_strategy=CustomExpectationSuiteStrategy(),
... expectation_addition_strategy=SkipExpectationAddition(),
... )
>>> # Requires pre-defined suites, no automatic expectations
Using with cloud-based data context:
>>> from adc_toolkit.data.validators.gx import GXValidator
>>> from adc_toolkit.data.validators.gx.data_context import S3DataContext
>>> s3_context = S3DataContext("s3://my-bucket/gx-config").create()
>>> validator = GXValidator(data_context=s3_context)
>>> validated = validator.validate("dataset", df)
>>> # Expectations and results stored in S3
Integration with ValidatedDataCatalog:
>>> from adc_toolkit.data import ValidatedDataCatalog
>>> from adc_toolkit.data.validators.gx import GXValidator
>>> catalog = ValidatedDataCatalog.in_directory(path="config", validator=GXValidator.in_directory("config/gx"))
>>> df = catalog.load("customer_data") # Validates after load
>>> catalog.save("processed_data", df) # Validates before save
Detecting schema drift:
>>> validator = GXValidator.in_directory("config/gx")
>>> # First validation with original schema
>>> df1 = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
>>> validator.validate("users", df1) # Creates suite, freezes schema
>>> # Subsequent validation with changed schema
>>> df2 = pd.DataFrame({"id": [3], "age": [30]}) # Different columns!
>>> # ValidationError must already be imported; its import path is not shown here
>>> try:
... validator.validate("users", df2)
... except ValidationError as e:
... print(f"Schema drift detected: {e}")
... # Validation fails due to mismatched columns
Manual expectation suite creation:
>>> from great_expectations.data_context import EphemeralDataContext
>>> from great_expectations.core import ExpectationConfiguration
>>> context = EphemeralDataContext()
>>> # Create custom suite with specific expectations
>>> suite = context.create_expectation_suite("custom_suite")
>>> suite.add_expectation(
... ExpectationConfiguration(
... expectation_type="expect_column_values_to_be_between",
... kwargs={"column": "age", "min_value": 0, "max_value": 120},
... )
... )
>>> validator = GXValidator(data_context=context)
>>> df = pd.DataFrame({"age": [25, 30, 35]})
>>> validated = validator.validate("custom", df)
Data pipeline with multiple validation stages:
>>> def quality_pipeline(gx_path: str):
... validator = GXValidator.in_directory(gx_path)
...
... # Validate raw input
... raw = load_raw_data()
... validated_raw = validator.validate("raw_stage", raw)
...
... # Transform and validate
... cleaned = clean_data(validated_raw)
... validated_clean = validator.validate("clean_stage", cleaned)
...
... # Feature engineering and validate
... features = engineer_features(validated_clean)
... validated_features = validator.validate("feature_stage", features)
...
... return validated_features
from adc_toolkit.data.validators.gx.batch_managers.batch_manager import BatchManager
from adc_toolkit.data.validators.gx.batch_managers.expectation_addition import (
    ConfigurationBasedExpectationAddition,
    ValidatorBasedExpectationAddition,
)
from adc_toolkit.data.validators.gx.validator import GXValidator


__all__ = [
    "BatchManager",
    "ConfigurationBasedExpectationAddition",
    "GXValidator",
    "ValidatorBasedExpectationAddition",
]
@dataclass
class BatchManager:
    """
    Coordinate batch metadata and operations for Great Expectations validation.

    The BatchManager dataclass serves as a central coordination point in the
    Great Expectations (GX) validation workflow. It encapsulates all essential
    metadata about a validation batch, including the dataset name, the data to
    be validated, and the GX data context. Upon initialization, it automatically
    creates a BatchRequest that can be used by downstream validation components
    (expectation strategies, checkpoint managers) to execute validations.

    This class acts as a bridge between raw data and the Great Expectations
    validation engine. It delegates datasource management to DatasourceManager,
    which handles the details of registering pandas or PySpark datasources with
    the GX context, and constructs the BatchRequest that defines how GX should
    access and validate the data.

    The BatchManager is typically instantiated by the validate_dataset function
    and passed to ExpectationAdditionStrategy implementations (which add
    validation rules) and CheckpointManager (which executes validations and
    evaluates results).

    Parameters
    ----------
    name : str
        The logical name identifying this dataset or validation batch. This
        name is used to identify the data asset within the GX datasource and
        is typically used as the basis for naming expectation suites (e.g.,
        "{name}_suite"). Must be a valid identifier string.
    data : Data
        The dataset to be validated. This can be a pandas DataFrame, PySpark
        DataFrame, or any other data structure conforming to the Data protocol.
        The data will be registered with GX as a dataframe asset for validation.
    data_context : AbstractDataContext
        The Great Expectations data context that manages datasources, expectation
        suites, checkpoints, and validation results. This can be an
        EphemeralDataContext (in-memory, for testing or transient workflows),
        FileDataContext (persistent, file-based), or cloud-backed contexts
        (AWS, GCP, Azure). The context provides the validation infrastructure
        and configuration.

    Attributes
    ----------
    name : str
        The logical name of the dataset being validated.
    data : Data
        The dataset to be validated.
    data_context : AbstractDataContext
        The Great Expectations data context managing validation infrastructure.
    batch_request : BatchRequest
        The Great Expectations BatchRequest object created during initialization.
        This request encapsulates the datasource name, data asset name, and
        dataframe reference needed by GX to access and validate the data.
        Automatically created by __post_init__ via create_batch_request().

    See Also
    --------
    DatasourceManager : Manages GX datasource registration for pandas and PySpark data.
    CheckpointManager : Executes validation checkpoints using BatchManager metadata.
    validate_dataset : Main validation function that instantiates BatchManager.
    ExpectationAdditionStrategy : Adds validation expectations using BatchManager.

    Notes
    -----
    The BatchManager uses a dataclass design with automatic initialization via
    __post_init__. The batch_request field is not part of the constructor
    signature (field(init=False)) but is automatically created after the main
    fields are initialized.

    The workflow is as follows:

    1. User calls validate_dataset() with data, name, and strategies
    2. validate_dataset() creates a BatchManager instance
    3. BatchManager.__post_init__() calls create_batch_request()
    4. create_batch_request() delegates to DatasourceManager to register datasource
    5. A data asset is added to the datasource with the specified name
    6. A BatchRequest is built referencing the dataframe
    7. The BatchManager (with batch_request populated) is passed to strategies
    8. ExpectationAdditionStrategy adds expectations to the suite
    9. CheckpointManager runs validation and evaluates results

    The BatchManager supports both pandas and PySpark DataFrames through the
    DatasourceManager abstraction, which automatically detects the dataframe
    type and registers the appropriate GX datasource (PandasDatasource or
    SparkDFDatasource).

    Examples
    --------
    Create a BatchManager for a pandas DataFrame with an ephemeral context:

    >>> import pandas as pd
    >>> from great_expectations.data_context import EphemeralDataContext
    >>> from adc_toolkit.data.validators.gx.batch_managers.batch_manager import BatchManager
    >>> data = pd.DataFrame(
    ...     {
    ...         "col1": [1, 2, 3],
    ...         "col2": [4.0, 5.0, 6.0],
    ...         "col3": ["a", "b", "c"],
    ...     }
    ... )
    >>> context = EphemeralDataContext()
    >>> batch_manager = BatchManager(
    ...     name="my_dataset",
    ...     data=data,
    ...     data_context=context,
    ... )
    >>> print(batch_manager.batch_request.data_asset_name)
    my_dataset
    >>> print(batch_manager.batch_request.datasource_name)
    pandas_datasource

    Use BatchManager within the full validation workflow:

    >>> from adc_toolkit.data.validators.gx.batch_managers.batch_validation import validate_dataset
    >>> from adc_toolkit.data.validators.gx.batch_managers.expectation_suite_lookup_strategy import (
    ...     AutoExpectationSuiteCreation,
    ... )
    >>> from adc_toolkit.data.validators.gx.batch_managers.expectation_addition_strategy import (
    ...     SchemaExpectationAddition,
    ... )
    >>> validated_data = validate_dataset(
    ...     name="my_dataset",
    ...     data=data,
    ...     data_context=context,
    ...     expectation_suite_lookup_strategy=AutoExpectationSuiteCreation(),
    ...     expectation_addition_strategy=SchemaExpectationAddition(),
    ... )

    Access the batch request for custom validation logic:

    >>> batch_request = batch_manager.batch_request
    >>> validator = context.get_validator(batch_request=batch_request, expectation_suite_name="my_dataset_suite")
    >>> validation_result = validator.validate()
    """

    name: str
    data: Data
    data_context: AbstractDataContext
    batch_request: BatchRequest = field(init=False)

    def __post_init__(self) -> None:
        """
        Initialize computed fields after dataclass initialization.

        This method is automatically called by the dataclass machinery after
        __init__ completes. It creates and populates the batch_request field
        by calling create_batch_request(). This two-stage initialization allows
        the batch_request to be automatically computed from the name, data, and
        data_context fields without requiring explicit initialization by the
        caller.

        The method ensures that every BatchManager instance has a valid
        batch_request immediately after construction, ready to be used by
        downstream validation components.

        Notes
        -----
        This method is called automatically and should not be invoked manually.
        It is part of the dataclass lifecycle and implements the deferred
        initialization pattern for computed fields.

        See Also
        --------
        create_batch_request : Creates the BatchRequest for this batch.
        """
        self.batch_request = self.create_batch_request()

    def create_batch_request(self) -> BatchRequest:
        """
        Create a Great Expectations BatchRequest for this validation batch.

        This method orchestrates the creation of a BatchRequest by delegating
        datasource management to DatasourceManager, adding a dataframe asset to
        the datasource, and building a batch request that references the data.

        The process involves:

        1. Instantiate DatasourceManager with the data and context
        2. Add or update the appropriate datasource (pandas or PySpark) in the
           data context based on the detected dataframe type
        3. Add a dataframe asset to the datasource with the specified name
        4. Build and return a BatchRequest linking the data asset to the
           in-memory dataframe

        The resulting BatchRequest can be used by Great Expectations validators,
        checkpoints, and other components to access and validate the data.

        Returns
        -------
        BatchRequest
            A Great Expectations BatchRequest object containing the datasource
            name (e.g., "pandas_datasource" or "pyspark_datasource"), data
            asset name (same as self.name), and a reference to the dataframe.
            This request can be passed to GX validators and checkpoints to
            execute validations.

        Notes
        -----
        This method is called automatically during __post_init__ and typically
        should not be called manually. It delegates the complexity of datasource
        type detection and registration to DatasourceManager, keeping the
        BatchManager logic clean and focused on coordination.

        The datasource is added or updated (not just retrieved) to ensure it
        exists in the data context. If a datasource with the same name already
        exists, GX updates it; otherwise, it creates a new one.

        The dataframe asset is ephemeral and exists only for the duration of
        this validation batch. Each call to create_batch_request() creates a
        new asset with the specified name, which may overwrite previous assets
        with the same name.

        See Also
        --------
        DatasourceManager.add_or_update_datasource : Registers the datasource with GX.

        Examples
        --------
        The create_batch_request method is called automatically, but its behavior
        can be understood through manual usage:

        >>> import pandas as pd
        >>> from great_expectations.data_context import EphemeralDataContext
        >>> from adc_toolkit.data.validators.gx.batch_managers.batch_manager import BatchManager
        >>> data = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
        >>> context = EphemeralDataContext()
        >>> batch_manager = BatchManager(name="test_data", data=data, data_context=context)
        >>> # batch_request is automatically created via __post_init__
        >>> batch_request = batch_manager.batch_request
        >>> print(f"Datasource: {batch_request.datasource_name}")
        Datasource: pandas_datasource
        >>> print(f"Asset: {batch_request.data_asset_name}")
        Asset: test_data

        The batch request can be used to get a validator:

        >>> validator = context.get_validator(batch_request=batch_request, expectation_suite_name="test_suite")
        """
        datasource = DatasourceManager(self.data, self.data_context).add_or_update_datasource()
        data_asset = datasource.add_dataframe_asset(name=self.name)
        return data_asset.build_batch_request(dataframe=self.data)
Coordinate batch metadata and operations for Great Expectations validation.
The BatchManager dataclass serves as a central coordination point in the Great Expectations (GX) validation workflow. It encapsulates all essential metadata about a validation batch, including the dataset name, the data to be validated, and the GX data context. Upon initialization, it automatically creates a BatchRequest that can be used by downstream validation components (expectation strategies, checkpoint managers) to execute validations.
This class acts as a bridge between raw data and the Great Expectations validation engine. It delegates datasource management to DatasourceManager, which handles the details of registering pandas or PySpark datasources with the GX context, and constructs the BatchRequest that defines how GX should access and validate the data.
The BatchManager is typically instantiated by the validate_dataset function and passed to ExpectationAdditionStrategy implementations (which add validation rules) and CheckpointManager (which executes validations and evaluates results).
Parameters
- name (str): The logical name identifying this dataset or validation batch. This name is used to identify the data asset within the GX datasource and is typically used as the basis for naming expectation suites (e.g., "{name}_suite"). Must be a valid identifier string.
- data (Data): The dataset to be validated. This can be a pandas DataFrame, PySpark DataFrame, or any other data structure conforming to the Data protocol. The data will be registered with GX as a dataframe asset for validation.
- data_context (AbstractDataContext): The Great Expectations data context that manages datasources, expectation suites, checkpoints, and validation results. This can be an EphemeralDataContext (in-memory, for testing or transient workflows), FileDataContext (persistent, file-based), or cloud-backed contexts (AWS, GCP, Azure). The context provides the validation infrastructure and configuration.
Attributes
- name (str): The logical name of the dataset being validated.
- data (Data): The dataset to be validated.
- data_context (AbstractDataContext): The Great Expectations data context managing validation infrastructure.
- batch_request (BatchRequest): The Great Expectations BatchRequest object created during initialization. This request encapsulates the datasource name, data asset name, and dataframe reference needed by GX to access and validate the data. Automatically created by __post_init__ via create_batch_request().
See Also
DatasourceManager: Manages GX datasource registration for pandas and PySpark data.
CheckpointManager: Executes validation checkpoints using BatchManager metadata.
validate_dataset: Main validation function that instantiates BatchManager.
ExpectationAdditionStrategy: Adds validation expectations using BatchManager.
Notes
The BatchManager uses a dataclass design with automatic initialization via __post_init__. The batch_request field is not part of the constructor signature (field(init=False)) but is automatically created after the main fields are initialized.
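The deferred-initialization pattern described above can be illustrated with a small standalone dataclass. This is a minimal sketch, not the toolkit's code: `Example`, `payload`, `summary`, and `build_summary` are hypothetical names chosen for illustration, standing in for BatchManager, its data, its batch_request, and create_batch_request().

```python
from dataclasses import dataclass, field


@dataclass
class Example:
    """Toy stand-in for BatchManager (hypothetical, for illustration only)."""

    name: str
    payload: list[int]
    # init=False keeps this field out of the generated __init__ signature.
    summary: str = field(init=False)

    def __post_init__(self) -> None:
        # Runs automatically right after the generated __init__ finishes.
        self.summary = self.build_summary()

    def build_summary(self) -> str:
        # Computed from the fields that the caller did supply.
        return f"{self.name}:{len(self.payload)}"


item = Example(name="my_dataset", payload=[1, 2, 3])
print(item.summary)  # my_dataset:3
```

Passing `summary=` to the constructor raises a TypeError, which is why callers never supply the computed field themselves.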
The workflow is as follows:
- User calls validate_dataset() with data, name, and strategies
- validate_dataset() creates a BatchManager instance
- BatchManager.__post_init__() calls create_batch_request()
- create_batch_request() delegates to DatasourceManager to register datasource
- A data asset is added to the datasource with the specified name
- A BatchRequest is built referencing the dataframe
- The BatchManager (with batch_request populated) is passed to strategies
- ExpectationAdditionStrategy adds expectations to the suite
- CheckpointManager runs validation and evaluates results
The BatchManager supports both pandas and PySpark DataFrames through the DatasourceManager abstraction, which automatically detects the dataframe type and registers the appropriate GX datasource (PandasDatasource or SparkDFDatasource).
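DatasourceManager's detection logic is not shown in this section, so the following is only a hypothetical sketch of how a dataframe type could be mapped to a datasource name. The function `pick_datasource_name` and the module-path check are assumptions; only the two datasource names come from the documentation above.

```python
def pick_datasource_name(data: object) -> str:
    """Map a dataframe object to a datasource name by its defining package.

    Hypothetical helper: the real DatasourceManager may detect types differently.
    """
    package = type(data).__module__.split(".")[0]
    if package == "pandas":
        return "pandas_datasource"
    if package == "pyspark":
        return "pyspark_datasource"
    raise TypeError(f"Unsupported data type: {type(data).__name__}")


# Stand-in classes that only mimic the module path of the real dataframe types,
# so the sketch runs without pandas or PySpark installed.
FakePandasFrame = type("DataFrame", (), {"__module__": "pandas.core.frame"})
FakeSparkFrame = type("DataFrame", (), {"__module__": "pyspark.sql.dataframe"})

print(pick_datasource_name(FakePandasFrame()))
print(pick_datasource_name(FakeSparkFrame()))
```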
Examples
Create a BatchManager for a pandas DataFrame with an ephemeral context:
>>> import pandas as pd
>>> from great_expectations.data_context import EphemeralDataContext
>>> from adc_toolkit.data.validators.gx.batch_managers.batch_manager import BatchManager
>>> data = pd.DataFrame(
... {
... "col1": [1, 2, 3],
... "col2": [4.0, 5.0, 6.0],
... "col3": ["a", "b", "c"],
... }
... )
>>> context = EphemeralDataContext()
>>> batch_manager = BatchManager(
... name="my_dataset",
... data=data,
... data_context=context,
... )
>>> print(batch_manager.batch_request.data_asset_name)
my_dataset
>>> print(batch_manager.batch_request.datasource_name)
pandas_datasource
Use BatchManager within the full validation workflow:
>>> from adc_toolkit.data.validators.gx.batch_managers.batch_validation import validate_dataset
>>> from adc_toolkit.data.validators.gx.batch_managers.expectation_suite_lookup_strategy import (
... AutoExpectationSuiteCreation,
... )
>>> from adc_toolkit.data.validators.gx.batch_managers.expectation_addition_strategy import (
... SchemaExpectationAddition,
... )
>>> validated_data = validate_dataset(
... name="my_dataset",
... data=data,
... data_context=context,
... expectation_suite_lookup_strategy=AutoExpectationSuiteCreation(),
... expectation_addition_strategy=SchemaExpectationAddition(),
... )
Access the batch request for custom validation logic:
>>> batch_request = batch_manager.batch_request
>>> validator = context.get_validator(batch_request=batch_request, expectation_suite_name="my_dataset_suite")
>>> validation_result = validator.validate()
def create_batch_request(self) -> BatchRequest:
    """Create a Great Expectations BatchRequest for this validation batch."""
    datasource = DatasourceManager(self.data, self.data_context).add_or_update_datasource()
    data_asset = datasource.add_dataframe_asset(name=self.name)
    return data_asset.build_batch_request(dataframe=self.data)
Create a Great Expectations BatchRequest for this validation batch.
This method orchestrates the creation of a BatchRequest by delegating datasource management to DatasourceManager, adding a dataframe asset to the datasource, and building a batch request that references the data.
The process involves:
- Instantiate DatasourceManager with the data and context
- Add or update the appropriate datasource (pandas or PySpark) in the data context based on the detected dataframe type
- Add a dataframe asset to the datasource with the specified name
- Build and return a BatchRequest linking the data asset to the in-memory dataframe
The resulting BatchRequest can be used by Great Expectations validators, checkpoints, and other components to access and validate the data.
Returns
- BatchRequest: A Great Expectations BatchRequest object containing the datasource name (e.g., "pandas_datasource" or "pyspark_datasource"), data asset name (same as self.name), and a reference to the dataframe. This request can be passed to GX validators and checkpoints to execute validations.
Notes
This method is called automatically during __post_init__ and typically should not be called manually. It delegates the complexity of datasource type detection and registration to DatasourceManager, keeping the BatchManager logic clean and focused on coordination.
The datasource is added or updated (not just retrieved) to ensure it exists in the data context. If a datasource with the same name already exists, GX updates it; otherwise, it creates a new one.
The dataframe asset is ephemeral and exists only for the duration of this validation batch. Each call to create_batch_request() creates a new asset with the specified name, which may overwrite previous assets with the same name.
See Also
DatasourceManager.add_or_update_datasource: Registers the datasource with GX.
Examples
The create_batch_request method is called automatically, but its behavior can be understood through manual usage:
>>> import pandas as pd
>>> from great_expectations.data_context import EphemeralDataContext
>>> from adc_toolkit.data.validators.gx.batch_managers.batch_manager import BatchManager
>>> data = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
>>> context = EphemeralDataContext()
>>> batch_manager = BatchManager(name="test_data", data=data, data_context=context)
>>> # batch_request is automatically created via __post_init__
>>> batch_request = batch_manager.batch_request
>>> print(f"Datasource: {batch_request.datasource_name}")
Datasource: pandas_datasource
>>> print(f"Asset: {batch_request.data_asset_name}")
Asset: test_data
The batch request can be used to get a validator:
>>> validator = context.get_validator(batch_request=batch_request, expectation_suite_name="test_suite")
class ConfigurationBasedExpectationAddition:
    """Add Great Expectations to a suite from dictionary-based configuration."""

    def add_expectations(self, batch_manager: BatchManager, expectations: list[dict]) -> None:
        """Add expectations to the expectation suite from configuration dictionaries."""
        suite = batch_manager.data_context.get_expectation_suite(expectation_suite_name=f"{batch_manager.name}_suite")
        for expectation in expectations:
            expectation_type, expectation_kwargs = parse_expectations_dict(expectation_dictionary=expectation)
            expectation_configuration = ExpectationConfiguration(
                expectation_type=expectation_type,
                kwargs=expectation_kwargs,
            )
            suite.add_expectation(expectation_configuration)
        batch_manager.data_context.update_expectation_suite(suite)
Add Great Expectations to a suite from dictionary-based configuration.
This class implements the ExpectationAddition protocol to add expectations to Great Expectations suites based on structured dictionary configurations. It provides a declarative approach to defining data validation rules without writing code, making it suitable for configuration-driven validation workflows.
The class processes expectation dictionaries where each dictionary contains a single expectation type as the key and its parameters as the value. It leverages the parse_expectations_dict function to extract and validate the expectation structure before creating ExpectationConfiguration objects and adding them to the target suite.
This implementation is particularly useful for:
- Loading expectations from YAML or JSON configuration files
- Dynamically building validation suites from user-defined configurations
- Separating validation logic from code for better maintainability
- Enabling non-technical users to define validation rules
Attributes
- None: This class is stateless and requires no instance attributes.
See Also
parse_expectations_dict: Parses and validates expectation dictionary structure.
ExpectationAddition: Protocol defining the expectation addition interface.
BatchManager: Manages Great Expectations batches and data contexts.
Notes
The class expects expectation dictionaries to follow a specific format:
- Each dictionary must contain exactly one key-value pair
- The key is the expectation type (e.g., "expect_column_values_to_be_in_set")
- The value is a dictionary of expectation parameters (kwargs)
The expectation suite is retrieved using the naming convention "{batch_manager.name}_suite" and is automatically updated in the data context after all expectations are added.
This implementation does not validate whether the expectation types exist in Great Expectations or whether the provided kwargs are valid for the expectation type. Such validation is delegated to Great Expectations itself when the expectations are added to the suite.
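The format rules above can be sketched as a small validation function. This is an illustration under stated assumptions, not the toolkit's parse_expectations_dict: the function name `parse_one_expectation` is hypothetical, and the three exception classes are local stand-ins that reuse the documented names but may differ from the real ones in base class and message.

```python
from typing import Any


class InvalidExpectationDictionaryError(ValueError):
    """Stand-in: the dictionary does not hold exactly one key-value pair."""


class InvalidExpectationNameTypeError(TypeError):
    """Stand-in: the expectation type (key) is not a string."""


class InvalidExpectationKwargsTypeError(TypeError):
    """Stand-in: the expectation parameters (value) are not a dictionary."""


def parse_one_expectation(expectation_dictionary: dict[Any, Any]) -> tuple[str, dict[str, Any]]:
    """Check the one-key format and return (expectation_type, kwargs)."""
    if len(expectation_dictionary) != 1:
        raise InvalidExpectationDictionaryError("expected exactly one key-value pair")
    expectation_type, kwargs = next(iter(expectation_dictionary.items()))
    if not isinstance(expectation_type, str):
        raise InvalidExpectationNameTypeError("expectation type must be a string")
    if not isinstance(kwargs, dict):
        raise InvalidExpectationKwargsTypeError("expectation kwargs must be a dict")
    return expectation_type, kwargs


parsed = parse_one_expectation(
    {"expect_column_values_to_not_be_null": {"column": "user_id"}}
)
print(parsed)
```

Note that the sketch checks structure only. Like the class it illustrates, it does not verify that the expectation type exists or that the kwargs suit it.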
Examples
Basic usage with column value expectations:
>>> from adc_toolkit.data.validators.gx.batch_managers.batch_manager import BatchManager
>>> adder = ConfigurationBasedExpectationAddition()
>>> expectations = [
... {
... "expect_column_values_to_be_in_set": {
... "column": "status",
... "value_set": ["active", "inactive", "pending"],
... }
... },
... {
... "expect_column_values_to_not_be_null": {
... "column": "user_id",
... }
... },
... ]
>>> adder.add_expectations(batch_manager, expectations)
Configuration with multiple expectation types:
>>> expectations = [
... {
... "expect_table_row_count_to_be_between": {
... "min_value": 100,
... "max_value": 10000,
... }
... },
... {
... "expect_column_mean_to_be_between": {
... "column": "price",
... "min_value": 0.0,
... "max_value": 1000.0,
... }
... },
... {
... "expect_column_unique_value_count_to_be_between": {
... "column": "product_id",
... "min_value": 50,
... "max_value": 500,
... }
... },
... ]
>>> adder.add_expectations(batch_manager, expectations)
Loading expectations from a configuration file:
>>> import yaml
>>> with open("expectations.yaml") as f:
... config = yaml.safe_load(f)
>>> expectations = config["expectations"]
>>> adder = ConfigurationBasedExpectationAddition()
>>> adder.add_expectations(batch_manager, expectations)
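For reference, a configuration file consumed this way might be laid out as follows. The sketch uses JSON and the standard library so that it runs without extra dependencies; the equivalent YAML would carry the same structure. The top-level "expectations" key mirrors the example above, while the file contents themselves are illustrative assumptions.

```python
import json

# Illustrative file contents: a top-level "expectations" list in which every
# entry is a one-key dictionary mapping an expectation type to its kwargs.
CONFIG_TEXT = """
{
  "expectations": [
    {"expect_column_values_to_not_be_null": {"column": "user_id"}},
    {"expect_table_row_count_to_be_between": {"min_value": 100, "max_value": 10000}}
  ]
}
"""

config = json.loads(CONFIG_TEXT)
expectations = config["expectations"]

# Each entry unpacks to a single (expectation_type, kwargs) pair.
for entry in expectations:
    expectation_type, kwargs = next(iter(entry.items()))
    print(expectation_type, kwargs)
```

The resulting `expectations` list has the shape that add_expectations() accepts as its second argument.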
def add_expectations(self, batch_manager: BatchManager, expectations: list[dict]) -> None:
    """Add expectations to the expectation suite from configuration dictionaries."""
    suite = batch_manager.data_context.get_expectation_suite(expectation_suite_name=f"{batch_manager.name}_suite")
    for expectation in expectations:
        expectation_type, expectation_kwargs = parse_expectations_dict(expectation_dictionary=expectation)
        expectation_configuration = ExpectationConfiguration(
            expectation_type=expectation_type,
            kwargs=expectation_kwargs,
        )
        suite.add_expectation(expectation_configuration)
    batch_manager.data_context.update_expectation_suite(suite)
Add expectations to the expectation suite from configuration dictionaries.
Parse each expectation dictionary to extract the expectation type and parameters, create ExpectationConfiguration objects, and add them to the expectation suite associated with the provided batch manager. The updated suite is persisted to the data context.
This method processes expectations sequentially, ensuring each is properly validated and added before moving to the next. If any expectation dictionary is malformed, the method will raise an exception before adding subsequent expectations.
Parameters
- batch_manager (BatchManager): The batch manager containing the data context and batch information. The expectation suite is retrieved using the pattern "{batch_manager.name}_suite" from the batch manager's data context.
- expectations (list of dict): List of expectation dictionaries to add to the suite. Each dictionary must contain exactly one key-value pair where the key is the expectation type (string) and the value is a dictionary of expectation parameters.
Expected format for each dictionary: { "expectation_type_name": { "param1": value1, "param2": value2, ... } }
Returns
- None: This method modifies the expectation suite in place and persists changes to the data context but does not return a value.
Raises
- InvalidExpectationDictionaryError: If any expectation dictionary does not contain exactly one key-value pair.
- InvalidExpectationNameTypeError: If the expectation type (dictionary key) is not a string.
- InvalidExpectationKwargsTypeError: If the expectation parameters (dictionary value) are not a dictionary.
See Also
parse_expectations_dict: Function that validates and extracts expectation components from dictionary format.
ExpectationConfiguration: Great Expectations class representing a single expectation with its type and parameters.
Notes
The method performs the following steps:
- For each expectation, parse the expectation dictionary using parse_expectations_dict
- For each expectation, create an ExpectationConfiguration object with the extracted type and kwargs
- For each expectation, add the configuration to the expectation suite
- Once, after all expectations are added, update the suite in the data context
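The steps above can be sketched with in-memory stand-ins for the suite and the data context. `FakeSuite`, `FakeContext`, and `add_expectations_sketch` are hypothetical names used only for this illustration; plain dictionaries take the place of ExpectationConfiguration objects so that the sketch runs without Great Expectations installed.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSuite:
    """Stand-in for an expectation suite: collects added configurations."""

    name: str
    expectations: list = field(default_factory=list)

    def add_expectation(self, configuration: dict) -> None:
        self.expectations.append(configuration)


class FakeContext:
    """Stand-in for a data context: looks suites up by name and counts updates."""

    def __init__(self, suites: dict) -> None:
        self._suites = suites
        self.update_calls = 0

    def get_expectation_suite(self, expectation_suite_name: str) -> FakeSuite:
        return self._suites[expectation_suite_name]

    def update_expectation_suite(self, suite: FakeSuite) -> None:
        self.update_calls += 1


def add_expectations_sketch(context: FakeContext, dataset_name: str, expectations: list) -> None:
    # The suite is looked up by the "{name}_suite" convention and must already exist.
    suite = context.get_expectation_suite(expectation_suite_name=f"{dataset_name}_suite")
    for expectation in expectations:
        # Parse: exactly one key (type) and one value (kwargs) per dictionary.
        ((expectation_type, kwargs),) = expectation.items()
        # Create and add a configuration (a plain dict in this sketch).
        suite.add_expectation({"expectation_type": expectation_type, "kwargs": kwargs})
    # Persist once, after the loop.
    context.update_expectation_suite(suite)


context = FakeContext({"users_suite": FakeSuite("users_suite")})
add_expectations_sketch(
    context,
    "users",
    [
        {"expect_column_values_to_not_be_null": {"column": "user_id"}},
        {"expect_column_values_to_be_in_set": {"column": "status", "value_set": ["active", "inactive"]}},
    ],
)
print(context.update_calls)
```

Persisting once at the end keeps the number of writes to the backing store independent of how many expectations are added.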
The expectation suite must exist in the data context before calling this method. The suite is identified by the naming convention "{batch_manager.name}_suite".
All expectations passed in one call are added to the same suite. If you need to add expectations to multiple suites, call this method separately for each batch manager.
This method does not validate the semantic correctness of expectations (e.g., whether column names exist or parameter values are appropriate). Such validation occurs when Great Expectations evaluates the expectations against actual data.
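If earlier feedback on column names is useful, a caller could run a lightweight pre-check before adding expectations. The helper below is a hypothetical sketch and not part of adc-toolkit; it assumes column references appear under the keys `column`, `column_A`, and `column_B`, as in the examples in this section, and it does not cover other parameter names.

```python
# Keys assumed to hold column names, based on the examples in this section.
COLUMN_KEYS = ("column", "column_A", "column_B")


def find_unknown_columns(expectations: list, available_columns: list) -> list:
    """Return (expectation_type, column) pairs that reference unknown columns."""
    known = set(available_columns)
    problems = []
    for expectation in expectations:
        for expectation_type, kwargs in expectation.items():
            for key in COLUMN_KEYS:
                column = kwargs.get(key)
                if column is not None and column not in known:
                    problems.append((expectation_type, column))
    return problems


expectations = [
    {"expect_column_values_to_not_be_null": {"column": "user_id"}},
    {"expect_column_mean_to_be_between": {"column": "temperature", "min_value": -50.0, "max_value": 150.0}},
    {"expect_table_row_count_to_be_between": {"min_value": 1000, "max_value": 100000}},
]

# With a pandas DataFrame, `list(df.columns)` would supply the available columns.
problems = find_unknown_columns(expectations, ["user_id", "status"])
print(problems)
```

Table-level expectations carry no column key and pass through the check untouched.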
Examples
Add column validation expectations:
>>> adder = ConfigurationBasedExpectationAddition()
>>> expectations = [
... {
... "expect_column_values_to_be_in_set": {
... "column": "status",
... "value_set": ["active", "inactive"],
... }
... },
... {
... "expect_column_values_to_not_be_null": {
... "column": "user_id",
... }
... },
... ]
>>> adder.add_expectations(batch_manager, expectations)
Add table-level and column-level expectations:
>>> expectations = [
... {
... "expect_table_row_count_to_be_between": {
... "min_value": 1000,
... "max_value": 100000,
... }
... },
... {
... "expect_column_mean_to_be_between": {
... "column": "temperature",
... "min_value": -50.0,
... "max_value": 150.0,
... }
... },
... ]
>>> adder.add_expectations(batch_manager, expectations)
Add expectations with complex parameter types:
>>> expectations = [
... {
... "expect_column_values_to_match_regex": {
... "column": "email",
... "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
... }
... },
... {
... "expect_column_pair_values_to_be_equal": {
... "column_A": "expected_total",
... "column_B": "actual_total",
... }
... },
... ]
>>> adder.add_expectations(batch_manager, expectations)
class GXValidator:
    """
    Great Expectations validator implementing the DataValidator protocol.

    This validator provides comprehensive data quality validation using Great
    Expectations (GX). It orchestrates the complete validation workflow including
    data context management, expectation suite lookup, expectation creation,
    batch management, checkpoint execution, and validation result evaluation.

    The validator operates in two modes depending on configuration:

    1. **Auto-creation mode** (default): Automatically creates expectation suites
       and freezes schemas on first validation. Subsequent validations enforce
       the captured schema.

    2. **Custom mode**: Uses pre-defined expectation suites and custom strategies
       for suite lookup and expectation addition, enabling manual control over
       validation rules.

    Schema freezing captures the structure of a DataFrame (column names and types)
    and creates expectations that enforce this schema in future validations. This
    provides automatic protection against schema drift while allowing manual
    expectation customization when needed.

    Parameters
    ----------
    data_context : great_expectations.data_context.AbstractDataContext
        Great Expectations data context managing expectation suites, checkpoints,
        validation results, and data source configurations. This can be a
        filesystem-based context, cloud-based context (S3, GCS, Azure), or
        ephemeral in-memory context for testing.
    expectation_suite_lookup_strategy : ExpectationSuiteLookupStrategy or None, optional
        Strategy for handling expectation suite lookup when validating datasets.
        Controls behavior when a suite does not exist for a dataset:

        - AutoExpectationSuiteCreation (default): Automatically creates missing
          suites, enabling zero-configuration validation.
        - CustomExpectationSuiteStrategy: Raises ExpectationSuiteNotFoundError
          for missing suites, enforcing explicit suite definitions.

        Default is None, which uses AutoExpectationSuiteCreation.
    expectation_addition_strategy : ExpectationAdditionStrategy or None, optional
        Strategy for adding expectations to expectation suites during validation.
        Controls how expectations are populated when a suite is created or updated:

        - SchemaExpectationAddition (default): Automatically adds schema
          expectations by inspecting DataFrame structure, freezing the schema.
        - SkipExpectationAddition: Skips automatic expectation addition,
          requiring manual expectation definition.

        Default is None, which uses SchemaExpectationAddition.

    Attributes
    ----------
    data_context : great_expectations.data_context.AbstractDataContext
        The Great Expectations data context instance used for all validation
        operations.
    expectation_suite_lookup_strategy : ExpectationSuiteLookupStrategy
        The strategy used for expectation suite lookup.
    expectation_addition_strategy : ExpectationAdditionStrategy
        The strategy used for adding expectations to suites.

    See Also
    --------
    adc_toolkit.data.abs.DataValidator : Protocol defining validator interface.
    adc_toolkit.data.validators.gx.data_context.RepoDataContext : Filesystem-based data context.
    adc_toolkit.data.validators.gx.data_context.S3DataContext : AWS S3-based data context.
    adc_toolkit.data.validators.gx.data_context.GCPDataContext : Google Cloud Storage context.
    adc_toolkit.data.validators.gx.data_context.AzureDataContext : Azure Blob Storage context.
    adc_toolkit.data.validators.gx.batch_managers.AutoExpectationSuiteCreation : Auto-create suites.
    adc_toolkit.data.validators.gx.batch_managers.CustomExpectationSuiteStrategy : Require suites.
    adc_toolkit.data.validators.gx.batch_managers.SchemaExpectationAddition : Auto-add schema expectations.
    adc_toolkit.data.validators.gx.batch_managers.SkipExpectationAddition : Skip expectation addition.

    Notes
    -----
    **Design Patterns:**

    The GXValidator implements several design patterns:

    - **Strategy Pattern**: Pluggable strategies for suite lookup and expectation
      addition enable flexible validation workflows without modifying core logic.
    - **Facade Pattern**: Simplifies Great Expectations' complex API by providing
      a clean, high-level interface for validation.
    - **Dependency Injection**: Data context and strategies are injected,
      enabling testability and configuration flexibility.

    **Validation Workflow:**

    When validate() is called, the following sequence occurs:

    1. Look up or create expectation suite for the dataset
    2. Create a batch from the data using BatchManager
    3. Add expectations to the suite using the configured strategy
    4. Create or update a checkpoint for the dataset
    5. Execute the checkpoint to validate data against expectations
    6. Evaluate validation results and raise ValidationError on failure
    7. Return the original data if validation succeeds

    **Performance Considerations:**

    - First validation of a dataset (suite creation) is slower than subsequent
      validations due to suite initialization overhead.
    - Schema freezing requires full DataFrame inspection, which scales with
      the number of columns (not rows).
    - Validation performance depends on the number and complexity of expectations.
    - Consider using sampling for large datasets with expensive expectations.

    **Thread Safety:**

    GXValidator instances are not thread-safe. The underlying Great Expectations
    data context may perform file I/O and maintain internal state. For concurrent
    validation, create separate validator instances per thread.

    **Cloud Storage:**

    When using cloud-based data contexts (S3, GCS, Azure), ensure appropriate
    credentials and permissions are configured. The data context stores expectation
    suites, checkpoints, and validation results in cloud storage.

    Examples
    --------
    Create a validator with default auto-creation behavior:

    >>> from adc_toolkit.data.validators.gx import GXValidator
    >>> from great_expectations.data_context import EphemeralDataContext
    >>> import pandas as pd
    >>> context = EphemeralDataContext()
    >>> validator = GXValidator(data_context=context)
    >>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
    >>> validated = validator.validate("dataset_name", df)

    Create a validator requiring pre-defined expectation suites:

    >>> from adc_toolkit.data.validators.gx.batch_managers import (
    ...     CustomExpectationSuiteStrategy,
    ...     SkipExpectationAddition,
    ... )
    >>> validator = GXValidator(
    ...     data_context=context,
    ...     expectation_suite_lookup_strategy=CustomExpectationSuiteStrategy(),
    ...     expectation_addition_strategy=SkipExpectationAddition(),
    ... )
    >>> # This will raise ExpectationSuiteNotFoundError if suite doesn't exist
    >>> validated = validator.validate("strict_dataset", df)

    Use with a filesystem-based data context:

    >>> validator = GXValidator.in_directory("/path/to/gx/config")
    >>> df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
    >>> validated = validator.validate("my_data", df)

    Validate with automatic schema freezing:

    >>> df_first = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
    >>> validator.validate("users", df_first)  # Creates suite, freezes schema
    >>> df_second = pd.DataFrame({"id": [3, 4], "name": ["Charlie", "Dave"]})
    >>> validator.validate("users", df_second)  # Validates against frozen schema
    >>> df_invalid = pd.DataFrame({"id": [5], "age": [30]})  # Different schema
    >>> validator.validate("users", df_invalid)  # Raises ValidationError

    Integration with ValidatedDataCatalog:

    >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
    >>> catalog = ValidatedDataCatalog.in_directory(
    ...     catalog_dir="config/catalog", validator=GXValidator.in_directory("config/gx")
    ... )
    >>> # Load with automatic validation
    >>> df = catalog.load("customer_data")
    >>> # Process data
    >>> processed = transform(df)
    >>> # Save with automatic validation
    >>> catalog.save("processed_customer_data", processed)
    """

    __slots__ = [
        "data_context",
        "expectation_addition_strategy",
        "expectation_suite_lookup_strategy",
    ]

    def __init__(
        self,
        data_context: AbstractDataContext,
        expectation_suite_lookup_strategy: ExpectationSuiteLookupStrategy | None = None,
        expectation_addition_strategy: ExpectationAdditionStrategy | None = None,
    ) -> None:
        """
        Initialize a Great Expectations validator with specified configuration.

        Creates a new GXValidator instance with the provided data context and
        validation strategies. The data context manages expectation suites,
        checkpoints, and validation results. The strategies control how the
        validator handles missing expectation suites and how it populates
        suites with expectations.

        Parameters
        ----------
        data_context : great_expectations.data_context.AbstractDataContext
            Great Expectations data context to use for all validation operations.
            This context manages the storage and retrieval of expectation suites,
            checkpoints, and validation results. Can be:

            - RepoDataContext: Filesystem-based context stored in a directory
            - S3DataContext: AWS S3-backed context for cloud deployments
            - GCPDataContext: Google Cloud Storage-backed context
            - AzureDataContext: Azure Blob Storage-backed context
            - EphemeralDataContext: In-memory context for testing

        expectation_suite_lookup_strategy : ExpectationSuiteLookupStrategy or None, optional
            Strategy for handling expectation suite lookup operations. Controls
            the behavior when an expectation suite is not found for a dataset:

            - None (default): Uses AutoExpectationSuiteCreation, which
              automatically creates missing suites with zero configuration.
            - AutoExpectationSuiteCreation(): Explicitly auto-creates suites.
            - CustomExpectationSuiteStrategy(): Raises an error for missing
              suites, enforcing that all suites must be pre-defined.

            Default is None, which is equivalent to AutoExpectationSuiteCreation().

        expectation_addition_strategy : ExpectationAdditionStrategy or None, optional
            Strategy for adding expectations to expectation suites. Controls
            how expectations are populated when validating data:

            - None (default): Uses SchemaExpectationAddition, which inspects
              DataFrame structure and adds schema validation expectations.
            - SchemaExpectationAddition(): Explicitly adds schema expectations
              by freezing the DataFrame's column names and types.
            - SkipExpectationAddition(): Skips automatic expectation addition,
              requiring all expectations to be manually defined.

            Default is None, which is equivalent to SchemaExpectationAddition().

        Returns
        -------
        None

        See Also
        --------
        in_directory : Factory method to create validator from configuration directory.
        validate : Validate data using this validator.
        adc_toolkit.data.validators.gx.data_context.RepoDataContext : Create filesystem context.
        adc_toolkit.data.validators.gx.batch_managers.AutoExpectationSuiteCreation : Auto-create strategy.
        adc_toolkit.data.validators.gx.batch_managers.SchemaExpectationAddition : Schema freeze strategy.

        Notes
        -----
        The constructor performs minimal initialization, only storing the provided
        parameters. No I/O operations, file system access, or data context
        initialization occurs during construction. This enables fast instantiation
        and lazy initialization patterns.

        **Default Strategies:**

        When strategy parameters are None, the validator uses sensible defaults:

        - AutoExpectationSuiteCreation: Enables zero-configuration validation
          by automatically creating expectation suites on first use.
        - SchemaExpectationAddition: Provides automatic schema drift protection
          by freezing the DataFrame structure on first validation.

        These defaults are ideal for development, exploration, and rapid
        prototyping. For production deployments with explicit validation rules,
        consider using CustomExpectationSuiteStrategy and pre-defined suites.

        **Strategy Immutability:**

        Once a validator is instantiated, its strategies cannot be changed.
        To use different strategies, create a new validator instance. This
        design ensures consistent validation behavior throughout a validator's
        lifetime.

        **Data Context Lifecycle:**

        The validator does not own the data context lifecycle. The caller is
        responsible for creating and properly disposing of the data context.
        For ephemeral contexts used in testing, ensure proper cleanup:

        >>> context = EphemeralDataContext()
        >>> try:
        ...     validator = GXValidator(data_context=context)
        ...     # Use validator
        ... finally:
        ...     # Clean up context if needed
        ...     pass

        Examples
        --------
        Create a validator with default auto-creation strategies:

        >>> from great_expectations.data_context import EphemeralDataContext
        >>> context = EphemeralDataContext()
        >>> validator = GXValidator(data_context=context)
        >>> # Automatically creates suites and freezes schemas

        Create a validator with strict, manual suite management:

        >>> from adc_toolkit.data.validators.gx.batch_managers import (
        ...     CustomExpectationSuiteStrategy,
        ...     SkipExpectationAddition,
        ... )
        >>> validator = GXValidator(
        ...     data_context=context,
        ...     expectation_suite_lookup_strategy=CustomExpectationSuiteStrategy(),
        ...     expectation_addition_strategy=SkipExpectationAddition(),
        ... )
        >>> # Requires pre-defined suites, no automatic expectations

        Create a validator with auto-creation but manual expectations:

        >>> validator = GXValidator(
        ...     data_context=context,
        ...     expectation_suite_lookup_strategy=AutoExpectationSuiteCreation(),
        ...     expectation_addition_strategy=SkipExpectationAddition(),
        ... )
        >>> # Creates suites automatically but expects manual expectation definition

        Use with a filesystem-based data context:

        >>> from adc_toolkit.data.validators.gx.data_context import RepoDataContext
        >>> context = RepoDataContext("/path/to/gx").create()
        >>> validator = GXValidator(data_context=context)

        Use with a cloud-based data context:

        >>> from adc_toolkit.data.validators.gx.data_context import S3DataContext
        >>> context = S3DataContext("s3://my-bucket/gx-config").create()
        >>> validator = GXValidator(data_context=context)
        """
        self.data_context = data_context
        self.expectation_suite_lookup_strategy = expectation_suite_lookup_strategy or AutoExpectationSuiteCreation()
        self.expectation_addition_strategy = expectation_addition_strategy or SchemaExpectationAddition()

    @classmethod
    def in_directory(cls, path: str | Path) -> "GXValidator":
        """
        Create a GXValidator with a filesystem-based Great Expectations data context.

        This factory method provides a convenient way to create a validator using
        a repository-based (filesystem) data context. It initializes a RepoDataContext
        from the specified directory and creates a validator with default strategies
        for auto-creation and schema freezing.

        The specified directory should contain a Great Expectations project structure
        with configuration files, expectation suites, checkpoints, and validation
        results. If the directory does not contain a valid GX project, the
        RepoDataContext will initialize a new project structure.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the directory containing Great Expectations configuration.
            This directory should have (or will be initialized with) the
            following structure:

            - great_expectations.yml : Main configuration file
            - expectations/ : Directory containing expectation suite JSON files
            - checkpoints/ : Directory containing checkpoint YAML files
            - uncommitted/ : Directory for validation results and data docs
            - plugins/ : Optional directory for custom expectations

            If the directory does not exist or is empty, a new GX project
            structure will be created. Both absolute and relative paths are
            supported.

        Returns
        -------
        GXValidator
            A new GXValidator instance configured with:

            - RepoDataContext based on the specified directory
            - AutoExpectationSuiteCreation strategy (creates suites automatically)
            - SchemaExpectationAddition strategy (freezes schemas automatically)

        Raises
        ------
        FileNotFoundError
            If the parent directory of the specified path does not exist
            and cannot be created.
        PermissionError
            If the process lacks permissions to read from or write to the
            specified directory.
        ValueError
            If the directory contains invalid Great Expectations configuration
            files that cannot be parsed.

        See Also
        --------
        __init__ : Constructor for custom data context and strategy configuration.
        validate : Validate data using this validator.
        adc_toolkit.data.validators.gx.data_context.RepoDataContext : Filesystem context implementation.
        adc_toolkit.data.validators.gx.data_context.S3DataContext : AWS S3-based context.
        adc_toolkit.data.validators.gx.data_context.GCPDataContext : Google Cloud context.

        Notes
        -----
        **Repository Structure:**

        Great Expectations uses a specific directory structure to organize
        validation artifacts:

        - Expectation suites are stored as JSON in expectations/
        - Checkpoints are stored as YAML in checkpoints/
        - Validation results go in uncommitted/validations/
        - Data docs are generated in uncommitted/data_docs/

        This structure enables version control of validation rules while keeping
        validation results and documentation out of version control.

        **Version Control:**

        When using filesystem-based contexts, consider the following for version
        control (Git):

        - Commit: expectations/, checkpoints/, great_expectations.yml, plugins/
        - Ignore: uncommitted/ (contains validation results and generated docs)

        This approach version controls validation rules while excluding
        environment-specific results.

        **Performance:**

        The in_directory method performs I/O operations to read configuration
        and initialize the data context. For applications creating many validator
        instances, consider caching the data context and passing it to __init__
        instead of using in_directory repeatedly.

        **Automatic Initialization:**

        If the specified directory does not contain a great_expectations.yml file,
        RepoDataContext will initialize a new GX project. This is useful for
        quickly starting validation without manual GX project setup, but may not
        be suitable for production deployments where explicit configuration is
        preferred.

        **Default Strategies:**

        This factory method always uses default strategies (AutoExpectationSuiteCreation
        and SchemaExpectationAddition). For custom strategies, use the __init__
        constructor directly:

        >>> from adc_toolkit.data.validators.gx.data_context import RepoDataContext
        >>> context = RepoDataContext(path).create()
        >>> validator = GXValidator(data_context=context, expectation_suite_lookup_strategy=CustomStrategy())

        Examples
        --------
        Create a validator from a GX project directory:

        >>> validator = GXValidator.in_directory("/path/to/gx_project")
        >>> import pandas as pd
        >>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        >>> validated = validator.validate("my_dataset", df)

        Use with a relative path:

        >>> validator = GXValidator.in_directory("config/validations")
        >>> validated = validator.validate("dataset", data)

        Use with pathlib.Path:

        >>> from pathlib import Path
        >>> config_path = Path("config") / "gx"
        >>> validator = GXValidator.in_directory(config_path)

        Initialize a new GX project and validator:

        >>> # Directory doesn't exist yet
        >>> validator = GXValidator.in_directory("./new_gx_project")
        >>> # Now directory contains initialized GX project structure

        Validate multiple datasets with one validator:

        >>> validator = GXValidator.in_directory("config/gx")
        >>> df1 = pd.DataFrame({"a": [1, 2]})
        >>> df2 = pd.DataFrame({"b": ["x", "y"]})
        >>> validated1 = validator.validate("dataset1", df1)
        >>> validated2 = validator.validate("dataset2", df2)

        Integration in a data pipeline:

        >>> def validate_pipeline_data(data_path: str, gx_path: str) -> None:
        ...     validator = GXValidator.in_directory(gx_path)
        ...     for dataset_name in ["raw", "cleaned", "features"]:
        ...         df = pd.read_csv(f"{data_path}/{dataset_name}.csv")
        ...         validated = validator.validate(dataset_name, df)
        ...         print(f"Validated {dataset_name}: {len(validated)} rows")
        """
        return cls(data_context=RepoDataContext(path).create())

    def validate(self, name: str, data: Data) -> Data:
        """
        Validate data against Great Expectations rules for the named dataset.

        Executes the complete Great Expectations validation workflow for the
        specified dataset. This includes expectation suite lookup or creation,
        batch request generation, expectation addition, checkpoint creation and
        execution, and validation result evaluation.

        The validation process ensures data quality by verifying that the data
        meets all expectations defined in the associated expectation suite. If
        validation fails, detailed error information identifies which expectations
        failed and why.

        On successful validation, the original data is returned unchanged. The
        validation is side-effect free from the data perspective, but may create
        or update expectation suites, checkpoints, and validation results in the
        data context storage.

        Parameters
        ----------
        name : str
            Identifier for the dataset being validated. This name is used to:

            - Look up the corresponding expectation suite (named "{name}_suite")
            - Create or update the checkpoint for this dataset
            - Store validation results associated with this dataset

            The name should be consistent across validation calls for the same
            logical dataset to ensure proper suite reuse and result tracking.
            Use descriptive, stable names like "customer_data", "sales_features",
            or "model_predictions".

        data : Data
            The dataset to validate. Must be a Data protocol-compatible object,
            typically a pandas DataFrame or Spark DataFrame. The data should
            have `columns` and `dtypes` properties for schema inspection.

            The data is not modified by validation. If validation succeeds,
            the same object (or an equivalent copy) is returned.

        Returns
        -------
        Data
            The validated data. This is the same object as the input `data`
            parameter if validation succeeds. The return type matches the input
            type (e.g., pandas.DataFrame returns pandas.DataFrame).

            Returning the data enables method chaining and integration with
            pipelines:

            >>> validated = validator.validate("data", raw_data)
            >>> processed = transform(validated)

        Raises
        ------
        ValidationError
            If the data fails validation against the expectation suite. The
            exception contains detailed information about:

            - Which expectations failed
            - Observed values that violated expectations
            - Expected values or constraints
            - Summary statistics for failed validations

            This exception indicates data quality issues that must be addressed
            before proceeding with downstream processing.

        ExpectationSuiteNotFoundError
            If the expectation suite for the dataset does not exist and the
            validator is configured with CustomExpectationSuiteStrategy. This
            indicates that validation rules must be explicitly defined before
            validation can proceed.

            To resolve, either:
            - Create the expectation suite manually in the data context
            - Switch to AutoExpectationSuiteCreation strategy
            - Ensure the correct data context is being used

        TypeError
            If the data type is incompatible with Great Expectations batch
            creation. For example, if the data does not have the required
            `columns` and `dtypes` attributes.

        KeyError
            If the batch manager cannot create a batch from the data due to
            missing required attributes or metadata.

        See Also
        --------
        __init__ : Constructor for configuring validation strategies.
        in_directory : Factory method for filesystem-based validators.
        adc_toolkit.data.validators.gx.batch_managers.validate_dataset : Underlying validation function.
        adc_toolkit.data.abs.DataValidator.validate : Protocol method specification.

        Notes
        -----
        **Validation Workflow:**

        The validate method orchestrates these steps:

        1. **Suite Lookup**: Check if an expectation suite exists for the dataset.
           If not, behavior depends on the lookup strategy:

           - AutoExpectationSuiteCreation: Create a new suite
           - CustomExpectationSuiteStrategy: Raise ExpectationSuiteNotFoundError

        2. **Batch Creation**: Convert the data into a GX Batch object using
           BatchManager, making it compatible with GX validation operations.

        3. **Expectation Addition**: Add expectations to the suite based on the
           addition strategy:

           - SchemaExpectationAddition: Inspect data schema and add schema expectations
           - SkipExpectationAddition: Skip, expecting manual expectation definition

        4. **Checkpoint Execution**: Create or update a checkpoint for the dataset
           and execute it to validate the batch against the expectation suite.

        5. **Result Evaluation**: Analyze validation results. If all expectations
           pass, return the data. If any fail, raise ValidationError with details.

        **First Validation vs. Subsequent Validations:**

        The first time a dataset is validated (with AutoExpectationSuiteCreation
        and SchemaExpectationAddition), the validator:

        - Creates an expectation suite named "{name}_suite"
        - Inspects the DataFrame schema (column names and types)
        - Adds schema expectations that "freeze" this structure
        - Creates a checkpoint for the dataset
        - Validates the data (which should pass since expectations match the data)

        Subsequent validations of the same dataset:

        - Reuse the existing expectation suite and checkpoint
        - Validate data against the frozen schema and any other expectations
        - Detect schema drift or data quality issues

        **Performance:**

        Validation performance depends on several factors:

        - Number of expectations in the suite
        - Complexity of expectations (simple schema checks vs. statistical tests)
        - Size of the dataset (some expectations scan all data)
        - Data context backend (filesystem vs. cloud storage)

        First validation is slower due to suite and checkpoint creation overhead.
        Subsequent validations are faster, typically scaling with the number of
        expectations rather than data size.

        For large datasets with expensive expectations, consider:
        - Sampling strategies to validate subsets
        - Caching validation results
        - Running validations asynchronously
        - Using incremental validation for streaming data

        **Idempotency:**

        Validation is idempotent: validating the same data multiple times with
        the same name produces the same result (pass or fail). However, validation
        results are stored with timestamps, so each validation creates new result
        artifacts in the data context.

        **Thread Safety:**

        The validate method is not thread-safe. Multiple threads validating
        different datasets concurrently may encounter race conditions when
        accessing the data context. For concurrent validation, create separate
        validator instances (with separate data contexts) per thread.

        **Side Effects:**

        While validation does not modify the data, it may have side effects:

        - Create or update expectation suites in the data context
        - Create or update checkpoints in the data context
        - Write validation results to storage (filesystem or cloud)
        - Generate data documentation if configured

        These artifacts are stored according to the data context configuration.

        Examples
        --------
        Basic validation with automatic suite creation:

        >>> import pandas as pd
        >>> from adc_toolkit.data.validators.gx import GXValidator
        >>> validator = GXValidator.in_directory("config/gx")
        >>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
        >>> validated = validator.validate("sales_data", df)
        >>> # First validation creates suite and freezes schema
        >>> validated.shape
        (3, 2)

        Subsequent validation detects schema drift:

        >>> df_valid = pd.DataFrame({"id": [4, 5], "value": [40, 50]})
        >>> validator.validate("sales_data", df_valid)  # Passes, schema matches
        >>> df_invalid = pd.DataFrame({"id": [6], "price": [100]})
        >>> validator.validate("sales_data", df_invalid)  # Raises ValidationError

        Handle validation failures gracefully:

        >>> try:
        ...     validated = validator.validate("strict_data", df)
        ... except ValidationError as e:
        ...     print(f"Validation failed: {e}")
        ...     # Log error, send alert, reject data, etc.
        ...     raise

        Validate multiple datasets in a pipeline:

        >>> def etl_pipeline(validator: GXValidator) -> None:
        ...     raw = load_raw_data()
        ...     validated_raw = validator.validate("raw_data", raw)
        ...     cleaned = clean(validated_raw)
        ...     validated_clean = validator.validate("cleaned_data", cleaned)
        ...     features = engineer_features(validated_clean)
        ...     validated_features = validator.validate("features", features)
        ...     save(validated_features)

        Use validation in data loading:

        >>> class ValidatedDataLoader:
        ...     def __init__(self, validator: GXValidator):
        ...         self.validator = validator
        ...
        ...     def load(self, name: str, path: str) -> pd.DataFrame:
        ...         df = pd.read_csv(path)
        ...         return self.validator.validate(name, df)

        Integration with ValidatedDataCatalog:

        >>> from adc_toolkit.data.catalog import ValidatedDataCatalog
        >>> catalog = ValidatedDataCatalog.in_directory(
        ...     "config/catalog", validator=GXValidator.in_directory("config/gx")
        ... )
        >>> # Validation happens automatically on load
        >>> df = catalog.load("customer_data")  # Validates after loading
        >>> processed = transform(df)
        >>> catalog.save("processed_data", processed)  # Validates before saving

        Validate with custom expectation suite:

        >>> # Pre-create suite with custom expectations
        >>> suite = context.create_expectation_suite("custom_data_suite")
        >>> suite.add_expectation(
        ...     ExpectationConfiguration(
        ...         expectation_type="expect_column_values_to_be_between",
        ...         kwargs={"column": "age", "min_value": 0, "max_value": 120},
        ...     )
        ... )
        >>> # Now validate using the custom suite
        >>> df = pd.DataFrame({"age": [25, 30, 35]})
        >>> validator.validate("custom_data", df)  # Uses custom_data_suite
        """
        return validate_dataset(
            name,
            data,
            self.data_context,
            self.expectation_suite_lookup_strategy,
            self.expectation_addition_strategy,
        )
Great Expectations validator implementing the DataValidator protocol.
This validator provides comprehensive data quality validation using Great Expectations (GX). It orchestrates the complete validation workflow including data context management, expectation suite lookup, expectation creation, batch management, checkpoint execution, and validation result evaluation.
The validator operates in two modes depending on configuration:
Auto-creation mode (default): Automatically creates expectation suites and freezes schemas on first validation. Subsequent validations enforce the captured schema.
Custom mode: Uses pre-defined expectation suites and custom strategies for suite lookup and expectation addition, enabling manual control over validation rules.
Schema freezing captures the structure of a DataFrame (column names and types) and creates expectations that enforce this schema in future validations. This provides automatic protection against schema drift while allowing manual expectation customization when needed.
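To make the idea of schema freezing concrete, here is a minimal sketch that uses plain dictionaries instead of DataFrames and Great Expectations. The function names `freeze_schema` and `schema_drift` are hypothetical and are not part of adc-toolkit; the real validator expresses the frozen schema as GX expectations.

```python
from __future__ import annotations

from typing import Mapping


def freeze_schema(dtypes: Mapping[str, str]) -> dict[str, str]:
    """Capture column names and type names as a plain dict (hypothetical helper)."""
    return dict(dtypes)


def schema_drift(frozen: Mapping[str, str], current: Mapping[str, str]) -> list[str]:
    """Describe every difference between a frozen schema and a newer one."""
    problems: list[str] = []
    for column, dtype in frozen.items():
        if column not in current:
            problems.append(f"missing column: {column}")
        elif current[column] != dtype:
            problems.append(f"type changed: {column} {dtype} -> {current[column]}")
    for column in current:
        if column not in frozen:
            problems.append(f"unexpected column: {column}")
    return problems


# First "validation": capture the structure.
frozen = freeze_schema({"id": "int64", "name": "object"})

# Later data with the same structure shows no drift; a renamed column does.
same = schema_drift(frozen, {"id": "int64", "name": "object"})
drifted = schema_drift(frozen, {"id": "int64", "age": "int64"})
```

In this sketch an empty list means the later data still matches the captured structure, which corresponds to a passing validation in the real workflow.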
Parameters
data_context (great_expectations.data_context.AbstractDataContext): Great Expectations data context managing expectation suites, checkpoints, validation results, and data source configurations. This can be a filesystem-based context, cloud-based context (S3, GCS, Azure), or ephemeral in-memory context for testing.
expectation_suite_lookup_strategy (ExpectationSuiteLookupStrategy or None, optional): Strategy for handling expectation suite lookup when validating datasets. Controls behavior when a suite does not exist for a dataset:
- AutoExpectationSuiteCreation (default): Automatically creates missing suites, enabling zero-configuration validation.
- CustomExpectationSuiteStrategy: Raises ExpectationSuiteNotFoundError for missing suites, enforcing explicit suite definitions.
Default is None, which uses AutoExpectationSuiteCreation.
expectation_addition_strategy (ExpectationAdditionStrategy or None, optional): Strategy for adding expectations to expectation suites during validation. Controls how expectations are populated when a suite is created or updated:
- SchemaExpectationAddition (default): Automatically adds schema expectations by inspecting DataFrame structure, freezing the schema.
- SkipExpectationAddition: Skips automatic expectation addition, requiring manual expectation definition.
Default is None, which uses SchemaExpectationAddition.
Attributes
- data_context (great_expectations.data_context.AbstractDataContext): The Great Expectations data context instance used for all validation operations.
- expectation_suite_lookup_strategy (ExpectationSuiteLookupStrategy): The strategy used for expectation suite lookup.
- expectation_addition_strategy (ExpectationAdditionStrategy): The strategy used for adding expectations to suites.
See Also
adc_toolkit.data.abs.DataValidator: Protocol defining validator interface.
adc_toolkit.data.validators.gx.data_context.RepoDataContext: Filesystem-based data context.
adc_toolkit.data.validators.gx.data_context.S3DataContext: AWS S3-based data context.
adc_toolkit.data.validators.gx.data_context.GCPDataContext: Google Cloud Storage context.
adc_toolkit.data.validators.gx.data_context.AzureDataContext: Azure Blob Storage context.
adc_toolkit.data.validators.gx.batch_managers.AutoExpectationSuiteCreation: Auto-create suites.
adc_toolkit.data.validators.gx.batch_managers.CustomExpectationSuiteStrategy: Require suites.
adc_toolkit.data.validators.gx.batch_managers.SchemaExpectationAddition: Auto-add schema expectations.
adc_toolkit.data.validators.gx.batch_managers.SkipExpectationAddition: Skip expectation addition.
Notes
Design Patterns:
The GXValidator implements several design patterns:
- Strategy Pattern: Pluggable strategies for suite lookup and expectation addition enable flexible validation workflows without modifying core logic.
- Facade Pattern: Simplifies Great Expectations' complex API by providing a clean, high-level interface for validation.
- Dependency Injection: Data context and strategies are injected, enabling testability and configuration flexibility.
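The combination of strategy objects and constructor injection can be sketched in a few lines. Everything below is a stand-in written for illustration: `SuiteLookup`, `AutoCreate`, `RequireExisting`, and `SketchValidator` are hypothetical names, and the suites are plain lists in a dict rather than GX objects. Only the `strategy or Default()` fallback mirrors the constructor shown in the source listing.

```python
from __future__ import annotations

from typing import Protocol


class SuiteLookup(Protocol):
    """Anything with a matching lookup() method can act as a lookup strategy."""

    def lookup(self, name: str, suites: dict[str, list[str]]) -> list[str]: ...


class AutoCreate:
    """Create an empty suite when none exists (lenient behaviour)."""

    def lookup(self, name: str, suites: dict[str, list[str]]) -> list[str]:
        return suites.setdefault(name, [])


class RequireExisting:
    """Refuse to continue when the suite is missing (strict behaviour)."""

    def lookup(self, name: str, suites: dict[str, list[str]]) -> list[str]:
        if name not in suites:
            raise KeyError(f"no suite named {name!r}")
        return suites[name]


class SketchValidator:
    """Receives its storage and strategy from the caller (dependency injection)."""

    def __init__(self, suites: dict[str, list[str]], lookup: SuiteLookup | None = None) -> None:
        self.suites = suites
        self.lookup = lookup or AutoCreate()  # None falls back to the lenient default

    def suite_for(self, name: str) -> list[str]:
        return self.lookup.lookup(f"{name}_suite", self.suites)


lenient = SketchValidator(suites={})
lenient.suite_for("users")  # creates "users_suite" on demand

strict = SketchValidator(suites={}, lookup=RequireExisting())
try:
    strict.suite_for("users")
    strict_failed = False
except KeyError:
    strict_failed = True
```

Because the strategy is passed in, a test can supply a fake lookup without touching any storage backend.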
Validation Workflow:
When validate() is called, the following sequence occurs:
- Look up or create expectation suite for the dataset
- Create a batch from the data using BatchManager
- Add expectations to the suite using the configured strategy
- Create or update a checkpoint for the dataset
- Execute the checkpoint to validate data against expectations
- Evaluate validation results and raise ValidationError on failure
- Return the original data if validation succeeds
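The sequence above can be mimicked without Great Expectations to show its control flow. This is a simplified sketch under stated assumptions: rows are plain dicts, a "suite" is just a list of expected column names, batch and checkpoint handling are omitted, and expectations are added only when the suite is first created. The `ValidationError` class here is a local stand-in, not the toolkit's exception.

```python
from __future__ import annotations


class ValidationError(Exception):
    """Local stand-in for the error raised on failed validation."""


def validate(name: str, rows: list[dict], suites: dict[str, list[str]]) -> list[dict]:
    suite_name = f"{name}_suite"

    # Look up the suite, creating an empty one if it is missing.
    is_new = suite_name not in suites
    suite = suites.setdefault(suite_name, [])

    # On first use, "freeze" the column names seen in the data.
    if is_new and rows:
        suite.extend(sorted(rows[0]))

    # Run the check: every row must have exactly the frozen columns.
    expected = set(suite)
    failures = [index for index, row in enumerate(rows) if set(row) != expected]

    # Raise on failure, otherwise return the original object unchanged.
    if failures:
        raise ValidationError(
            f"{suite_name}: rows {failures} do not match columns {sorted(expected)}"
        )
    return rows


suites: dict[str, list[str]] = {}
first = [{"id": 1, "name": "Alice"}]
returned = validate("users", first, suites)  # creates and freezes "users_suite"

try:
    validate("users", [{"id": 5, "age": 30}], suites)
    drift_detected = False
except ValidationError:
    drift_detected = True
```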
Performance Considerations:
- First validation of a dataset (suite creation) is slower than subsequent validations due to suite initialization overhead.
- Schema freezing inspects only the DataFrame's column names and types, so its cost scales with the number of columns, not the number of rows.
- Validation performance depends on the number and complexity of expectations.
- Consider using sampling for large datasets with expensive expectations.
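A reproducible row sample is one way to apply the sampling suggestion. The helper below is a hypothetical sketch that works on any list of rows using only the standard library; with pandas the same idea is usually expressed through `DataFrame.sample`. Keep in mind that a sample can miss rare violations, so this trades coverage for speed.

```python
from __future__ import annotations

import random
from typing import Sequence, TypeVar

Row = TypeVar("Row")


def sample_rows(rows: Sequence[Row], fraction: float, seed: int = 0) -> list[Row]:
    """Return a reproducible random subset containing roughly `fraction` of the rows."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in the interval (0, 1]")
    if not rows:
        return []
    size = max(1, round(len(rows) * fraction))
    # A dedicated Random instance keeps the sample stable across runs.
    return random.Random(seed).sample(list(rows), size)


rows = [{"id": i} for i in range(1000)]
subset = sample_rows(rows, fraction=0.1, seed=42)
repeat = sample_rows(rows, fraction=0.1, seed=42)
```

The subset, rather than the full dataset, would then be passed to the expensive validation step.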
Thread Safety:
GXValidator instances are not thread-safe. The underlying Great Expectations data context may perform file I/O and maintain internal state. For concurrent validation, create separate validator instances per thread.
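One way to follow the one-instance-per-thread advice is `threading.local`. The sketch below uses a hypothetical `StubValidator` so that it runs without Great Expectations; in real code the factory would build a validator with its own data context.

```python
from __future__ import annotations

import threading


class StubValidator:
    """Hypothetical stand-in for a validator that must not be shared across threads."""


_local = threading.local()


def get_validator(factory=StubValidator):
    """Return this thread's validator, creating it on first use."""
    if not hasattr(_local, "validator"):
        _local.validator = factory()
    return _local.validator


def worker(results: dict, index: int) -> None:
    first = get_validator()
    second = get_validator()
    # Keep a reference so each thread's instance stays alive for inspection.
    results[index] = (first, first is second)


results: dict[int, tuple[StubValidator, bool]] = {}
threads = [threading.Thread(target=worker, args=(results, i)) for i in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Each thread reuses its own instance across calls, and no instance is visible to another thread.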
Cloud Storage:
When using cloud-based data contexts (S3, GCS, Azure), ensure appropriate credentials and permissions are configured. The data context stores expectation suites, checkpoints, and validation results in cloud storage.
Examples
Create a validator with default auto-creation behavior:
>>> from adc_toolkit.data.validators.gx import GXValidator
>>> import great_expectations as gx
>>> import pandas as pd
>>> context = gx.get_context(mode="ephemeral")  # in-memory context; requires a GX release whose get_context accepts mode
>>> validator = GXValidator(data_context=context)
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
>>> validated = validator.validate("dataset_name", df)
Create a validator requiring pre-defined expectation suites:
>>> from adc_toolkit.data.validators.gx.batch_managers import (
... CustomExpectationSuiteStrategy,
... SkipExpectationAddition,
... )
>>> validator = GXValidator(
... data_context=context,
... expectation_suite_lookup_strategy=CustomExpectationSuiteStrategy(),
... expectation_addition_strategy=SkipExpectationAddition(),
... )
>>> # This will raise ExpectationSuiteNotFoundError if suite doesn't exist
>>> validated = validator.validate("strict_dataset", df)
Use with a filesystem-based data context:
>>> validator = GXValidator.in_directory("/path/to/gx/config")
>>> df = pd.DataFrame({"col1": [1, 2], "col2": ["a", "b"]})
>>> validated = validator.validate("my_data", df)
Validate with automatic schema freezing:
>>> df_first = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
>>> validator.validate("users", df_first) # Creates suite, freezes schema
>>> df_second = pd.DataFrame({"id": [3, 4], "name": ["Charlie", "Dave"]})
>>> validator.validate("users", df_second) # Validates against frozen schema
>>> df_invalid = pd.DataFrame({"id": [5], "age": [30]}) # Different schema
>>> validator.validate("users", df_invalid) # Raises ValidationError
Integration with ValidatedDataCatalog:
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory(
... catalog_dir="config/catalog", validator=GXValidator.in_directory("config/gx")
... )
>>> # Load with automatic validation
>>> df = catalog.load("customer_data")
>>> # Process data
>>> processed = transform(df)
>>> # Save with automatic validation
>>> catalog.save("processed_customer_data", processed)
def __init__(
    self,
    data_context: AbstractDataContext,
    expectation_suite_lookup_strategy: ExpectationSuiteLookupStrategy | None = None,
    expectation_addition_strategy: ExpectationAdditionStrategy | None = None,
) -> None:
    """Initialize a Great Expectations validator with specified configuration."""
    self.data_context = data_context
    self.expectation_suite_lookup_strategy = expectation_suite_lookup_strategy or AutoExpectationSuiteCreation()
    self.expectation_addition_strategy = expectation_addition_strategy or SchemaExpectationAddition()
Initialize a Great Expectations validator with specified configuration.
Creates a new GXValidator instance with the provided data context and validation strategies. The data context manages expectation suites, checkpoints, and validation results. The strategies control how the validator handles missing expectation suites and how it populates suites with expectations.
Parameters
data_context (great_expectations.data_context.AbstractDataContext): Great Expectations data context to use for all validation operations. This context manages the storage and retrieval of expectation suites, checkpoints, and validation results. Can be:
- RepoDataContext: Filesystem-based context stored in a directory
- S3DataContext: AWS S3-backed context for cloud deployments
- GCPDataContext: Google Cloud Storage-backed context
- AzureDataContext: Azure Blob Storage-backed context
- EphemeralDataContext: In-memory context for testing
expectation_suite_lookup_strategy (ExpectationSuiteLookupStrategy or None, optional): Strategy for handling expectation suite lookup operations. Controls the behavior when an expectation suite is not found for a dataset:
- None (default): Uses AutoExpectationSuiteCreation, which automatically creates missing suites with zero configuration.
- AutoExpectationSuiteCreation(): Explicitly auto-creates suites.
- CustomExpectationSuiteStrategy(): Raises an error for missing suites, enforcing that all suites must be pre-defined.
Default is None, which is equivalent to AutoExpectationSuiteCreation().
expectation_addition_strategy (ExpectationAdditionStrategy or None, optional): Strategy for adding expectations to expectation suites. Controls how expectations are populated when validating data:
- None (default): Uses SchemaExpectationAddition, which inspects DataFrame structure and adds schema validation expectations.
- SchemaExpectationAddition(): Explicitly adds schema expectations by freezing the DataFrame's column names and types.
- SkipExpectationAddition(): Skips automatic expectation addition, requiring all expectations to be manually defined.
Default is None, which is equivalent to SchemaExpectationAddition().
Returns
- None
See Also
in_directory: Factory method to create validator from configuration directory.
validate: Validate data using this validator.
adc_toolkit.data.validators.gx.data_context.RepoDataContext: Create filesystem context.
adc_toolkit.data.validators.gx.batch_managers.AutoExpectationSuiteCreation: Auto-create strategy.
adc_toolkit.data.validators.gx.batch_managers.SchemaExpectationAddition: Schema freeze strategy.
Notes
The constructor performs minimal initialization, only storing the provided parameters. No I/O operations, file system access, or data context initialization occurs during construction. This enables fast instantiation and lazy initialization patterns.
Default Strategies:
When strategy parameters are None, the validator uses sensible defaults:
- AutoExpectationSuiteCreation: Enables zero-configuration validation by automatically creating expectation suites on first use.
- SchemaExpectationAddition: Provides automatic schema drift protection by freezing the DataFrame structure on first validation.
These defaults are ideal for development, exploration, and rapid prototyping. For production deployments with explicit validation rules, consider using CustomExpectationSuiteStrategy and pre-defined suites.
Strategy Immutability:
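A validator's strategies are meant to stay fixed after instantiation. The constructor stores them as ordinary attributes, so this is a convention rather than something the class enforces. To use different strategies, create a new validator instance instead of reassigning the attributes; this keeps validation behavior consistent throughout a validator's lifetime.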
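A common way to express fixed-after-construction configuration in Python is a frozen dataclass. The sketch below is not how GXValidator is implemented; `ValidatorConfig` and its string fields are hypothetical and only illustrate the "create a new instance instead of mutating" idea.

```python
from __future__ import annotations

import dataclasses
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ValidatorConfig:
    """Hypothetical immutable bundle of strategy choices."""

    suite_lookup: str = "auto_create"
    expectation_addition: str = "schema"


config = ValidatorConfig()

try:
    config.suite_lookup = "custom"  # rejected: the dataclass is frozen
    reassigned = True
except FrozenInstanceError:
    reassigned = False

# Changing behaviour means building a new object; the original is untouched.
strict = dataclasses.replace(config, suite_lookup="custom")
```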
Data Context Lifecycle:
The validator does not own the data context lifecycle. The caller is responsible for creating and properly disposing of the data context. For ephemeral contexts used in testing, ensure proper cleanup:
>>> import great_expectations as gx
>>> context = gx.get_context(mode="ephemeral")
>>> try:
... validator = GXValidator(data_context=context)
... # Use validator
... finally:
... # Clean up context if needed
... pass
Examples
Create a validator with default auto-creation strategies:
>>> import great_expectations as gx
>>> context = gx.get_context(mode="ephemeral")
>>> validator = GXValidator(data_context=context)
>>> # Automatically creates suites and freezes schemas
Create a validator with strict, manual suite management:
>>> from adc_toolkit.data.validators.gx.batch_managers import (
... CustomExpectationSuiteStrategy,
... SkipExpectationAddition,
... )
>>> validator = GXValidator(
... data_context=context,
... expectation_suite_lookup_strategy=CustomExpectationSuiteStrategy(),
... expectation_addition_strategy=SkipExpectationAddition(),
... )
>>> # Requires pre-defined suites, no automatic expectations
Create a validator with auto-creation but manual expectations:
>>> from adc_toolkit.data.validators.gx.batch_managers import AutoExpectationSuiteCreation
>>> validator = GXValidator(
... data_context=context,
... expectation_suite_lookup_strategy=AutoExpectationSuiteCreation(),
... expectation_addition_strategy=SkipExpectationAddition(),
... )
>>> # Creates suites automatically but expects manual expectation definition
Use with a filesystem-based data context:
>>> from adc_toolkit.data.validators.gx.data_context import RepoDataContext
>>> context = RepoDataContext("/path/to/gx").create()
>>> validator = GXValidator(data_context=context)
Use with a cloud-based data context:
>>> from adc_toolkit.data.validators.gx.data_context import S3DataContext
>>> context = S3DataContext("s3://my-bucket/gx-config").create()
>>> validator = GXValidator(data_context=context)
@classmethod
def in_directory(cls, path: str | Path) -> "GXValidator":
    """Create a GXValidator with a filesystem-based Great Expectations data context."""
    return cls(data_context=RepoDataContext(path).create())
Create a GXValidator with a filesystem-based Great Expectations data context.
This factory method provides a convenient way to create a validator using a repository-based (filesystem) data context. It initializes a RepoDataContext from the specified directory and creates a validator with default strategies for auto-creation and schema freezing.
The specified directory should contain a Great Expectations project structure with configuration files, expectation suites, checkpoints, and validation results. If the directory does not contain a valid GX project, the RepoDataContext will initialize a new project structure.
Parameters
path (str or pathlib.Path): Path to the directory containing Great Expectations configuration. This directory should have (or will be initialized with) the following structure:
- great_expectations.yml : Main configuration file
- expectations/ : Directory containing expectation suite JSON files
- checkpoints/ : Directory containing checkpoint YAML files
- uncommitted/ : Directory for validation results and data docs
- plugins/ : Optional directory for custom expectations
If the directory does not exist or is empty, a new GX project structure will be created. Both absolute and relative paths are supported.
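Before pointing a validator at a directory, it can help to see which parts of the layout described above are already present. The helper below is a hypothetical sketch built on `pathlib`; it does not call Great Expectations and treats `plugins/` as optional, as the list above does.

```python
from __future__ import annotations

import tempfile
from pathlib import Path

# Entries taken from the layout described above; plugins/ is optional and skipped.
EXPECTED_ENTRIES = ("great_expectations.yml", "expectations", "checkpoints", "uncommitted")


def missing_entries(project_dir: str | Path) -> list[str]:
    """List the expected files and directories that are absent from project_dir."""
    root = Path(project_dir)
    return [entry for entry in EXPECTED_ENTRIES if not (root / entry).exists()]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    before = missing_entries(root)  # empty directory: everything is missing

    # Simulate an initialized project with placeholder content.
    (root / "great_expectations.yml").write_text("# placeholder\n")
    for directory in ("expectations", "checkpoints", "uncommitted"):
        (root / directory).mkdir()
    after = missing_entries(root)
```

A non-empty result signals that the directory would be treated as uninitialized or incomplete, which may or may not be what a deployment intends.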
Returns
GXValidator: A new GXValidator instance configured with:
- RepoDataContext based on the specified directory
- AutoExpectationSuiteCreation strategy (creates suites automatically)
- SchemaExpectationAddition strategy (freezes schemas automatically)
Raises
- FileNotFoundError: If the parent directory of the specified path does not exist and cannot be created.
- PermissionError: If the process lacks permissions to read from or write to the specified directory.
- ValueError: If the directory contains invalid Great Expectations configuration files that cannot be parsed.
See Also
__init__: Constructor for custom data context and strategy configuration.
validate: Validate data using this validator.
adc_toolkit.data.validators.gx.data_context.RepoDataContext: Filesystem context implementation.
adc_toolkit.data.validators.gx.data_context.S3DataContext: AWS S3-based context.
adc_toolkit.data.validators.gx.data_context.GCPDataContext: Google Cloud context.
Notes
Repository Structure:
Great Expectations uses a specific directory structure to organize validation artifacts:
- Expectation suites are stored as JSON in expectations/
- Checkpoints are stored as YAML in checkpoints/
- Validation results go in uncommitted/validations/
- Data docs are generated in uncommitted/data_docs/
This structure enables version control of validation rules while keeping validation results and documentation out of version control.
Version Control:
When using filesystem-based contexts, consider the following for version control (Git):
- Commit: expectations/, checkpoints/, great_expectations.yml, plugins/
- Ignore: uncommitted/ (contains validation results and generated docs)
This approach version controls validation rules while excluding environment-specific results.
Performance:
The in_directory method performs I/O operations to read configuration and initialize the data context. For applications creating many validator instances, consider caching the data context and passing it to __init__ instead of using in_directory repeatedly.
Automatic Initialization:
If the specified directory does not contain a great_expectations.yml file, RepoDataContext will initialize a new GX project. This is useful for quickly starting validation without manual GX project setup, but may not be suitable for production deployments where explicit configuration is preferred.
Default Strategies:
This factory method always uses default strategies (AutoExpectationSuiteCreation and SchemaExpectationAddition). For custom strategies, use the __init__ constructor directly:
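>>> from adc_toolkit.data.validators.gx.batch_managers import CustomExpectationSuiteStrategy
>>> from adc_toolkit.data.validators.gx.data_context import RepoDataContext
>>> context = RepoDataContext("config/gx").create()
>>> validator = GXValidator(
...     data_context=context,
...     expectation_suite_lookup_strategy=CustomExpectationSuiteStrategy(),
... )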
Examples
Create a validator from a GX project directory:
>>> validator = GXValidator.in_directory("/path/to/gx_project")
>>> import pandas as pd
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
>>> validated = validator.validate("my_dataset", df)
Use with a relative path:
>>> validator = GXValidator.in_directory("config/validations")
>>> validated = validator.validate("dataset", data)
Use with pathlib.Path:
>>> from pathlib import Path
>>> config_path = Path("config") / "gx"
>>> validator = GXValidator.in_directory(config_path)
Initialize a new GX project and validator:
>>> # Directory doesn't exist yet
>>> validator = GXValidator.in_directory("./new_gx_project")
>>> # Now directory contains initialized GX project structure
Validate multiple datasets with one validator:
>>> validator = GXValidator.in_directory("config/gx")
>>> df1 = pd.DataFrame({"a": [1, 2]})
>>> df2 = pd.DataFrame({"b": ["x", "y"]})
>>> validated1 = validator.validate("dataset1", df1)
>>> validated2 = validator.validate("dataset2", df2)
Integration in a data pipeline:
>>> def validate_pipeline_data(data_path: str, gx_path: str) -> None:
...     validator = GXValidator.in_directory(gx_path)
...     for dataset_name in ["raw", "cleaned", "features"]:
...         df = pd.read_csv(f"{data_path}/{dataset_name}.csv")
...         validated = validator.validate(dataset_name, df)
...         print(f"Validated {dataset_name}: {len(validated)} rows")
def validate(self, name: str, data: Data) -> Data:
    """Validate data against Great Expectations rules for the named dataset."""
    return validate_dataset(
        name,
        data,
        self.data_context,
        self.expectation_suite_lookup_strategy,
        self.expectation_addition_strategy,
    )
Validate data against Great Expectations rules for the named dataset.
Executes the complete Great Expectations validation workflow for the specified dataset. This includes expectation suite lookup or creation, batch request generation, expectation addition, checkpoint creation and execution, and validation result evaluation.
The validation process ensures data quality by verifying that the data meets all expectations defined in the associated expectation suite. If validation fails, detailed error information identifies which expectations failed and why.
On successful validation, the original data is returned unchanged. The validation is side-effect free from the data perspective, but may create or update expectation suites, checkpoints, and validation results in the data context storage.
Parameters
name (str): Identifier for the dataset being validated. This name is used to:
- Look up the corresponding expectation suite (named "{name}_suite")
- Create or update the checkpoint for this dataset
- Store validation results associated with this dataset
The name should be consistent across validation calls for the same logical dataset to ensure proper suite reuse and result tracking. Use descriptive, stable names like "customer_data", "sales_features", or "model_predictions".
data (Data): The dataset to validate. Must be a Data protocol-compatible object, typically a pandas DataFrame or Spark DataFrame. The data should have columns and dtypes properties for schema inspection.
The data is not modified by validation. If validation succeeds, the same object (or an equivalent copy) is returned.
Returns
- Data: The validated data. This is the same object as the input data parameter if validation succeeds. The return type matches the input type (e.g., pandas.DataFrame returns pandas.DataFrame).
Returning the data enables method chaining and integration with pipelines:
>>> validated = validator.validate("data", raw_data)
>>> processed = transform(validated)
Raises
ValidationError: If the data fails validation against the expectation suite. The exception contains detailed information about:
- Which expectations failed
- Observed values that violated expectations
- Expected values or constraints
- Summary statistics for failed validations
This exception indicates data quality issues that must be addressed before proceeding with downstream processing.
- ExpectationSuiteNotFoundError: If the expectation suite for the dataset does not exist and the validator is configured with CustomExpectationSuiteStrategy. This indicates that validation rules must be explicitly defined before validation can proceed.
To resolve, either:
- Create the expectation suite manually in the data context
- Switch to AutoExpectationSuiteCreation strategy
- Ensure the correct data context is being used
- TypeError: If the data type is incompatible with Great Expectations batch creation. For example, if the data does not have the required columns and dtypes attributes.
- KeyError: If the batch manager cannot create a batch from the data due to missing required attributes or metadata.
See Also
__init__: Constructor for configuring validation strategies.
in_directory: Factory method for filesystem-based validators.
adc_toolkit.data.validators.gx.batch_managers.validate_dataset: Underlying validation function.
adc_toolkit.data.abs.DataValidator.validate: Protocol method specification.
Notes
Validation Workflow:
The validate method orchestrates these steps:
Suite Lookup: Check if an expectation suite exists for the dataset. If not, behavior depends on the lookup strategy:
- AutoExpectationSuiteCreation: Create a new suite
- CustomExpectationSuiteStrategy: Raise ExpectationSuiteNotFoundError
Batch Creation: Convert the data into a GX Batch object using BatchManager, making it compatible with GX validation operations.
Expectation Addition: Add expectations to the suite based on the addition strategy:
- SchemaExpectationAddition: Inspect data schema and add schema expectations
- SkipExpectationAddition: Skip, expecting manual expectation definition
Checkpoint Execution: Create or update a checkpoint for the dataset and execute it to validate the batch against the expectation suite.
Result Evaluation: Analyze validation results. If all expectations pass, return the data. If any fail, raise ValidationError with details.
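The steps above can be imitated with a small in-memory model. This is a simplified sketch, not the toolkit's implementation: `ToyValidator`, `schema_of`, and the two exception classes are hypothetical stand-ins, a table is a plain dict of column name to values, and checkpoint execution is reduced to a schema comparison.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class ValidationError(Exception):
    """Stand-in for the toolkit's ValidationError."""


class ExpectationSuiteNotFoundError(Exception):
    """Stand-in for the toolkit's ExpectationSuiteNotFoundError."""


def schema_of(table: dict[str, list]) -> dict[str, str]:
    """Describe a toy table (column name -> values) as column name -> type name."""
    return {
        column: type(values[0]).__name__ if values else "empty"
        for column, values in table.items()
    }


@dataclass
class ToyValidator:
    """In-memory imitation of the lookup, addition, and evaluation steps."""

    auto_create: bool = True  # False imitates a lookup strategy that never creates suites
    suites: dict[str, dict[str, str]] = field(default_factory=dict)

    def validate(self, name: str, table: dict[str, list]) -> dict[str, list]:
        suite_name = f"{name}_suite"
        # Suite lookup: create the suite or fail, depending on the strategy.
        if suite_name not in self.suites:
            if not self.auto_create:
                raise ExpectationSuiteNotFoundError(suite_name)
            # Expectation addition: freeze the schema seen on first validation.
            self.suites[suite_name] = schema_of(table)
        # Checkpoint execution and result evaluation, reduced to a schema comparison.
        expected, observed = self.suites[suite_name], schema_of(table)
        if observed != expected:
            raise ValidationError(f"{suite_name}: expected {expected}, observed {observed}")
        return table  # the input is returned unchanged on success


validator = ToyValidator()
first = validator.validate("sales_data", {"id": [1, 2, 3], "value": [10, 20, 30]})

try:
    validator.validate("sales_data", {"id": [6], "price": [100]})
    drift_detected = False
except ValidationError:
    drift_detected = True
```

The first call freezes the schema and passes; the second call renames a column and is rejected. The real workflow delegates the comparison to a GX checkpoint and persists suites in the data context rather than in a dict.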
First Validation vs. Subsequent Validations:
The first time a dataset is validated (with AutoExpectationSuiteCreation and SchemaExpectationAddition), the validator:
- Creates an expectation suite named "{name}_suite"
- Inspects the DataFrame schema (column names and types)
- Adds schema expectations that "freeze" this structure
- Creates a checkpoint for the dataset
- Validates the data (which should pass since expectations match the data)
Subsequent validations of the same dataset:
- Reuse the existing expectation suite and checkpoint
- Validate data against the frozen schema and any other expectations
- Detect schema drift or data quality issues
Performance:
Validation performance depends on several factors:
- Number of expectations in the suite
- Complexity of expectations (simple schema checks vs. statistical tests)
- Size of the dataset (some expectations scan all data)
- Data context backend (filesystem vs. cloud storage)
First validation is slower due to suite and checkpoint creation overhead. Subsequent validations skip that setup and are typically faster; their cost scales with the number of expectations and, for expectations that scan every row, with data size as well.
For large datasets with expensive expectations, consider:
- Sampling strategies to validate subsets
- Caching validation results
- Running validations asynchronously
- Using incremental validation for streaming data
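The sampling option can be sketched without any GX dependency. The helper below is hypothetical (`sample_rows` is not part of adc-toolkit) and works on a list of row dicts; with pandas, `DataFrame.sample(frac=..., random_state=...)` plays the same role.

```python
from __future__ import annotations

import random


def sample_rows(rows: list[dict], fraction: float, seed: int = 0) -> list[dict]:
    """Return a reproducible random subset of rows for cheaper validation."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    # Keep at least one row so a non-empty input never yields an empty sample.
    size = max(1, round(len(rows) * fraction)) if rows else 0
    return random.Random(seed).sample(rows, size)


rows = [{"id": i, "value": i * 10} for i in range(1000)]
subset = sample_rows(rows, fraction=0.05)
```

A fixed seed keeps repeated runs comparable. Sampling trades coverage for speed: schema expectations are unaffected, but rare row-level violations can be missed.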
Idempotency:
Validation is idempotent: validating the same data multiple times with the same name produces the same result (pass or fail). However, validation results are stored with timestamps, so each validation creates new result artifacts in the data context.
Thread Safety:
The validate method is not thread-safe. Multiple threads validating different datasets concurrently may encounter race conditions when accessing the data context. For concurrent validation, create separate validator instances (with separate data contexts) per thread.
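A per-thread instance can be kept in thread-local storage. This is a minimal sketch with hypothetical names: `StubValidator` stands in for a GXValidator built with its own data context, and `validator_for_current_thread` is not part of adc-toolkit.

```python
from __future__ import annotations

import threading


class StubValidator:
    """Hypothetical stand-in for a GXValidator with its own data context."""


_local = threading.local()


def validator_for_current_thread() -> StubValidator:
    """Create the validator lazily, once per thread, and reuse it afterwards."""
    if not hasattr(_local, "validator"):
        # Assumption: real code would build a GXValidator with a separate data context here.
        _local.validator = StubValidator()
    return _local.validator


results: dict[str, StubValidator] = {}


def worker(name: str) -> None:
    # Each thread sees only its own instance.
    results[name] = validator_for_current_thread()


threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Separate instances avoid sharing in-memory state. If the contexts point at the same directory, concurrent writes to that storage may still need coordination.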
Side Effects:
While validation does not modify the data, it may have side effects:
- Create or update expectation suites in the data context
- Create or update checkpoints in the data context
- Write validation results to storage (filesystem or cloud)
- Generate data documentation if configured
These artifacts are stored according to the data context configuration.
Examples
Basic validation with automatic suite creation:
>>> import pandas as pd
>>> from adc_toolkit.data.validators.gx import GXValidator
>>> validator = GXValidator.in_directory("config/gx")
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
>>> validated = validator.validate("sales_data", df)
>>> # First validation creates suite and freezes schema
>>> validated.shape
(3, 2)
Subsequent validation detects schema drift:
>>> df_valid = pd.DataFrame({"id": [4, 5], "value": [40, 50]})
>>> validator.validate("sales_data", df_valid) # Passes, schema matches
>>> df_invalid = pd.DataFrame({"id": [6], "price": [100]})
>>> validator.validate("sales_data", df_invalid) # Raises ValidationError
Handle validation failures gracefully:
>>> try:
...     validated = validator.validate("strict_data", df)
... except ValidationError as e:
...     print(f"Validation failed: {e}")
...     # Log error, send alert, reject data, etc.
...     raise
Validate multiple datasets in a pipeline:
>>> def etl_pipeline(validator: GXValidator) -> None:
...     raw = load_raw_data()
...     validated_raw = validator.validate("raw_data", raw)
...     cleaned = clean(validated_raw)
...     validated_clean = validator.validate("cleaned_data", cleaned)
...     features = engineer_features(validated_clean)
...     validated_features = validator.validate("features", features)
...     save(validated_features)
Use validation in data loading:
>>> class ValidatedDataLoader:
...     def __init__(self, validator: GXValidator):
...         self.validator = validator
...
...     def load(self, name: str, path: str) -> pd.DataFrame:
...         df = pd.read_csv(path)
...         return self.validator.validate(name, df)
Integration with ValidatedDataCatalog:
>>> from adc_toolkit.data.catalog import ValidatedDataCatalog
>>> catalog = ValidatedDataCatalog.in_directory(
...     "config/catalog", validator=GXValidator.in_directory("config/gx")
... )
>>> # Validation happens automatically on load
>>> df = catalog.load("customer_data") # Validates after loading
>>> processed = transform(df)
>>> catalog.save("processed_data", processed) # Validates before saving
Validate with custom expectation suite:
>>> # Pre-create suite with custom expectations
>>> from great_expectations.core import ExpectationConfiguration
>>> context = validator.data_context
>>> suite = context.create_expectation_suite("custom_data_suite")
>>> suite.add_expectation(
...     ExpectationConfiguration(
...         expectation_type="expect_column_values_to_be_between",
...         kwargs={"column": "age", "min_value": 0, "max_value": 120},
...     )
... )
>>> # Now validate using the custom suite
>>> df = pd.DataFrame({"age": [25, 30, 35]})
>>> validator.validate("custom_data", df) # Uses custom_data_suite
class ValidatorBasedExpectationAddition:
    """Add expectations to GX expectation suites using Validator objects."""

    def add_expectations(self, batch_manager: BatchManager, expectations: list[dict]) -> None:
        r"""
        Add expectations to the suite using a GX Validator object.

        Parameters
        ----------
        batch_manager : BatchManager
            The batch manager containing the data context, batch request, and
            dataset name. This is used to create the validator and identify the
            target expectation suite.
        expectations : list of dict
            Each dictionary must contain exactly one key-value pair: the key is
            the expectation method name (e.g., ``"expect_column_values_to_be_in_set"``)
            and the value is a dictionary of keyword arguments for that method.

        Raises
        ------
        InvalidExpectationDictionaryError
            If any expectation dictionary does not contain exactly one key-value pair.
        InvalidExpectationNameTypeError
            If any expectation method name is not a string.
        InvalidExpectationKwargsTypeError
            If the parameters for any expectation are not provided as a dictionary.
        AttributeError
            If the expectation method name does not exist on the Validator object.
        TypeError
            If the expectation parameters don't match the method's signature.
        GreatExpectationsError
            If the data context or expectation suite cannot be accessed or saved.
        """
        validator = create_batch_validator(batch_manager)
        for expectation in expectations:
            expectation_name, kwargs = parse_expectations_dict(expectation_dictionary=expectation)
            getattr(validator, expectation_name)(**kwargs)
            # Saved after each expectation, as the docstring describes.
            validator.save_expectation_suite()
Add expectations to GX expectation suites using Validator objects.
This class implements the ExpectationAddition protocol by leveraging Great Expectations' Validator API to add expectations programmatically. Rather than creating ExpectationConfiguration objects, this approach calls expectation methods directly on a Validator object, which then automatically adds them to the associated expectation suite.
The validator-based approach offers several advantages:
- Natural API: Expectations are added using method calls that match GX's standard expectation API (e.g., expect_column_values_to_not_be_null).
- Immediate Validation: Each expectation can be validated against the batch data as it's added.
- Type Safety: IDEs can provide autocomplete and type hints for expectation parameters.
- Automatic Suite Management: The validator automatically handles saving expectations to the suite after each addition.
This class is stateless and can be reused across multiple batch managers and expectation addition operations.
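The name-based dispatch described above can be sketched in a few lines. This is a minimal illustration, not the toolkit's implementation: `RecordingValidator` and `dispatch` are hypothetical names, and the stand-in class only records calls instead of talking to Great Expectations.

```python
from typing import Any


class RecordingValidator:
    """Hypothetical stand-in for a GX Validator; it only records calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, Any]]] = []

    def expect_column_to_exist(self, column: str) -> None:
        self.calls.append(("expect_column_to_exist", {"column": column}))

    def expect_column_values_to_not_be_null(self, column: str) -> None:
        self.calls.append(("expect_column_values_to_not_be_null", {"column": column}))


def dispatch(validator: Any, expectation: dict[str, dict[str, Any]]) -> None:
    """Look up the method named by the single key and call it with the kwargs."""
    # Unpacking fails with ValueError unless there is exactly one key-value pair.
    ((name, kwargs),) = expectation.items()
    getattr(validator, name)(**kwargs)


validator = RecordingValidator()
dispatch(validator, {"expect_column_to_exist": {"column": "user_id"}})
dispatch(validator, {"expect_column_values_to_not_be_null": {"column": "email"}})
print(validator.calls)
```

Because the method is resolved from a string at run time, a misspelled expectation name surfaces only when the call is made.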
Attributes
- None: This class maintains no internal state.
Methods
add_expectations(batch_manager, expectations) Add a list of expectations to the batch manager's expectation suite using a GX Validator object.
See Also
ConfigurationBasedExpectationAddition: Alternative implementation using ExpectationConfiguration
ExpectationAddition: Protocol defining the expectation addition interface
create_batch_validator: Factory function for creating GX Validator objects
Notes
The validator-based approach creates a new Validator object for each call to add_expectations. The Validator object is retrieved from the data context using the batch manager's batch request and expectation suite name. Each expectation is added by calling the corresponding method on the validator (e.g., validator.expect_column_to_exist(column="col1")), and the suite is saved after each expectation addition.
This approach differs from configuration-based addition in that it uses the GX Validator API rather than directly manipulating ExpectationConfiguration objects. The validator-based approach may be slower for adding many expectations at once due to the suite save operation after each expectation, but it provides better feedback and is more suitable for interactive workflows.
Performance considerations:
- Each expectation triggers a save operation on the expectation suite
- For bulk expectation addition, consider batching or using configuration-based approach if performance is critical
- The validator creation overhead is incurred once per add_expectations call
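To make the cost of per-expectation saves concrete, the sketch below counts save operations for two strategies. It is illustrative only: `CountingValidator`, `add_saving_each`, and `add_saving_once` are hypothetical names, and the save-once variant is shown for comparison rather than as something this class offers.

```python
class CountingValidator:
    """Hypothetical stand-in that counts suite saves instead of writing anything."""

    def __init__(self) -> None:
        self.added = 0
        self.saves = 0

    def expect_column_to_exist(self, column: str) -> None:
        self.added += 1

    def save_expectation_suite(self) -> None:
        self.saves += 1


def add_saving_each(validator, expectations):
    """Save after every expectation, matching the behaviour documented above."""
    for expectation in expectations:
        for name, kwargs in expectation.items():
            getattr(validator, name)(**kwargs)
        validator.save_expectation_suite()


def add_saving_once(validator, expectations):
    """Illustrative alternative: add everything first, then save a single time."""
    for expectation in expectations:
        for name, kwargs in expectation.items():
            getattr(validator, name)(**kwargs)
    validator.save_expectation_suite()


expectations = [{"expect_column_to_exist": {"column": f"col_{i}"}} for i in range(50)]

per_item = CountingValidator()
add_saving_each(per_item, expectations)

batched = CountingValidator()
add_saving_once(batched, expectations)

print(per_item.saves, batched.saves)  # prints: 50 1
```

Saving once reduces writes from n to 1, at the price of losing everything added so far if a later expectation raises before the final save.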
Examples
Basic usage with a batch manager:
>>> from adc_toolkit.data.validators.gx.batch_managers import BatchManager
>>> from adc_toolkit.data.validators.gx.batch_managers.expectation_addition import ValidatorBasedExpectationAddition
>>> adder = ValidatorBasedExpectationAddition()
>>> expectations = [
...     {"expect_column_to_exist": {"column": "user_id"}},
...     {"expect_column_values_to_be_unique": {"column": "user_id"}},
... ]
>>> adder.add_expectations(batch_manager, expectations)
Adding multiple expectations with different types:
>>> expectations = [
...     {
...         "expect_column_values_to_be_in_set": {
...             "column": "status",
...             "value_set": ["active", "inactive"],
...         }
...     },
...     {
...         "expect_column_values_to_be_between": {
...             "column": "age",
...             "min_value": 18,
...             "max_value": 100,
...         }
...     },
...     {
...         "expect_column_values_to_match_regex": {
...             "column": "email",
...             "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
...         }
...     },
... ]
>>> adder.add_expectations(batch_manager, expectations)
Using with complex expectation parameters:
>>> expectations = [
...     {
...         "expect_column_pair_values_A_to_be_greater_than_B": {
...             "column_A": "end_date",
...             "column_B": "start_date",
...             "or_equal": True,
...         }
...     },
... ]
>>> adder.add_expectations(batch_manager, expectations)
def add_expectations(self, batch_manager: BatchManager, expectations: list[dict]) -> None:
    # Docstring omitted here; its content is rendered in full below.
    validator = create_batch_validator(batch_manager)
    for expectation in expectations:
        expectation_name, kwargs = parse_expectations_dict(expectation_dictionary=expectation)
        getattr(validator, expectation_name)(**kwargs)
        validator.save_expectation_suite()
Add expectations to the suite using a GX Validator object.
This method creates a GX Validator object from the batch manager and uses it to add expectations programmatically. Each expectation dictionary is parsed to extract the expectation type (method name) and its parameters, then the corresponding method is called on the validator. After each expectation is added, the expectation suite is automatically saved.
The method processes expectations sequentially, calling the appropriate expectation method on the validator for each one. The validator handles the creation of ExpectationConfiguration objects internally and adds them to the suite.
Parameters
- batch_manager (BatchManager): The batch manager containing the data context, batch request, and dataset name. This is used to create the validator and identify the target expectation suite.
- expectations (list of dict): A list of expectation dictionaries to add to the suite. Each dictionary must contain exactly one key-value pair, where:
  - The key is the expectation method name (e.g., "expect_column_values_to_be_in_set")
  - The value is a dictionary of keyword arguments to pass to that expectation method (e.g., {"column": "col1", "value_set": [1, 2, 3]})
  The expectation method names should match GX's standard expectation API.
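The required dictionary shape can be checked with a few lines of plain Python. The sketch below is an assumption-laden illustration: `split_expectation` is a hypothetical helper, not the toolkit's parse_expectations_dict, and it raises built-in exceptions rather than the toolkit's custom error classes.

```python
from typing import Any


def split_expectation(expectation: dict) -> tuple[str, dict[str, Any]]:
    """Return (method_name, kwargs), or raise if the dictionary is malformed."""
    if len(expectation) != 1:
        raise ValueError(f"expected exactly one key-value pair, got {len(expectation)}")
    name, kwargs = next(iter(expectation.items()))
    if not isinstance(name, str):
        raise TypeError("expectation name must be a string")
    if not isinstance(kwargs, dict):
        raise TypeError("expectation parameters must be a dictionary")
    return name, kwargs


name, kwargs = split_expectation(
    {"expect_column_values_to_be_in_set": {"column": "col1", "value_set": [1, 2, 3]}}
)
print(name, kwargs)
```

Checking the shape before touching the validator keeps malformed input from reaching Great Expectations at all.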
Returns
- None: This method modifies the expectation suite in place through the validator and does not return a value.
Raises
- InvalidExpectationDictionaryError: If any expectation dictionary does not contain exactly one key-value pair.
- InvalidExpectationNameTypeError: If any expectation method name is not a string.
- InvalidExpectationKwargsTypeError: If the parameters for any expectation are not provided as a dictionary.
- AttributeError: If the expectation method name does not exist on the Validator object (i.e., it's not a valid GX expectation).
- TypeError: If the expectation parameters don't match the expected signature for that expectation method.
- GreatExpectationsError: If the data context or expectation suite cannot be accessed, or if there are issues saving the expectation suite.
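The AttributeError and TypeError cases follow from ordinary Python attribute lookup and call semantics. The sketch below reproduces them with a hypothetical `StubValidator`; a real GX Validator may report these conditions with different messages, so treat this only as an illustration of the mechanism.

```python
class StubValidator:
    """Hypothetical stand-in exposing a single expectation method."""

    def expect_column_to_exist(self, column: str) -> bool:
        return True


def try_add(validator, name, kwargs):
    """Return the name of the exception raised by the dispatch, or 'ok'."""
    try:
        getattr(validator, name)(**kwargs)
    except (AttributeError, TypeError) as exc:
        return type(exc).__name__
    return "ok"


validator = StubValidator()
results = {
    # Correct name and keyword arguments.
    "valid": try_add(validator, "expect_column_to_exist", {"column": "user_id"}),
    # Misspelled method name: the attribute lookup fails.
    "unknown_name": try_add(validator, "expect_column_to_exits", {"column": "user_id"}),
    # Misspelled keyword: the call does not match the method signature.
    "bad_kwargs": try_add(validator, "expect_column_to_exist", {"colum": "user_id"}),
}
print(results)
```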
See Also
create_batch_validator: Creates the GX Validator object used by this method
parse_expectations_dict: Parses expectation dictionaries into method names and kwargs
ConfigurationBasedExpectationAddition: Alternative approach using ExpectationConfiguration
Notes
This method performs the following operations for each expectation:
1. Parse the expectation dictionary to extract the method name and parameters
2. Call getattr(validator, expectation_name)(**kwargs) to add the expectation
3. Call validator.save_expectation_suite() to persist the change
The validator is created once at the beginning of the method and reused for all expectations in the list. Each expectation triggers an individual save operation, which ensures that expectations are persisted even if a later expectation fails, but may impact performance when adding many expectations.
The validator-based approach provides immediate validation feedback and can help catch configuration errors early, as the expectation methods perform parameter validation when called.
Performance characteristics:
- Time complexity: O(n) where n is the number of expectations
- Each expectation triggers a suite save operation
- Validator creation overhead is amortized across all expectations
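The claim that earlier expectations stay persisted when a later one fails can be illustrated with an in-memory stand-in. `InMemoryValidator` is hypothetical and keeps "saved" state in a list; the real suite is persisted through the GX data context.

```python
class InMemoryValidator:
    """Hypothetical stand-in: 'pending' is in-memory state, 'saved' is persisted state."""

    def __init__(self) -> None:
        self.pending: list[tuple[str, str]] = []
        self.saved: list[tuple[str, str]] = []

    def expect_column_to_exist(self, column: str) -> None:
        self.pending.append(("expect_column_to_exist", column))

    def save_expectation_suite(self) -> None:
        self.saved = list(self.pending)


validator = InMemoryValidator()
expectations = [
    {"expect_column_to_exist": {"column": "user_id"}},
    {"expect_not_a_real_method": {"column": "user_id"}},  # fails on lookup
    {"expect_column_to_exist": {"column": "email"}},  # never reached
]

failure = None
try:
    for expectation in expectations:
        ((name, kwargs),) = expectation.items()
        getattr(validator, name)(**kwargs)
        validator.save_expectation_suite()  # save after each successful addition
except AttributeError as exc:
    failure = exc

print(validator.saved)
```

Only the first expectation ends up in `saved`: the second raises before anything is added, and the loop never reaches the third.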
Examples
Add basic column existence and uniqueness expectations:
>>> adder = ValidatorBasedExpectationAddition()
>>> adder.add_expectations(
...     batch_manager,
...     expectations=[
...         {"expect_column_to_exist": {"column": "user_id"}},
...         {"expect_column_values_to_be_unique": {"column": "user_id"}},
...     ],
... )
Add expectations with value constraints:
>>> adder.add_expectations(
...     batch_manager,
...     expectations=[
...         {
...             "expect_column_values_to_be_in_set": {
...                 "column": "status",
...                 "value_set": ["active", "inactive", "pending"],
...             }
...         },
...         {
...             "expect_column_values_to_be_between": {
...                 "column": "age",
...                 "min_value": 0,
...                 "max_value": 150,
...             }
...         },
...     ],
... )
Add regex and pattern-based expectations:
>>> adder.add_expectations(
...     batch_manager,
...     expectations=[
...         {
...             "expect_column_values_to_match_regex": {
...                 "column": "email",
...                 "regex": r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$",
...             }
...         },
...         {"expect_column_values_to_not_be_null": {"column": "email"}},
...     ],
... )
Add expectations with metadata:
>>> adder.add_expectations(
...     batch_manager,
...     expectations=[
...         {
...             "expect_column_mean_to_be_between": {
...                 "column": "revenue",
...                 "min_value": 1000,
...                 "max_value": 100000,
...                 "meta": {"notes": "Revenue should be within normal business range"},
...             }
...         },
...     ],
... )
Add multi-column expectations:
>>> adder.add_expectations(
...     batch_manager,
...     expectations=[
...         {
...             "expect_column_pair_values_A_to_be_greater_than_B": {
...                 "column_A": "end_date",
...                 "column_B": "start_date",
...                 "or_equal": True,
...             }
...         },
...     ],
... )