adc_toolkit.data.validators.table_utils
Utilities for extracting metadata and type information from data objects.
This module provides framework-agnostic utility functions for inspecting DataFrames and extracting their structural properties. These utilities support both pandas and PySpark DataFrames, enabling consistent data handling across different execution engines.
The functions are primarily used internally by validators and data catalogs to:
- Determine which DataFrame framework is being used (pandas, pyspark, etc.)
- Extract schema information for validation and comparison
- Support engine-specific processing strategies
- Enable framework-agnostic data pipeline code
All functions work with data objects conforming to the Data protocol, which requires
columns and dtypes attributes. This protocol-based approach ensures
compatibility with any DataFrame-like object that implements these attributes.
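As a rough illustration of the protocol-based approach described above, the sketch below defines a structural type that only requires columns and dtypes attributes. The names DataLike and TinyFrame are hypothetical and exist only for this example; the actual protocol is defined in adc_toolkit.data.abs.Data and may differ in detail.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical stand-in for the Data protocol (not the toolkit's definition)."""

    columns: Any
    dtypes: Any


class TinyFrame:
    """Minimal DataFrame-like object used only for this illustration."""

    def __init__(self, dtypes: dict[str, str]) -> None:
        self.columns = list(dtypes)
        self.dtypes = dtypes


frame = TinyFrame({"id": "int64", "name": "object"})

# A runtime-checkable protocol only verifies that the attributes exist.
print(isinstance(frame, DataLike))      # True
print(isinstance([1, 2, 3], DataLike))  # False
```

Any object exposing both attributes satisfies such a protocol, which is why no common base class is needed across frameworks.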
Functions
extract_dataframe_type
    Determine the DataFrame framework type (pandas, pyspark, etc.) by inspecting the module path of the data object's type. Returns the top-level module name as a string.
extract_dataframe_schema
    Extract column names and data types as a dictionary mapping column names to type strings. Framework-agnostic function that works with both pandas and PySpark DataFrames.
extract_dataframe_schema_spark_native_format
    Extract Spark DataFrame schema using native Spark type names (e.g., "LongType", "StringType") rather than SQL format (e.g., "bigint", "string"). Specific to PySpark DataFrames.
Notes
Framework Detection
The extract_dataframe_type function identifies the DataFrame framework by
examining the module path. This approach is robust across framework versions as
top-level module names remain stable:
- pandas DataFrames: Returns "pandas"
- PySpark DataFrames: Returns "pyspark"
- Other frameworks: Returns their respective module names
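The module-path inspection described above can be sketched with the standard library alone. The helper name top_level_module is hypothetical, and the sketch assumes detection amounts to taking the first component of type(obj).__module__; the toolkit's own implementation may differ.

```python
from email.message import Message
from fractions import Fraction


def top_level_module(obj: object) -> str:
    """Return the first component of the module that defines obj's type."""
    return type(obj).__module__.split(".")[0]


# Message is defined in "email.message", so only "email" is kept,
# just as a pandas DataFrame defined in "pandas.core.frame" yields "pandas".
print(top_level_module(Message()))       # email
print(top_level_module(Fraction(1, 2)))  # fractions
print(top_level_module(42))              # builtins
```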
Type String Formats
Schema extraction functions return type information as strings, but the format varies by framework:
Pandas (numpy/pandas dtype strings):
- Integer types: "int8", "int16", "int32", "int64", "uint8", etc.
- Float types: "float16", "float32", "float64"
- String/object: "object"
- DateTime: "datetime64[ns]", "timedelta64[ns]"
- Categorical: "category"
- Boolean: "bool"
PySpark (SQL format via extract_dataframe_schema):
- Integer types: "tinyint", "smallint", "int", "bigint"
- Float types: "float", "double", "decimal"
- String: "string"
- DateTime: "timestamp", "date"
- Boolean: "boolean"
- Complex: "array<type>", "map<type,type>", "struct<...>"
PySpark (Native format via extract_dataframe_schema_spark_native_format):
- Type objects: "LongType", "StringType", "DoubleType", "BooleanType", etc.
- Includes trailing "()" for complex types: "ArrayType(StringType())"
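The two frameworks also expose dtypes in different shapes: pandas returns a Series mapping each column to its dtype, while PySpark returns a list of (name, type string) pairs. The sketch below shows one way both shapes could be normalized into a single dictionary. The helper schema_from_dtypes is hypothetical, and plain Python objects stand in for the real DataFrames so the example runs without either library.

```python
from typing import Any


def schema_from_dtypes(dtypes: Any) -> dict[str, str]:
    """Normalize a dtypes attribute into {column: type string} (illustrative only)."""
    if isinstance(dtypes, list):
        # PySpark-style: a list of (column name, SQL type string) pairs.
        return {name: str(dtype) for name, dtype in dtypes}
    # pandas-style: a mapping-like object that exposes .items().
    return {name: str(dtype) for name, dtype in dtypes.items()}


spark_style = [("id", "bigint"), ("name", "string")]
pandas_style = {"id": "int64", "name": "object"}  # stand-in for a pandas Series

print(schema_from_dtypes(spark_style))   # {'id': 'bigint', 'name': 'string'}
print(schema_from_dtypes(pandas_style))  # {'id': 'int64', 'name': 'object'}
```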
Use Cases
These utilities enable several common data engineering patterns:
- Conditional Processing: Apply framework-specific logic based on detected type
- Schema Validation: Compare extracted schemas against expected specifications
- Auto-generation: Generate validation schemas by introspecting data structure
- Logging: Record schema information for debugging and auditing
- Type Conversion: Determine appropriate type mappings between frameworks
Integration with Validators
The validator modules use these utilities extensively:
- Pandera: compile_schema_script uses these to generate schema files
- Great Expectations: Batch managers use these to select appropriate datasources
- ValidatedDataCatalog: Uses these to route data to appropriate validators
See Also
adc_toolkit.data.abs.Data: Protocol defining the Data interface that these utilities work with.
adc_toolkit.data.validators.pandera: Pandera validator that uses these utilities.
adc_toolkit.data.validators.gx.batch_managers: GX batch managers that use framework detection.
Examples
Determine DataFrame framework type:
>>> import pandas as pd
>>> from adc_toolkit.data.validators.table_utils import extract_dataframe_type
>>>
>>> df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> framework = extract_dataframe_type(df)
>>> print(framework)
pandas
Extract schema from pandas DataFrame:
>>> from adc_toolkit.data.validators.table_utils import extract_dataframe_schema
>>>
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10.5, 20.3, 30.7], "name": ["Alice", "Bob", "Charlie"]})
>>> schema = extract_dataframe_schema(df)
>>> print(schema)
{'id': 'int64', 'value': 'float64', 'name': 'object'}
Extract schema from PySpark DataFrame:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> spark_df = spark.createDataFrame([(1, 10.5, "Alice"), (2, 20.3, "Bob")], ["id", "value", "name"])
>>>
>>> # SQL format
>>> sql_schema = extract_dataframe_schema(spark_df)
>>> print(sql_schema)
{'id': 'bigint', 'value': 'double', 'name': 'string'}
Extract Spark schema in native format:
>>> from adc_toolkit.data.validators.table_utils import extract_dataframe_schema_spark_native_format
>>>
>>> native_schema = extract_dataframe_schema_spark_native_format(spark_df)
>>> print(native_schema)
{'id': 'LongType', 'value': 'DoubleType', 'name': 'StringType'}
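To relate the two Spark formats, the sketch below maps SQL type strings to the corresponding native class names for simple types. The names SQL_TO_NATIVE and sql_schema_to_native are hypothetical and not part of the toolkit; parameterized and complex types such as "decimal(10,2)" or "array<string>" are passed through unchanged.

```python
# Spark SQL type strings and the native type class names they correspond to.
SQL_TO_NATIVE = {
    "tinyint": "ByteType",
    "smallint": "ShortType",
    "int": "IntegerType",
    "bigint": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "string": "StringType",
    "boolean": "BooleanType",
    "timestamp": "TimestampType",
    "date": "DateType",
}


def sql_schema_to_native(sql_schema: dict[str, str]) -> dict[str, str]:
    """Translate simple SQL type strings; leave unknown types untouched."""
    return {col: SQL_TO_NATIVE.get(sql_type, sql_type) for col, sql_type in sql_schema.items()}


sql_schema = {"id": "bigint", "value": "double", "name": "string"}
print(sql_schema_to_native(sql_schema))
# {'id': 'LongType', 'value': 'DoubleType', 'name': 'StringType'}
```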
Conditional processing based on framework type:
>>> def process_data(df):
...     framework = extract_dataframe_type(df)
...
...     if framework == "pandas":
...         # Use pandas-specific operations
...         return df.groupby("category").sum()
...     elif framework == "pyspark":
...         # Use Spark-specific operations
...         return df.groupBy("category").sum()
...     else:
...         raise ValueError(f"Unsupported framework: {framework}")
Schema comparison for validation:
>>> current_schema = extract_dataframe_schema(df)
>>> expected_schema = {"id": "int64", "value": "float64", "name": "object"}
>>>
>>> if current_schema != expected_schema:
...     raise ValueError(f"Schema mismatch.\nExpected: {expected_schema}\nGot: {current_schema}")
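When a plain equality check fails, a column-level report is often easier to act on. The sketch below assumes both schemas are dictionaries of the kind returned by extract_dataframe_schema; the helper diff_schemas is hypothetical and not part of the toolkit.

```python
def diff_schemas(expected: dict[str, str], actual: dict[str, str]) -> dict[str, object]:
    """Report missing columns, unexpected columns, and type mismatches."""
    missing = sorted(set(expected) - set(actual))
    unexpected = sorted(set(actual) - set(expected))
    mismatched = {
        col: (expected[col], actual[col])
        for col in expected
        if col in actual and expected[col] != actual[col]
    }
    return {"missing": missing, "unexpected": unexpected, "mismatched": mismatched}


expected = {"id": "int64", "value": "float64", "name": "object"}
actual = {"id": "int32", "value": "float64", "extra": "bool"}

print(diff_schemas(expected, actual))
# {'missing': ['name'], 'unexpected': ['extra'], 'mismatched': {'id': ('int64', 'int32')}}
```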
Logging schema information:
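>>> import logging
>>> logging.basicConfig(level=logging.INFO)  # the root logger defaults to WARNING, which would suppress INFO records
>>> schema = extract_dataframe_schema(df)
>>> logging.info(f"Processing DataFrame with schema: {schema}")
INFO:root:Processing DataFrame with schema: {'id': 'int64', 'value': 'float64', 'name': 'object'}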
Using in validator configuration:
>>> def configure_validator(data):
...     framework = extract_dataframe_type(data)
...
...     if framework == "pandas":
...         return PandasValidator()
...     elif framework == "pyspark":
...         return SparkValidator()
...     else:
...         return GenericValidator()
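As the number of supported frameworks grows, a lookup table can replace the if/elif chain. The sketch below is one possible arrangement, not the toolkit's routing logic: VALIDATOR_FACTORIES and pick_validator are hypothetical names, the factories return placeholder strings instead of real validator objects, and framework detection is approximated by the top-level module of the object's type.

```python
from fractions import Fraction
from typing import Callable

# Hypothetical registry: framework name -> factory producing a validator.
# Strings stand in for validator instances to keep the sketch self-contained.
VALIDATOR_FACTORIES: dict[str, Callable[[], str]] = {
    "pandas": lambda: "pandas-validator",
    "pyspark": lambda: "spark-validator",
}


def pick_validator(data: object, default: str = "generic-validator") -> str:
    """Look up a validator by the top-level module of the data object's type."""
    framework = type(data).__module__.split(".")[0]
    factory = VALIDATOR_FACTORIES.get(framework)
    return factory() if factory is not None else default


# Fraction is not registered, so the default is returned.
print(pick_validator(Fraction(1, 2)))  # generic-validator

# Registering a new framework needs no change to pick_validator.
VALIDATOR_FACTORIES["fractions"] = lambda: "fraction-validator"
print(pick_validator(Fraction(1, 2)))  # fraction-validator
```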
Schema-based type conversion:
>>> def convert_to_spark_types(pandas_schema):
...     type_mapping = {
...         "int64": "bigint",
...         "float64": "double",
...         "object": "string",
...         "bool": "boolean",
...     }
...     return {col: type_mapping.get(dtype, dtype) for col, dtype in pandas_schema.items()}
>>>
>>> pandas_schema = extract_dataframe_schema(df)
>>> spark_schema = convert_to_spark_types(pandas_schema)
>>> print(spark_schema)
{'id': 'bigint', 'value': 'double', 'name': 'string'}