adc_toolkit.data.validators.table_utils

Utilities for extracting metadata and type information from data objects.

This module provides framework-agnostic utility functions for inspecting DataFrames and extracting their structural properties. These utilities support both pandas and PySpark DataFrames, enabling consistent data handling across different execution engines.

The functions are primarily used internally by validators and data catalogs to:

  • Determine which DataFrame framework is being used (pandas, pyspark, etc.)
  • Extract schema information for validation and comparison
  • Support engine-specific processing strategies
  • Enable framework-agnostic data pipeline code

All functions work with data objects conforming to the Data protocol, which requires columns and dtypes attributes. This protocol-based approach ensures compatibility with any DataFrame-like object that implements these attributes.
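The attribute requirement above can be pictured as a structural type. The following is a minimal sketch, not the definition in adc_toolkit.data.abs: the names DataLike and TinyTable are hypothetical, and the attribute types are left as Any because the real protocol's annotations are not shown here.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataLike(Protocol):
    """Hypothetical stand-in for the Data protocol: anything with columns and dtypes."""

    columns: Any
    dtypes: Any


class TinyTable:
    """Minimal DataFrame-like object used only to exercise the protocol."""

    def __init__(self) -> None:
        self.columns = ["id", "name"]
        self.dtypes = {"id": "int64", "name": "object"}


# isinstance() only checks that both attributes exist, not what they contain.
conforms = isinstance(TinyTable(), DataLike)
rejects_plain_list = isinstance([1, 2, 3], DataLike)
print(conforms, rejects_plain_list)
```

Because the check is structural, TinyTable conforms without inheriting from anything, which is the property that lets the utilities accept both pandas and PySpark objects.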

Functions

extract_dataframe_type
    Determine the DataFrame framework type (pandas, pyspark, etc.) by inspecting the module path of the data object's type. Returns the top-level module name as a string.
extract_dataframe_schema
    Extract column names and data types as a dictionary mapping column names to type strings. Framework-agnostic function that works with both pandas and PySpark DataFrames.
extract_dataframe_schema_spark_native_format
    Extract Spark DataFrame schema using native Spark type names (e.g., "LongType", "StringType") rather than SQL format (e.g., "bigint", "string"). Specific to PySpark DataFrames.
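One way a single function can serve both frameworks is to normalise the dtypes attribute, since pandas exposes it as a Series of dtype objects while PySpark exposes a list of (name, type) tuples. The sketch below is an assumption about the general technique, not the toolkit's implementation; schema_from_dtypes is a hypothetical name, and a plain dict and a plain list stand in for the two dtypes shapes so the example runs without either framework installed.

```python
from typing import Any


def schema_from_dtypes(dtypes: Any) -> dict[str, str]:
    """Build a {column: type string} mapping from either dtypes shape."""
    # Mapping-like (pandas Series, dict): iterate over .items().
    # Otherwise assume an iterable of (name, type) pairs, as PySpark returns.
    pairs = dtypes.items() if hasattr(dtypes, "items") else dtypes
    return {str(name): str(dtype) for name, dtype in pairs}


# Stand-in for a pandas-style dtypes mapping.
pandas_like = schema_from_dtypes({"id": "int64", "name": "object"})
# Stand-in for a PySpark-style dtypes list.
spark_like = schema_from_dtypes([("id", "bigint"), ("name", "string")])
print(pandas_like)
print(spark_like)
```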

Notes

Framework Detection

The extract_dataframe_type function identifies the DataFrame framework by examining the module path of the object's type and returning its first component. Because top-level package names rarely change between releases, the check holds up across framework versions:

  • pandas DataFrames: Returns "pandas"
  • PySpark DataFrames: Returns "pyspark"
  • Other frameworks: Returns their respective module names
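The detection idea can be illustrated with standard-library types, so the snippet runs without pandas or PySpark installed. This is a sketch of the technique described above; top_level_module is a hypothetical name and the toolkit's own function may differ in detail.

```python
from email.message import Message
from fractions import Fraction


def top_level_module(obj: object) -> str:
    """Return the first component of the module path of obj's type."""
    return type(obj).__module__.split(".")[0]


# Fraction is defined in "fractions"; Message is defined in "email.message".
fraction_module = top_level_module(Fraction(1, 2))
message_module = top_level_module(Message())
print(fraction_module, message_module)
```

In the same way, a module path such as pandas.core.frame reduces to "pandas" and pyspark.sql.dataframe reduces to "pyspark".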

Type String Formats

Schema extraction functions return type information as strings, but the format varies by framework:

Pandas: numpy/pandas dtype strings

  • Integer types: "int8", "int16", "int32", "int64", "uint8", etc.
  • Float types: "float16", "float32", "float64"
  • String/object: "object"
  • DateTime: "datetime64[ns]", "timedelta64[ns]"
  • Categorical: "category"
  • Boolean: "bool"

PySpark (SQL format via extract_dataframe_schema):

  • Integer types: "tinyint", "smallint", "int", "bigint"
  • Float types: "float", "double"
  • Decimal: "decimal(precision,scale)", e.g. "decimal(10,2)"
  • String: "string"
  • DateTime: "timestamp", "date"
  • Boolean: "boolean"
  • Complex: "array<type>", "map<type,type>", "struct<...>"

PySpark (Native format via extract_dataframe_schema_spark_native_format):

  • Type objects: "LongType", "StringType", "DoubleType", "BooleanType", etc.
  • Includes trailing "()" for complex types: "ArrayType(StringType())"
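The relationship between the two PySpark formats can be made concrete with a lookup table. The sketch below covers only simple, non-parameterised types; SQL_TO_NATIVE and sql_to_native are hypothetical names, and parameterised or nested types (decimal, array, map, struct) are deliberately passed through unchanged rather than parsed.

```python
# Simple Spark SQL type strings and the matching pyspark.sql.types class names.
SQL_TO_NATIVE = {
    "tinyint": "ByteType",
    "smallint": "ShortType",
    "int": "IntegerType",
    "bigint": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "string": "StringType",
    "boolean": "BooleanType",
    "timestamp": "TimestampType",
    "date": "DateType",
}


def sql_to_native(sql_schema: dict[str, str]) -> dict[str, str]:
    """Translate simple SQL-format type strings; leave anything else unchanged."""
    return {col: SQL_TO_NATIVE.get(sql_type, sql_type) for col, sql_type in sql_schema.items()}


native = sql_to_native({"id": "bigint", "value": "double", "name": "string"})
print(native)
```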

Use Cases

These utilities enable several common data engineering patterns:

  1. Conditional Processing: Apply framework-specific logic based on detected type
  2. Schema Validation: Compare extracted schemas against expected specifications
  3. Auto-generation: Generate validation schemas by introspecting data structure
  4. Logging: Record schema information for debugging and auditing
  5. Type Conversion: Determine appropriate type mappings between frameworks
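As an illustration of the auto-generation pattern, an extracted schema dictionary can be rendered into the text of a schema script. This is a rough sketch only: render_schema_script is a hypothetical helper, the output template is an assumption rather than what compile_schema_script emits, and the pandera import line in the generated text may need adjusting for the installed pandera version.

```python
def render_schema_script(schema: dict[str, str], name: str = "schema") -> str:
    """Render Python source that declares one pandera Column per schema entry."""
    lines = ["import pandera as pa", "", f"{name} = pa.DataFrameSchema(", "    {"]
    for column, dtype in schema.items():
        # !r quotes both strings so the generated text is valid Python source.
        lines.append(f"        {column!r}: pa.Column({dtype!r}),")
    lines += ["    }", ")"]
    return "\n".join(lines)


script = render_schema_script({"id": "int64", "name": "object"})
print(script)
```

The function only builds text; nothing is imported or executed until the generated script is run.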

Integration with Validators

The validator modules use these utilities extensively:

  • Pandera: compile_schema_script uses these to generate schema files
  • Great Expectations: Batch managers use these to select appropriate datasources
  • ValidatedDataCatalog: Uses these to route data to appropriate validators

See Also

adc_toolkit.data.abs.Data: Protocol defining the Data interface that these utilities work with.
adc_toolkit.data.validators.pandera: Pandera validator that uses these utilities.
adc_toolkit.data.validators.gx.batch_managers: GX batch managers that use framework detection.

Examples

Determine DataFrame framework type:

>>> import pandas as pd
>>> from adc_toolkit.data.validators.table_utils import extract_dataframe_type
>>>
>>> df = pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> framework = extract_dataframe_type(df)
>>> print(framework)
pandas

Extract schema from pandas DataFrame:

>>> from adc_toolkit.data.validators.table_utils import extract_dataframe_schema
>>>
>>> df = pd.DataFrame({"id": [1, 2, 3], "value": [10.5, 20.3, 30.7], "name": ["Alice", "Bob", "Charlie"]})
>>> schema = extract_dataframe_schema(df)
>>> print(schema)
{'id': 'int64', 'value': 'float64', 'name': 'object'}

Extract schema from PySpark DataFrame:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> spark_df = spark.createDataFrame([(1, 10.5, "Alice"), (2, 20.3, "Bob")], ["id", "value", "name"])
>>>
>>> # SQL format
>>> sql_schema = extract_dataframe_schema(spark_df)
>>> print(sql_schema)
{'id': 'bigint', 'value': 'double', 'name': 'string'}

Extract Spark schema in native format:

>>> from adc_toolkit.data.validators.table_utils import extract_dataframe_schema_spark_native_format
>>>
>>> native_schema = extract_dataframe_schema_spark_native_format(spark_df)
>>> print(native_schema)
{'id': 'LongType', 'value': 'DoubleType', 'name': 'StringType'}

Conditional processing based on framework type:

>>> def process_data(df):
...     framework = extract_dataframe_type(df)
...
...     if framework == "pandas":
...         # Use pandas-specific operations
...         return df.groupby("category").sum()
...     elif framework == "pyspark":
...         # Use Spark-specific operations
...         return df.groupBy("category").sum()
...     else:
...         raise ValueError(f"Unsupported framework: {framework}")

Schema comparison for validation:

>>> current_schema = extract_dataframe_schema(df)
>>> expected_schema = {"id": "int64", "value": "float64", "name": "object"}
>>>
>>> if current_schema != expected_schema:
...     raise ValueError(f"Schema mismatch.\nExpected: {expected_schema}\nGot: {current_schema}")
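A plain inequality check reports that two schemas differ but not where. The sketch below shows one way to produce a more specific report from two schema dictionaries; diff_schemas is a hypothetical helper and is not part of table_utils.

```python
def diff_schemas(current: dict[str, str], expected: dict[str, str]) -> dict[str, object]:
    """Summarise how current differs from expected."""
    missing = sorted(expected.keys() - current.keys())
    unexpected = sorted(current.keys() - expected.keys())
    # For shared columns, record (expected type, actual type) when they disagree.
    mismatched = {
        col: (expected[col], current[col])
        for col in expected.keys() & current.keys()
        if expected[col] != current[col]
    }
    return {"missing": missing, "unexpected": unexpected, "mismatched": mismatched}


report = diff_schemas(
    current={"id": "int32", "value": "float64", "extra": "object"},
    expected={"id": "int64", "value": "float64", "name": "object"},
)
print(report)
```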

Logging schema information:

>>> import logging
>>> logging.basicConfig(level=logging.INFO)  # the root logger drops INFO messages by default
>>> schema = extract_dataframe_schema(df)
>>> logging.info("Processing DataFrame with schema: %s", schema)
INFO:root:Processing DataFrame with schema: {'id': 'int64', 'value': 'float64', 'name': 'object'}

Using in validator configuration:

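>>> # PandasValidator, SparkValidator, and GenericValidator are placeholders
>>> # for whichever validator classes your project defines.
>>> def configure_validator(data):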
...     framework = extract_dataframe_type(data)
...
...     if framework == "pandas":
...         return PandasValidator()
...     elif framework == "pyspark":
...         return SparkValidator()
...     else:
...         return GenericValidator()
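When more frameworks are added, the if/elif chain can be replaced by a lookup table keyed on the framework name. The sketch below assumes the name has already been obtained from extract_dataframe_type; the validator classes and validator_for are hypothetical stand-ins so the example is self-contained.

```python
from typing import Callable


class PandasValidator:
    """Hypothetical stand-in for a pandas-specific validator."""


class SparkValidator:
    """Hypothetical stand-in for a Spark-specific validator."""


# Map each framework name to a zero-argument factory for its validator.
VALIDATOR_FACTORIES: dict[str, Callable[[], object]] = {
    "pandas": PandasValidator,
    "pyspark": SparkValidator,
}


def validator_for(framework: str) -> object:
    """Instantiate the validator registered for a framework name."""
    try:
        factory = VALIDATOR_FACTORIES[framework]
    except KeyError:
        supported = ", ".join(sorted(VALIDATOR_FACTORIES))
        raise ValueError(f"Unsupported framework {framework!r}; expected one of: {supported}") from None
    return factory()


validator = validator_for("pandas")
print(type(validator).__name__)
```

Unlike the example above, this variant raises on an unknown framework instead of falling back to a generic validator; which behaviour is preferable depends on the pipeline.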

Schema-based type conversion:

>>> def convert_to_spark_types(pandas_schema):
...     type_mapping = {
...         "int64": "bigint",
...         "float64": "double",
...         "object": "string",
...         "bool": "boolean",
...     }
...     return {col: type_mapping.get(dtype, dtype) for col, dtype in pandas_schema.items()}
>>>
>>> pandas_schema = extract_dataframe_schema(df)
>>> spark_schema = convert_to_spark_types(pandas_schema)
>>> print(spark_schema)
{'id': 'bigint', 'value': 'double', 'name': 'string'}
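The mapping above passes unknown dtypes through unchanged, so a column such as "datetime64[ns]" would keep its pandas name in the supposedly Spark schema. A stricter variant, sketched here with the hypothetical names PANDAS_TO_SPARK_SQL and convert_strict, fails loudly instead.

```python
PANDAS_TO_SPARK_SQL = {
    "int64": "bigint",
    "float64": "double",
    "object": "string",
    "bool": "boolean",
}


def convert_strict(pandas_schema: dict[str, str]) -> dict[str, str]:
    """Convert pandas dtype strings to Spark SQL names, rejecting unmapped dtypes."""
    unmapped = {col: dtype for col, dtype in pandas_schema.items() if dtype not in PANDAS_TO_SPARK_SQL}
    if unmapped:
        raise KeyError(f"No Spark SQL mapping for: {unmapped}")
    return {col: PANDAS_TO_SPARK_SQL[dtype] for col, dtype in pandas_schema.items()}


converted = convert_strict({"id": "int64", "value": "float64", "name": "object"})
print(converted)
```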