adc_toolkit.data.catalogs.kedro
Kedro-based data catalog implementation for the adc-toolkit.
This module provides a production-ready data catalog implementation using Kedro's DataCatalog as the underlying I/O engine. It enables configuration-driven dataset management through YAML files, supporting diverse data formats, storage backends, versioning, partitioning, and dynamic SQL queries.
The KedroDataCatalog wraps Kedro's native catalog with a simplified interface that integrates seamlessly with the adc-toolkit's validation and processing pipeline. It provides factory methods for instantiation and utilities for scaffolding new catalog configurations.
Classes
KedroDataCatalog
Main Kedro catalog implementation with methods for loading and saving
datasets based on YAML configuration files. Provides in_directory()
factory method and init_catalog() scaffolding utility.
Submodules
kedro_catalog
    Core KedroDataCatalog implementation.
kedro_configs
    Configuration loader utilities for Kedro OmegaConfigLoader.
scaffold
    Utilities for creating catalog directory structures with template files.
templates
    Template YAML files for catalog, globals, and credentials configurations.
See Also
adc_toolkit.data.abs.DataCatalog: Protocol definition for catalogs.
adc_toolkit.data.catalog.ValidatedDataCatalog: Catalog with automatic validation.
kedro.io.DataCatalog: Underlying Kedro catalog implementation.
kedro.config.OmegaConfigLoader: Configuration loader for YAML files.
Notes
The Kedro data catalog system uses YAML configuration files to define datasets, including file paths, formats, load/save parameters, and storage backends. This declarative approach separates data I/O concerns from business logic and enables environment-specific configurations without code changes.
Expected Directory Structure
The catalog expects a specific configuration directory structure:
config_path/
    base/
        catalog.yml          # Base dataset definitions shared across environments
        globals.yml          # Global variables (e.g., base_path, bucket_name)
    local/
        catalog.yml          # Local overrides for development (gitignored)
        credentials.yml      # Credentials for databases/cloud (gitignored)
The base/ directory contains shared definitions, while local/ contains
environment-specific overrides. Local files should be added to .gitignore to
prevent committing credentials or environment-specific paths.
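The layout above can be checked with a few lines of standard-library Python. This is a minimal sketch, not the toolkit's own validation: `EXPECTED_FILES`, `make_layout`, and `missing_files` are hypothetical names introduced here for illustration, and the check treats all four files as required, whereas the toolkit may treat `credentials.yml` as optional.

```python
import tempfile
from pathlib import Path

# Relative paths taken from the directory layout described above.
EXPECTED_FILES = (
    "base/catalog.yml",
    "base/globals.yml",
    "local/catalog.yml",
    "local/credentials.yml",
)


def make_layout(config_path: Path) -> list[Path]:
    """Create empty placeholder files for the layout (hypothetical helper)."""
    created = []
    for relative in EXPECTED_FILES:
        target = config_path / relative
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()
        created.append(target)
    return created


def missing_files(config_path: Path) -> list[str]:
    """Return the expected relative paths that are absent under config_path."""
    return [rel for rel in EXPECTED_FILES if not (config_path / rel).is_file()]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "config" / "catalog"
    before = missing_files(root)  # nothing exists yet, so every file is missing
    make_layout(root)
    after = missing_files(root)   # empty once the placeholders are created
    print(before)
    print(after)
```

In a real project, `init_catalog()` creates the structure with template content; the sketch only shows which paths are involved.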
Configuration Format
Dataset definitions in catalog.yml follow Kedro's format:
# Simple CSV dataset
customer_data:
  type: pandas.CSVDataset
  filepath: data/raw/customers.csv
  load_args:
    sep: ","
    parse_dates: ["signup_date"]
  save_args:
    index: False

# Parquet dataset with versioning
processed_features:
  type: pandas.ParquetDataset
  filepath: data/processed/features.parquet
  versioned: true

# SQL dataset with dynamic query parameters
sales_query:
  type: pandas.SQLQueryDataset
  sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'"
  credentials: database_creds

# Cloud storage dataset
s3_data:
  type: pandas.ParquetDataset
  filepath: s3://my-bucket/data/dataset.parquet
  credentials: aws_credentials
Global variables in globals.yml can be referenced in catalog.yml using
${globals:variable_name} syntax for parameterization.
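To make the substitution concrete, the sketch below resolves `${globals:...}` references against a plain dictionary. It is an illustration only: `resolve_globals` is a hypothetical helper written with the standard library, it handles flat keys only, and it is not how Kedro's OmegaConfigLoader implements its resolver.

```python
import re

# Matches references such as ${globals:base_path}; flat keys only.
_GLOBALS_REF = re.compile(r"\$\{globals:([A-Za-z0-9_]+)\}")


def resolve_globals(value: str, global_vars: dict) -> str:
    """Replace each ${globals:key} in value with global_vars[key] (illustration only)."""

    def lookup(match: re.Match) -> str:
        # An undefined key raises KeyError instead of being silently left in place.
        return str(global_vars[match.group(1)])

    return _GLOBALS_REF.sub(lookup, value)


# Values as they might appear in base/globals.yml (hypothetical).
global_vars = {"base_path": "data", "bucket_name": "my-bucket"}

filepath = resolve_globals("${globals:base_path}/raw/customers.csv", global_vars)
s3_path = resolve_globals("s3://${globals:bucket_name}/data/dataset.parquet", global_vars)
print(filepath)
print(s3_path)
```

Keeping paths and bucket names in globals.yml means an environment change touches one file rather than every dataset entry.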
Supported Features
- File Formats: CSV, Parquet, JSON, Excel, Pickle, HDF5, Feather, ORC
- Storage Backends: Local filesystem, S3, GCS, Azure Blob, HDFS
- Versioning: Automatic timestamping of saved datasets
- Partitioning: Split large datasets across multiple files
- Dynamic Queries: SQL query parameterization at load time
- Dataset Factories: Pattern-based dataset definitions for systematic naming
- Credentials Management: Secure credential storage in local/credentials.yml
Thread Safety
KedroDataCatalog delegates to Kedro's DataCatalog, which is not thread-safe for concurrent writes to the same dataset. Concurrent reads are safe. Use external locking mechanisms if concurrent writes are required.
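One possible form of external locking is a thin wrapper that serializes saves per dataset name. The sketch below is an assumption-laden example, not part of the toolkit: `LockedCatalog` and `InMemoryCatalog` are hypothetical classes, and the lock only protects threads within a single process.

```python
import threading
from collections import defaultdict
from typing import Any


class InMemoryCatalog:
    """Stand-in with the same load/save shape as the catalog (hypothetical)."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    def load(self, name: str, **query_args: Any) -> Any:
        return self._store[name]

    def save(self, name: str, data: Any) -> None:
        self._store[name] = data


class LockedCatalog:
    """Serialize saves per dataset name; loads pass through unchanged."""

    def __init__(self, catalog: Any) -> None:
        self._catalog = catalog
        self._locks: dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._locks_guard = threading.Lock()  # protects the lock table itself

    def _lock_for(self, name: str) -> threading.Lock:
        with self._locks_guard:
            return self._locks[name]

    def load(self, name: str, **query_args: Any) -> Any:
        return self._catalog.load(name, **query_args)

    def save(self, name: str, data: Any) -> None:
        # Only one thread at a time may write a given dataset.
        with self._lock_for(name):
            self._catalog.save(name, data)


catalog = LockedCatalog(InMemoryCatalog())
threads = [threading.Thread(target=catalog.save, args=("metrics", i)) for i in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
final_value = catalog.load("metrics")  # whichever write happened last
```

Writers in separate processes or on separate machines would need a different mechanism, such as a file lock or coordination in the storage backend.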
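References
[1] Kedro Documentation: Data Catalog https://docs.kedro.org/en/stable/data/data_catalog.html
[2] Kedro Documentation: Configuration https://docs.kedro.org/en/stable/configuration/configuration_basics.html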
Examples
Create a catalog from an existing configuration directory:
>>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("customer_data")
>>> df.columns
Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')
Save processed data:
>>> processed_df = process_customers(df)
>>> catalog.save("processed_customers", processed_df)
Initialize a new catalog structure with template files:
>>> result = KedroDataCatalog.init_catalog(
...     "./my_project/config/catalog",
...     include_globals=True,
...     include_catalog=True,
...     include_credentials=True,
... )
>>> print(f"Created: {[f.name for f in result.created_files]}")
Created: ['catalog.yml', 'globals.yml', 'credentials.yml']
Load data with dynamic SQL query parameters:
>>> # catalog.yml defines: sql: "SELECT * FROM sales WHERE year={year}"
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> sales_2024 = catalog.load("sales_data", year=2024)
>>> sales_2023 = catalog.load("sales_data", year=2023)
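The placeholders in the query use Python format-string syntax, so the substitution can be pictured with plain `str.format`. Whether the toolkit calls `str.format` internally is an assumption; `render_query` is a hypothetical helper that only shows what the placeholders mean.

```python
# Query template as it might appear under `sql:` in catalog.yml.
SQL_TEMPLATE = "SELECT * FROM sales WHERE year={year} AND region='{region}'"


def render_query(template: str, **query_args: object) -> str:
    """Fill named placeholders with keyword arguments (illustration only)."""
    return template.format(**query_args)


query = render_query(SQL_TEMPLATE, year=2024, region="EMEA")
print(query)

# Every placeholder in the template needs a value; a missing one raises KeyError.
try:
    render_query(SQL_TEMPLATE, year=2024)
    missing = None
except KeyError as exc:
    missing = exc.args[0]
print(missing)
```

Plain text substitution does not escape values, so under this assumption only trusted values should be passed as query arguments.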
Use in a complete data pipeline:
>>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
>>>
>>> def run_pipeline():
...     catalog = KedroDataCatalog.in_directory("config/production")
...
...     # Extract
...     raw_sales = catalog.load("raw_sales")
...     raw_customers = catalog.load("raw_customers")
...
...     # Transform
...     cleaned_sales = clean_data(raw_sales)
...     enriched = enrich_with_customer_data(cleaned_sales, raw_customers)
...
...     # Load (save results)
...     catalog.save("cleaned_sales", cleaned_sales)
...     catalog.save("enriched_sales", enriched)
>>> run_pipeline()
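Because a pipeline only needs `load` and `save`, it can be written against a structural type and exercised without any files. The sketch below is a hedged illustration: `CatalogLike` stands in for `adc_toolkit.data.abs.DataCatalog` (whose exact definition is not shown here), and `DictCatalog` is a hypothetical in-memory substitute.

```python
from typing import Any, Protocol


class CatalogLike(Protocol):
    """Assumed minimal shape of a catalog: load by name, save by name."""

    def load(self, name: str, **query_args: Any) -> Any: ...

    def save(self, name: str, data: Any) -> None: ...


class DictCatalog:
    """In-memory catalog for tests (hypothetical, not part of the toolkit)."""

    def __init__(self, initial: dict[str, Any]) -> None:
        self._data = dict(initial)

    def load(self, name: str, **query_args: Any) -> Any:
        return self._data[name]

    def save(self, name: str, data: Any) -> None:
        self._data[name] = data


def run_pipeline(catalog: CatalogLike) -> None:
    # Extract
    raw_sales = catalog.load("raw_sales")
    # Transform: drop rows without an amount
    cleaned = [row for row in raw_sales if row["amount"] is not None]
    # Load (save results)
    catalog.save("cleaned_sales", cleaned)


catalog = DictCatalog({"raw_sales": [{"amount": 10.0}, {"amount": None}]})
run_pipeline(catalog)
cleaned_sales = catalog.load("cleaned_sales")
print(cleaned_sales)
```

Passing the catalog in as a parameter, rather than constructing it inside the pipeline function, is what makes this substitution possible.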
Create catalog with custom configuration loader:
>>> from kedro.config import OmegaConfigLoader
>>> loader = OmegaConfigLoader(
...     conf_source="config",
...     env="production",
...     base_env="base",
...     default_run_env="local",
... )
>>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)
from .kedro_catalog import KedroDataCatalog


__all__ = ["KedroDataCatalog"]
class KedroDataCatalog:
    """
    Kedro-based implementation of the DataCatalog protocol.

    This class provides a production-ready data catalog using Kedro's DataCatalog
    as the underlying I/O engine. It supports configuration-driven dataset management
    through YAML files, enabling declarative definitions of data sources, file formats,
    load/save parameters, and storage backends.

    The catalog handles diverse data formats (CSV, Parquet, JSON, Excel, Pickle, HDF5),
    storage locations (local filesystem, S3, GCS, Azure Blob), and advanced features
    like versioning, partitioning, and dynamic SQL queries.

    Parameters
    ----------
    config_path : str or pathlib.Path
        Path to the configuration directory containing catalog YAML files.
        The directory should contain base/ and local/ subdirectories with
        catalog.yml, globals.yml, and optionally credentials.yml files.
    config_loader : kedro.config.AbstractConfigLoader or None, default=None
        Kedro configuration loader instance. If None, an OmegaConfigLoader
        will be created automatically to load YAML configurations from the
        config_path directory.

    Attributes
    ----------
    config_path : str
        Path to the configuration directory as a string.
    config_loader : kedro.config.AbstractConfigLoader
        The configuration loader instance used to read YAML files.
    _catalog : kedro.io.DataCatalog
        Internal Kedro DataCatalog instance handling actual I/O operations.

    Methods
    -------
    load(name, **query_args)
        Load a dataset by name from the catalog.
    save(name, data)
        Save a dataset by name to the catalog.
    in_directory(path)
        Factory method to create a catalog from a configuration directory.
    init_catalog(path, overwrite=False, include_globals=True, ...)
        Create the Kedro catalog folder structure with template files.

    Raises
    ------
    FileNotFoundError
        If the configuration directory does not exist or the catalog structure
        is incomplete (missing required base/ or local/ directories).

    See Also
    --------
    adc_toolkit.data.abs.DataCatalog : Protocol definition for data catalogs.
    adc_toolkit.data.validated_catalog.ValidatedDataCatalog : Catalog with validation.
    kedro.io.DataCatalog : Underlying Kedro catalog implementation.
    kedro.config.OmegaConfigLoader : Default configuration loader.

    Notes
    -----
    The catalog expects a specific directory structure:

        config_path/
            base/
                catalog.yml          # Base dataset definitions
                globals.yml          # Global variables and parameters
            local/
                catalog.yml          # Local overrides (gitignored)
                credentials.yml      # Credentials (gitignored)

    Dataset definitions in catalog.yml follow Kedro's format:

    .. code-block:: yaml

        dataset_name:
          type: pandas.CSVDataset
          filepath: data/raw/dataset.csv
          load_args:
            sep: ","
          save_args:
            index: False

    The catalog supports versioning, which automatically timestamps saved datasets
    and allows loading specific versions. Partitioning enables splitting large
    datasets into multiple files for parallel processing.

    For SQL datasets, the catalog supports dynamic query parameters that can be
    provided at load time using the query_args keyword arguments.

    Thread Safety
    -------------
    The KedroDataCatalog delegates to Kedro's DataCatalog, which is not thread-safe
    for concurrent writes to the same dataset. Concurrent reads are safe.

    References
    ----------
    .. [1] Kedro Documentation: Data Catalog
           https://docs.kedro.org/en/stable/data/data_catalog.html
    .. [2] Kedro Documentation: Configuration
           https://docs.kedro.org/en/stable/configuration/configuration_basics.html

    Examples
    --------
    Create a catalog using the factory method:

    >>> catalog = KedroDataCatalog.in_directory("config/catalog")
    >>> df = catalog.load("customer_data")
    >>> df.columns
    Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')

    Save processed data:

    >>> processed_df = process_customers(df)
    >>> catalog.save("processed_customers", processed_df)

    Load data with dynamic SQL query parameters:

    >>> # catalog.yml defines: SELECT * FROM sales WHERE year={year} AND region='{region}'
    >>> sales = catalog.load("sales_data", year=2024, region="EMEA")
    >>> sales.shape
    (15420, 8)

    Create a catalog with custom config loader:

    >>> from kedro.config import OmegaConfigLoader
    >>> loader = OmegaConfigLoader(conf_source="config", env="production", base_env="base", default_run_env="local")
    >>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

    Initialize a new catalog structure:

    >>> result = KedroDataCatalog.init_catalog(
    ...     "./my_project/config", include_globals=True, include_catalog=True, include_credentials=True
    ... )
    >>> print(f"Created: {[f.name for f in result.created_files]}")
    Created: ['catalog.yml', 'globals.yml', 'credentials.yml']
    """

    def __init__(
        self,
        config_path: str | Path,
        config_loader: AbstractConfigLoader | None = None,
    ) -> None:
        """
        Initialize a Kedro data catalog from configuration files.

        This constructor creates a new catalog instance by reading dataset
        definitions from YAML configuration files in the specified directory.
        It validates that the required directory structure exists and creates
        the underlying Kedro DataCatalog.

        Parameters
        ----------
        config_path : str or pathlib.Path
            Path to the configuration directory containing catalog definitions.
            The directory must contain base/ and local/ subdirectories with
            the required YAML files (catalog.yml at minimum).
        config_loader : kedro.config.AbstractConfigLoader or None, default=None
            Kedro configuration loader for reading YAML files. If None, an
            OmegaConfigLoader will be created automatically with default settings
            (base_env="base", default_run_env="local").

        Raises
        ------
        FileNotFoundError
            If the configuration directory does not exist. The error message
            includes instructions for creating the directory structure using
            the CLI command or the ``init_catalog`` class method.
        FileNotFoundError
            If the catalog structure is incomplete (missing base/ or local/
            directories or required catalog.yml files). The error message
            includes instructions for creating the complete structure.

        See Also
        --------
        in_directory : Factory method for creating catalog instances.
        init_catalog : Class method for scaffolding catalog directory structure.

        Notes
        -----
        The constructor performs the following steps:
        1. Validates that the config_path directory exists
        2. Checks for required catalog structure (base/ and local/ directories)
        3. Creates or uses the provided config_loader
        4. Loads catalog configuration and creates the Kedro DataCatalog

        The catalog structure validation requires:
        - base/ directory with catalog.yml
        - local/ directory (can be empty initially)

        Credentials are optional and should be placed in local/credentials.yml
        to prevent accidental commits to version control.

        Examples
        --------
        Create a catalog with default configuration loader:

        >>> catalog = KedroDataCatalog("config/catalog")
        >>> catalog.config_path
        'config/catalog'

        Create a catalog with custom configuration loader:

        >>> from kedro.config import OmegaConfigLoader
        >>> loader = OmegaConfigLoader(conf_source="config", env="staging", base_env="base", default_run_env="local")
        >>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

        Handle missing configuration directory:

        >>> try:
        ...     catalog = KedroDataCatalog("nonexistent/path")
        ... except FileNotFoundError as e:
        ...     print("Directory not found. Run init_catalog to create it.")
        Directory not found. Run init_catalog to create it.
        """
        self.config_path = str(config_path)
        path = Path(config_path)

        if not path.exists():
            raise FileNotFoundError(
                f"Configuration directory not found: {config_path}\n"
                f"To create the catalog folder structure, run:\n"
                f" adc-toolkit init-catalog {config_path}\n"
                f"Or use the class method:\n"
                f" KedroDataCatalog.init_catalog('{config_path}')"
            )

        if not catalog_structure_exists(config_path, require_credentials=False):
            raise FileNotFoundError(
                f"Catalog structure is incomplete at: {path}\n"
                f"Missing required files in base/ or local/.\n"
                f"To create the catalog folder structure, run:\n"
                f" adc-toolkit init-catalog {config_path}\n"
                f"Or use the class method:\n"
                f" KedroDataCatalog.init_catalog('{config_path}')"
            )

        self.config_loader = config_loader
        if not self.config_loader:
            self.config_loader = create_omega_config_loader(self.config_path)
        self._catalog = create_catalog(self.config_loader)

    @classmethod
    def in_directory(cls, path: str | Path) -> "KedroDataCatalog":
        """
        Create a catalog instance from a configuration directory.

        This factory method provides a convenient way to instantiate a
        KedroDataCatalog by specifying only the configuration directory path.
        It is the recommended way to create catalog instances in application code.

        The method creates a catalog with default settings, using an automatically
        configured OmegaConfigLoader to read YAML files from the directory.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the configuration directory containing catalog definitions.
            The directory must have the required Kedro catalog structure with
            base/ and local/ subdirectories.

        Returns
        -------
        KedroDataCatalog
            A new catalog instance configured with datasets from the directory.
            The catalog is immediately ready to load and save data.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist or lacks the required
            catalog structure (base/ and local/ directories with catalog.yml).

        See Also
        --------
        __init__ : Constructor with additional configuration options.
        init_catalog : Create the catalog directory structure.

        Notes
        -----
        This factory method is equivalent to calling the constructor with just
        the path parameter:

        >>> catalog = KedroDataCatalog(path)

        However, using ``in_directory`` is preferred because:
        - It matches the DataCatalog protocol interface
        - It provides better semantic clarity
        - It enables polymorphism when using multiple catalog implementations

        The method uses default configuration settings:
        - Base environment: "base"
        - Default run environment: "local"
        - Configuration format: YAML

        For advanced configuration needs (custom environments, config merging
        strategies, runtime parameters), use the constructor directly with a
        custom AbstractConfigLoader.

        Examples
        --------
        Basic usage:

        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
        >>> df = catalog.load("training_data")

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> config_dir = Path("config") / "catalog"
        >>> catalog = KedroDataCatalog.in_directory(config_dir)

        Load and save in a pipeline:

        >>> catalog = KedroDataCatalog.in_directory("./config")
        >>> raw = catalog.load("raw_data")
        >>> processed = transform(raw)
        >>> catalog.save("processed_data", processed)

        Polymorphic usage with DataCatalog protocol:

        >>> from adc_toolkit.data.abs import DataCatalog
        >>> def run_pipeline(catalog: DataCatalog) -> None:
        ...     data = catalog.load("input")
        ...     result = process(data)
        ...     catalog.save("output", result)
        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
        >>> run_pipeline(catalog)
        """
        return cls(path)

    @classmethod
    def init_catalog(
        cls,
        path: str | Path,
        *,
        overwrite: bool = False,
        include_globals: bool = True,
        include_catalog: bool = True,
        include_credentials: bool = True,
    ) -> ScaffoldResult:
        """
        Create Kedro catalog directory structure with template configuration files.

        This class method scaffolds a complete Kedro catalog configuration directory
        with the required folder structure and template YAML files. It is intended
        for initializing new projects or adding catalog functionality to existing
        projects.

        The method creates a directory structure following Kedro conventions:

        - path/base/catalog.yml: Base dataset definitions
        - path/base/globals.yml: Global variables and parameters
        - path/local/catalog.yml: Local environment overrides
        - path/local/credentials.yml: Credentials (should be gitignored)

        Template files include helpful comments and examples to guide configuration.

        Parameters
        ----------
        path : str or pathlib.Path
            Root path for the configuration directory. The directory will be
            created if it doesn't exist. Subdirectories base/ and local/ will
            be created within this path.
        overwrite : bool, default=False
            If True, overwrite existing files at the destination paths. If False,
            existing files are preserved and reported in skipped_files. Use with
            caution to avoid losing custom configurations.
        include_globals : bool, default=True
            If True, create base/globals.yml with template global variables.
            Global variables can be referenced in catalog.yml using
            ${globals:variable_name} syntax for parameterization.
        include_catalog : bool, default=True
            If True, create base/catalog.yml and local/catalog.yml with template
            dataset definitions. These files are essential for catalog operation.
        include_credentials : bool, default=True
            If True, create local/credentials.yml for storing credentials. This
            file should be added to .gitignore to prevent committing secrets.

        Returns
        -------
        ScaffoldResult
            A result object containing:
            - created_files: List of Path objects for files that were created
            - skipped_files: List of Path objects for files that already existed
            - created_directories: List of Path objects for directories created

        See Also
        --------
        in_directory : Factory method to create catalog from existing config.
        adc_toolkit.data.catalogs.kedro.scaffold.create_catalog_folder_structure :
            Underlying scaffolding function.

        Notes
        -----
        The scaffolded directory structure follows Kedro best practices:

        - **base/**: Contains base configurations shared across environments
        - **local/**: Contains local overrides and credentials (gitignored)

        Template catalog.yml includes examples for common dataset types:
        - CSV files with pandas.CSVDataset
        - Parquet files with pandas.ParquetDataset
        - Excel files with pandas.ExcelDataset
        - Pickle files with pickle.PickleDataset

        Template globals.yml includes examples for:
        - Base paths (data directories)
        - Common parameters (date formats, separators)
        - Environment-specific settings

        After running this method, you should:
        1. Review and customize the generated YAML files
        2. Add local/credentials.yml to .gitignore
        3. Define your project-specific datasets in base/catalog.yml
        4. Add environment-specific overrides in local/catalog.yml

        This is equivalent to running the CLI command:
        ``adc-toolkit init-catalog <path>``

        Examples
        --------
        Initialize a catalog in a new project:

        >>> result = KedroDataCatalog.init_catalog("./config/catalog")
        >>> print(f"Created {len(result.created_files)} files")
        Created 4 files
        >>> print(f"Directories: {[d.name for d in result.created_directories]}")
        Directories: ['base', 'local']

        Initialize with selective templates:

        >>> result = KedroDataCatalog.init_catalog(
        ...     "./config/catalog",
        ...     include_globals=True,
        ...     include_catalog=True,
        ...     include_credentials=False,  # No credentials needed
        ... )

        Reinitialize with overwrite (use carefully):

        >>> result = KedroDataCatalog.init_catalog(
        ...     "./config/catalog",
        ...     overwrite=True,  # Overwrites existing files
        ... )

        Check what was created vs. skipped:

        >>> result = KedroDataCatalog.init_catalog("./config/catalog")
        >>> if result.skipped_files:
        ...     print(f"Skipped existing: {[f.name for f in result.skipped_files]}")
        >>> if result.created_files:
        ...     print(f"Created new: {[f.name for f in result.created_files]}")

        Use in project setup script:

        >>> from pathlib import Path
        >>> project_root = Path("./my_project")
        >>> catalog_dir = project_root / "config" / "catalog"
        >>> result = KedroDataCatalog.init_catalog(catalog_dir)
        >>> assert catalog_dir.exists()
        >>> assert (catalog_dir / "base" / "catalog.yml").exists()
        """
        return create_catalog_folder_structure(
            path,
            overwrite=overwrite,
            include_globals=include_globals,
            include_catalog=include_catalog,
            include_credentials=include_credentials,
        )

    def load(self, name: str, **query_args: Any) -> Data:
        """
        Load a dataset by name from the catalog.

        Retrieve a dataset using its registered name as defined in the catalog
        configuration files. The method handles all I/O operations, file format
        parsing, and type conversions based on the dataset's configuration.

        For SQL-based datasets with parameterized queries, this method supports
        dynamic query parameter substitution through keyword arguments, enabling
        flexible data filtering and selection at load time.

        Parameters
        ----------
        name : str
            The registered name of the dataset to load. This name must match
            a dataset definition in the catalog configuration files (catalog.yml).
        **query_args : Any
            Keyword arguments for dynamic SQL query parameterization. Only
            applicable to SQL-based datasets with parameterized queries using
            Python format string syntax (e.g., WHERE year={year}). For non-SQL
            datasets, query_args are ignored.

        Returns
        -------
        Data
            The loaded dataset as a Data protocol-compatible object. The specific
            type depends on the dataset configuration (typically pandas.DataFrame,
            but can be Spark DataFrame, numpy array, or other types defined in
            the catalog).

        Raises
        ------
        KeyError
            If no dataset with the given name is registered in the catalog.
        FileNotFoundError
            If the dataset's source file or database does not exist.
        ValueError
            If the dataset cannot be loaded due to format errors, parsing errors,
            or invalid query_args for datasets that don't support queries.
        PermissionError
            If the dataset's source file or database is not readable.

        See Also
        --------
        save : Save a dataset to the catalog.
        _load_with_dynamic_query : Internal method for parameterized SQL queries.

        Notes
        -----
        Load behavior depends on the dataset type configured in catalog.yml:

        - **File-based datasets** (CSV, Parquet, JSON, Excel, Pickle, HDF5):
          Reads from the configured filepath using the specified load_args.

        - **Database datasets** (SQL, SQLQuery, SQLTable): Executes queries
          or reads tables from the configured database connection.

        - **Versioned datasets**: Loads the latest version unless a specific
          version is requested in the configuration.

        - **Partitioned datasets**: Loads and concatenates all partitions.

        For SQL datasets with dynamic queries, the query string in catalog.yml
        should use Python format string syntax with named placeholders:

        .. code-block:: yaml

            sales_data:
              type: pandas.SQLQueryDataset
              sql: SELECT * FROM sales WHERE year={year} AND region='{region}'
              credentials: database_creds

        Query parameters are substituted at load time:

        >>> df = catalog.load("sales_data", year=2024, region="EMEA")

        The load operation is idempotent: calling it multiple times with the
        same parameters returns equivalent data (though not necessarily the
        same object instance).
626 627 Performance Considerations 628 -------------------------- 629 - Large datasets may take significant time and memory to load 630 - For large files, consider using chunking or lazy loading 631 - Cloud storage (S3, GCS, Azure) may incur network latency 632 - Partitioned datasets are loaded in parallel when possible 633 634 Examples 635 -------- 636 Load a simple dataset: 637 638 >>> catalog = KedroDataCatalog.in_directory("config/catalog") 639 >>> df = catalog.load("customer_data") 640 >>> df.shape 641 (10000, 8) 642 643 Load with dynamic SQL query parameters: 644 645 >>> # catalog.yml: sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'" 646 >>> sales_2024 = catalog.load("sales_data", year=2024, region="EMEA") 647 >>> sales_2024["year"].unique() 648 array([2024]) 649 >>> sales_2024["region"].unique() 650 array(['EMEA'], dtype=object) 651 652 Load multiple datasets in sequence: 653 654 >>> raw = catalog.load("raw_data") 655 >>> features = catalog.load("feature_set") 656 >>> model = catalog.load("trained_model") 657 658 Handle missing datasets gracefully: 659 660 >>> try: 661 ... data = catalog.load("nonexistent_dataset") 662 ... except KeyError as e: 663 ... print(f"Dataset not found: {e}") 664 ... # Use default data or prompt user 665 Dataset not found: 'nonexistent_dataset' 666 667 Load with different query parameters: 668 669 >>> q1_data = catalog.load("sales_data", quarter=1, year=2024) 670 >>> q2_data = catalog.load("sales_data", quarter=2, year=2024) 671 """ 672 if query_args: 673 return self._load_with_dynamic_query(name, **query_args) 674 return self._catalog.load(name) 675 676 def save(self, name: str, data: Data) -> None: 677 """ 678 Save a dataset by name to the catalog. 679 680 Store a dataset using its registered name as defined in the catalog 681 configuration files. The method handles all I/O operations, file format 682 serialization, and storage operations based on the dataset's configuration. 
683 684 Parameters 685 ---------- 686 name : str 687 The registered name of the dataset to save. This name must match 688 a dataset definition in the catalog configuration files (catalog.yml). 689 The dataset configuration determines the output location, format, 690 and serialization parameters. 691 data : Data 692 The dataset to save. Must be a Data protocol-compatible object 693 (e.g., pandas.DataFrame, Spark DataFrame) that is compatible with 694 the dataset type specified in the catalog configuration. 695 696 Returns 697 ------- 698 None 699 700 Raises 701 ------ 702 KeyError 703 If no dataset with the given name is registered in the catalog. 704 TypeError 705 If the data type is incompatible with the dataset configuration 706 (e.g., attempting to save a DataFrame to a PickleDataset expecting 707 a different object type). 708 ValueError 709 If the dataset cannot be saved due to validation errors, format 710 incompatibilities, or invalid configuration. 711 PermissionError 712 If the target location is not writable due to filesystem permissions. 713 OSError 714 If disk space is insufficient or other I/O errors occur during save. 715 716 See Also 717 -------- 718 load : Load a dataset from the catalog. 719 720 Notes 721 ----- 722 Save behavior depends on the dataset type configured in catalog.yml: 723 724 - **File-based datasets** (CSV, Parquet, JSON, Excel, Pickle, HDF5): 725 Writes to the configured filepath using the specified save_args. 726 Parent directories are created automatically if they don't exist. 727 728 - **Database datasets** (SQL, SQLTable): Writes to the configured 729 database table using the specified connection and save parameters. 730 731 - **Versioned datasets**: Creates a new timestamped version rather 732 than overwriting. Version format: filepath/YYYY-MM-DDTHH.mm.ss.sssZ/ 733 734 - **Partitioned datasets**: Splits data across multiple files based 735 on partitioning configuration. 
736 737 The save operation typically overwrites existing data at the target 738 location unless versioning is enabled. For versioned datasets, each 739 save creates a new version without removing previous versions. 740 741 Format-Specific Behavior 742 ------------------------ 743 Different formats have different save characteristics: 744 745 - **CSV**: Human-readable, widely compatible, larger file size 746 - **Parquet**: Columnar format, compressed, efficient for analytics 747 - **Pickle**: Python-specific, preserves exact objects, version-sensitive 748 - **JSON**: Human-readable, good for nested structures 749 - **HDF5**: Binary format, good for large numerical arrays 750 751 Atomicity and Error Handling 752 ----------------------------- 753 The atomicity of save operations depends on the underlying dataset type 754 and storage backend: 755 756 - Local filesystem writes may be atomic for small files 757 - Cloud storage (S3, GCS, Azure) uses multi-part uploads 758 - Database writes depend on transaction support 759 - Partitioned saves may be partially successful 760 761 If a save operation fails partway through, partial data may be written. 762 For critical applications, consider implementing save-to-temporary-then-move 763 patterns or using versioned datasets. 
764 765 Performance Considerations 766 -------------------------- 767 - Large datasets may take significant time to serialize and write 768 - Cloud storage uploads may have network latency and bandwidth limits 769 - Compression (enabled in save_args) trades CPU time for disk space 770 - Partitioned datasets can write partitions in parallel 771 772 Examples 773 -------- 774 Save a processed dataset: 775 776 >>> catalog = KedroDataCatalog.in_directory("config/catalog") 777 >>> processed_df = process_data(raw_df) 778 >>> catalog.save("processed_data", processed_df) 779 780 Save multiple datasets in a pipeline: 781 782 >>> catalog = KedroDataCatalog.in_directory("config/catalog") 783 >>> raw = catalog.load("raw_data") 784 >>> cleaned = clean_data(raw) 785 >>> catalog.save("cleaned_data", cleaned) 786 >>> features = engineer_features(cleaned) 787 >>> catalog.save("features", features) 788 >>> predictions = model.predict(features) 789 >>> catalog.save("predictions", predictions) 790 791 Save with versioning (configured in catalog.yml): 792 793 >>> # catalog.yml: 794 >>> # versioned_output: 795 >>> # type: pandas.CSVDataset 796 >>> # filepath: data/output.csv 797 >>> # versioned: true 798 >>> catalog.save("versioned_output", result_df) 799 >>> # Saves to: data/output.csv/2024-01-15T10.30.45.123Z/output.csv 800 801 Handle save errors: 802 803 >>> try: 804 ... catalog.save("output_data", large_df) 805 ... except PermissionError as e: 806 ... print(f"Cannot write to output location: {e}") 807 ... except OSError as e: 808 ... 
print(f"I/O error during save: {e}") 809 810 Save to different formats (configured per dataset): 811 812 >>> # Same data, different formats for different use cases 813 >>> catalog.save("output_csv", df) # Human-readable 814 >>> catalog.save("output_parquet", df) # Efficient storage 815 >>> catalog.save("output_json", df) # API consumption 816 """ 817 self._catalog.save(name, data) 818 819 def _load_with_dynamic_query(self, name: str, **query_args: Any) -> Data: 820 """ 821 Load data from the catalog with dynamic SQL query parameterization. 822 823 This internal method implements dynamic query parameter substitution for 824 SQL-based datasets. It temporarily modifies the dataset's query string 825 by substituting format placeholders with provided arguments, executes 826 the load operation, then restores the original query template. 827 828 Parameters 829 ---------- 830 name : str 831 The registered name of the SQL dataset to load. The dataset must 832 be configured with a parameterized query using Python format string 833 syntax (e.g., SELECT * FROM table WHERE id={id}). 834 **query_args : Any 835 Keyword arguments providing values for query parameters. Keys must 836 match the placeholder names in the query template. Values are 837 substituted using Python's str.format() method. 838 839 Returns 840 ------- 841 Data 842 The loaded dataset with the parameterized query applied. The return 843 type depends on the dataset configuration (typically pandas.DataFrame 844 for SQL datasets). 845 846 Raises 847 ------ 848 ValueError 849 If the dataset does not support queries (i.e., the dataset 850 configuration does not include a 'query' parameter in load_args), 851 or if query parameter substitution fails due to format string errors. 852 KeyError 853 If query_args are missing required parameters referenced in the 854 query template, or if the dataset name is not registered. 
855 856 See Also 857 -------- 858 load : Public method that delegates to this internal method when query_args provided. 859 860 Notes 861 ----- 862 This method directly manipulates the internal Kedro DataCatalog's dataset 863 configuration by: 864 1. Accessing the dataset's _load_args dictionary 865 2. Extracting the query template string 866 3. Substituting placeholders with provided arguments using str.format() 867 4. Loading data with the substituted query 868 5. Restoring the original query template for subsequent loads 869 870 The query restoration ensures that the catalog remains stateless and 871 multiple calls with different parameters don't interfere with each other. 872 873 Query Parameterization Format 874 ------------------------------ 875 The query template in catalog.yml should use Python format string syntax: 876 877 - Named placeholders: {parameter_name} 878 - String parameters need explicit quotes: WHERE name='{name}' 879 - Numeric parameters don't need quotes: WHERE id={id} 880 881 Example catalog.yml configuration: 882 883 .. code-block:: yaml 884 885 parameterized_sales: 886 type: pandas.SQLQueryDataset 887 sql: | 888 SELECT * FROM sales 889 WHERE year={year} 890 AND region='{region}' 891 AND revenue > {min_revenue} 892 credentials: db_credentials 893 894 Security Considerations 895 ----------------------- 896 This method uses Python's str.format() for parameter substitution, which 897 does NOT provide SQL injection protection. Use this method only with: 898 899 - Trusted parameter values from application code 900 - Validated and sanitized user inputs 901 - Internal pipeline parameters 902 903 For user-provided inputs, consider using database-specific parameter 904 binding mechanisms instead of string formatting. 
905 906 Examples 907 -------- 908 Internal usage by the load method: 909 910 >>> # User calls load with query parameters 911 >>> df = catalog.load("sales_data", year=2024, region="EMEA") 912 >>> # Internally delegates to _load_with_dynamic_query 913 914 Query template in catalog.yml: 915 916 .. code-block:: yaml 917 918 sales_data: 919 type: pandas.SQLQueryDataset 920 sql: SELECT * FROM sales WHERE year={year} AND region='{region}' 921 credentials: database_creds 922 923 Equivalent direct call (not recommended for users): 924 925 >>> data = catalog._load_with_dynamic_query("sales_data", year=2024, region="EMEA") 926 927 Error when dataset doesn't support queries: 928 929 >>> try: 930 ... catalog._load_with_dynamic_query("csv_dataset", param=123) 931 ... except ValueError as e: 932 ... print(e) 933 Data set `csv_dataset` does not support queries. 934 """ 935 load_args = self._catalog._datasets[name]._load_args 936 if "query" not in load_args: 937 raise ValueError(f"Data set `{name}` does not support queries.") 938 939 raw_query = load_args["query"] 940 load_args["query"] = raw_query.format(**query_args) 941 data = self._catalog.load(name) 942 load_args["query"] = raw_query 943 944 return data
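The Security Considerations note in `_load_with_dynamic_query` recommends parameter binding for untrusted input. The following is a minimal sketch of that alternative using the standard-library `sqlite3` driver, not part of adc-toolkit; the in-memory `sales` table and the `load_sales` helper are hypothetical and exist only for illustration.

```python
import sqlite3

# Hypothetical in-memory table standing in for the `sales` table in the examples.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (year INTEGER, region TEXT, revenue REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?, ?)",
    [(2024, "EMEA", 10.0), (2024, "APAC", 20.0), (2023, "EMEA", 5.0)],
)


def load_sales(connection, year, region):
    """Run the query with named bound parameters instead of str.format()."""
    sql = (
        "SELECT year, region, revenue FROM sales "
        "WHERE year = :year AND region = :region"
    )
    return connection.execute(sql, {"year": year, "region": region}).fetchall()


rows = load_sales(conn, 2024, "EMEA")

# A value containing SQL syntax is passed to the driver as plain data,
# so it cannot change the structure of the query and matches no rows.
hostile = load_sales(conn, 2024, "EMEA' OR '1'='1")
```

With bound parameters the driver handles quoting, so string values need no manual quotes in the SQL text, unlike the `'{region}'` convention required by format-string templates.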
Kedro-based implementation of the DataCatalog protocol.
This class provides a production-ready data catalog using Kedro's DataCatalog as the underlying I/O engine. It supports configuration-driven dataset management through YAML files, enabling declarative definitions of data sources, file formats, load/save parameters, and storage backends.
The catalog handles diverse data formats (CSV, Parquet, JSON, Excel, Pickle, HDF5), storage locations (local filesystem, S3, GCS, Azure Blob), and advanced features like versioning, partitioning, and dynamic SQL queries.
Parameters
- config_path (str or pathlib.Path): Path to the configuration directory containing catalog YAML files. The directory should contain base/ and local/ subdirectories with catalog.yml, globals.yml, and optionally credentials.yml files.
- config_loader (kedro.config.AbstractConfigLoader or None, default=None): Kedro configuration loader instance. If None, an OmegaConfigLoader will be created automatically to load YAML configurations from the config_path directory.
Attributes
- config_path (str): Path to the configuration directory as a string.
- config_loader (kedro.config.AbstractConfigLoader): The configuration loader instance used to read YAML files.
- _catalog (kedro.io.DataCatalog): Internal Kedro DataCatalog instance handling actual I/O operations.
Methods
- load(name, **query_args): Load a dataset by name from the catalog.
- save(name, data): Save a dataset by name to the catalog.
- in_directory(path): Factory method to create a catalog from a configuration directory.
- init_catalog(path, overwrite=False, include_globals=True, ...): Create the Kedro catalog folder structure with template files.
Raises
- FileNotFoundError: If the configuration directory does not exist or the catalog structure is incomplete (missing required base/ or local/ directories).
See Also
adc_toolkit.data.abs.DataCatalog: Protocol definition for data catalogs.
adc_toolkit.data.validated_catalog.ValidatedDataCatalog: Catalog with validation.
kedro.io.DataCatalog: Underlying Kedro catalog implementation.
kedro.config.OmegaConfigLoader: Default configuration loader.
Notes
The catalog expects a specific directory structure:
config_path/
    base/
        catalog.yml        # Base dataset definitions
        globals.yml        # Global variables and parameters
    local/
        catalog.yml        # Local overrides (gitignored)
        credentials.yml    # Credentials (gitignored)
Dataset definitions in catalog.yml follow Kedro's format:
dataset_name:
  type: pandas.CSVDataset
  filepath: data/raw/dataset.csv
  load_args:
    sep: ","
  save_args:
    index: False
The catalog supports versioning, which automatically timestamps saved datasets and allows loading specific versions. Partitioning enables splitting large datasets into multiple files for parallel processing.
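As a rough illustration of how timestamped versions can be laid out, the sketch below builds a version string in the `YYYY-MM-DDTHH.MM.SS.mmmZ` form and the `<filepath>/<version>/<filename>` layout mentioned in the `save` documentation. The helper names `make_version_timestamp` and `versioned_path` are hypothetical and are not functions of Kedro or adc-toolkit.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def make_version_timestamp(now=None):
    """Format a UTC time as YYYY-MM-DDTHH.MM.SS.mmmZ (millisecond precision)."""
    now = now or datetime.now(timezone.utc)
    # %f yields microseconds (6 digits); dropping the last 3 leaves milliseconds.
    return now.strftime("%Y-%m-%dT%H.%M.%S.%f")[:-3] + "Z"


def versioned_path(filepath, version):
    """Return <filepath>/<version>/<filename> for a versioned save."""
    path = PurePosixPath(filepath)
    return path / version / path.name


stamp = make_version_timestamp(
    datetime(2024, 1, 15, 10, 30, 45, 123000, tzinfo=timezone.utc)
)
target = versioned_path("data/output.csv", stamp)

# Because the format is fixed-width and ordered from year to millisecond,
# the lexicographically largest version string is also the most recent one.
versions = ["2024-01-15T10.30.45.123Z", "2023-12-31T23.59.59.999Z"]
latest = max(versions)
```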
For SQL datasets, the catalog supports dynamic query parameters that can be provided at load time using the query_args keyword arguments.
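The substitute, load, and restore cycle behind dynamic queries can be sketched without a database. `FakeSQLDataset` and `load_with_query` below are hypothetical stand-ins, not adc-toolkit or Kedro APIs; the sketch assumes the template lives under a `query` key in the dataset's load arguments and uses `try`/`finally` so the template survives a failed load.

```python
class FakeSQLDataset:
    """Hypothetical stand-in for a dataset whose load_args hold a query template."""

    def __init__(self, query):
        self._load_args = {"query": query}

    def load(self):
        # A real dataset would execute the query; here it is simply returned.
        return self._load_args["query"]


def load_with_query(dataset, **query_args):
    """Format the query template, load, then restore the template."""
    load_args = dataset._load_args
    if "query" not in load_args:
        raise ValueError("Dataset does not support queries.")

    template = load_args["query"]
    # str.format raises KeyError here if a placeholder has no matching argument,
    # before the stored template has been modified.
    load_args["query"] = template.format(**query_args)
    try:
        return dataset.load()
    finally:
        # Restore the template so the next call starts from the placeholders.
        load_args["query"] = template


dataset = FakeSQLDataset("SELECT * FROM sales WHERE year={year} AND region='{region}'")
executed = load_with_query(dataset, year=2024, region="EMEA")
restored = dataset._load_args["query"]
```

Every placeholder in the template must be supplied on each call; extra keyword arguments are ignored by `str.format`, while a missing one raises `KeyError`.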
Thread Safety
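The KedroDataCatalog delegates to Kedro's DataCatalog, which is not thread-safe for concurrent writes to the same dataset. Concurrent reads are safe for plain loads. Loads that pass query_args temporarily rewrite the dataset's shared query template, so they should not run concurrently against the same dataset.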
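One way to work within these limits is to serialize access per dataset name. The wrapper below is a minimal sketch, not part of adc-toolkit: `LockedCatalog` and `InMemoryCatalog` are hypothetical names, and the wrapper assumes only that the wrapped object exposes `load(name, **query_args)` and `save(name, data)`.

```python
import threading
from collections import defaultdict


class LockedCatalog:
    """Hypothetical wrapper that serializes saves and parameterized loads per dataset."""

    def __init__(self, catalog):
        self._catalog = catalog
        self._locks = defaultdict(threading.Lock)
        self._registry_lock = threading.Lock()

    def _lock_for(self, name):
        # Guard the lock registry itself so two threads never create
        # different locks for the same dataset name.
        with self._registry_lock:
            return self._locks[name]

    def load(self, name, **query_args):
        if not query_args:
            return self._catalog.load(name)
        with self._lock_for(name):
            return self._catalog.load(name, **query_args)

    def save(self, name, data):
        with self._lock_for(name):
            self._catalog.save(name, data)


class InMemoryCatalog:
    """Hypothetical dict-backed catalog used only to exercise the wrapper."""

    def __init__(self):
        self.store = {}

    def load(self, name, **query_args):
        return self.store[name]

    def save(self, name, data):
        self.store[name] = data


catalog = LockedCatalog(InMemoryCatalog())
threads = [threading.Thread(target=catalog.save, args=("output", i)) for i in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
result = catalog.load("output")  # one of 0..7, whichever save ran last
```

A lock only protects threads in one process; writers in separate processes would need a coordination mechanism outside the catalog.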
Examples
Create a catalog using the factory method:
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("customer_data")
>>> df.columns
Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')
Save processed data:
>>> processed_df = process_customers(df)
>>> catalog.save("processed_customers", processed_df)
Load data with dynamic SQL query parameters:
>>> # catalog.yml defines: SELECT * FROM sales WHERE year={year} AND region='{region}'
>>> sales = catalog.load("sales_data", year=2024, region="EMEA")
>>> sales.shape
(15420, 8)
Create a catalog with custom config loader:
>>> from kedro.config import OmegaConfigLoader
>>> loader = OmegaConfigLoader(conf_source="config", env="production", base_env="base", default_run_env="local")
>>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)
Initialize a new catalog structure:
>>> result = KedroDataCatalog.init_catalog(
...     "./my_project/config", include_globals=True, include_catalog=True, include_credentials=True
... )
>>> print(f"Created {len(result.created_files)} files")
Created 4 files
    def __init__(
        self,
        config_path: str | Path,
        config_loader: AbstractConfigLoader | None = None,
    ) -> None:
        """Initialize a Kedro data catalog from configuration files."""
        self.config_path = str(config_path)
        path = Path(config_path)

        if not path.exists():
            raise FileNotFoundError(
                f"Configuration directory not found: {config_path}\n"
                f"To create the catalog folder structure, run:\n"
                f"  adc-toolkit init-catalog {config_path}\n"
                f"Or use the class method:\n"
                f"  KedroDataCatalog.init_catalog('{config_path}')"
            )

        if not catalog_structure_exists(config_path, require_credentials=False):
            raise FileNotFoundError(
                f"Catalog structure is incomplete at: {path}\n"
                f"Missing required files in base/ or local/.\n"
                f"To create the catalog folder structure, run:\n"
                f"  adc-toolkit init-catalog {config_path}\n"
                f"Or use the class method:\n"
                f"  KedroDataCatalog.init_catalog('{config_path}')"
            )

        self.config_loader = config_loader
        if not self.config_loader:
            self.config_loader = create_omega_config_loader(self.config_path)
        self._catalog = create_catalog(self.config_loader)
Initialize a Kedro data catalog from configuration files.
This constructor creates a new catalog instance by reading dataset definitions from YAML configuration files in the specified directory. It validates that the required directory structure exists and creates the underlying Kedro DataCatalog.
Parameters
- config_path (str or pathlib.Path): Path to the configuration directory containing catalog definitions. The directory must contain base/ and local/ subdirectories with the required YAML files (catalog.yml at minimum).
- config_loader (kedro.config.AbstractConfigLoader or None, default=None): Kedro configuration loader for reading YAML files. If None, an OmegaConfigLoader will be created automatically with default settings (base_env="base", default_run_env="local").
Raises
- FileNotFoundError: If the configuration directory does not exist. The error message includes instructions for creating the directory structure using the CLI command or the init_catalog class method.
- FileNotFoundError: If the catalog structure is incomplete (missing base/ or local/ directories or required catalog.yml files). The error message includes instructions for creating the complete structure.
See Also
in_directory: Factory method for creating catalog instances.
init_catalog: Class method for scaffolding catalog directory structure.
Notes
The constructor performs the following steps:
- Validates that the config_path directory exists
- Checks for required catalog structure (base/ and local/ directories)
- Creates or uses the provided config_loader
- Loads catalog configuration and creates the Kedro DataCatalog
The catalog structure validation requires:
- base/ directory with catalog.yml
- local/ directory (can be empty initially)
Credentials are optional and should be placed in local/credentials.yml to prevent accidental commits to version control.
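The structure requirements above can be approximated with a few `pathlib` checks. This is a sketch under the stated requirements only (a base/ directory containing catalog.yml and a local/ directory that may be empty); `has_minimal_catalog_structure` is a hypothetical helper, and the toolkit's own `catalog_structure_exists` may check more than this.

```python
import tempfile
from pathlib import Path


def has_minimal_catalog_structure(config_path):
    """Return True if base/catalog.yml exists and local/ is a directory."""
    root = Path(config_path)
    return (root / "base" / "catalog.yml").is_file() and (root / "local").is_dir()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)

    # An empty directory does not satisfy the requirements.
    before = has_minimal_catalog_structure(root)

    # Create the minimal layout: base/catalog.yml plus an empty local/.
    (root / "base").mkdir()
    (root / "base" / "catalog.yml").write_text("# dataset definitions\n")
    (root / "local").mkdir()
    after = has_minimal_catalog_structure(root)
```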
Examples
Create a catalog with default configuration loader:
>>> catalog = KedroDataCatalog("config/catalog")
>>> catalog.config_path
'config/catalog'
Create a catalog with custom configuration loader:
>>> from kedro.config import OmegaConfigLoader
>>> loader = OmegaConfigLoader(conf_source="config", env="staging", base_env="base", default_run_env="local")
>>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)
Handle missing configuration directory:
>>> try:
...     catalog = KedroDataCatalog("nonexistent/path")
... except FileNotFoundError as e:
...     print("Directory not found. Run init_catalog to create it.")
Directory not found. Run init_catalog to create it.
    @classmethod
    def in_directory(cls, path: str | Path) -> "KedroDataCatalog":
        """Create a catalog instance from a configuration directory."""
        return cls(path)
Create a catalog instance from a configuration directory.
This factory method provides a convenient way to instantiate a KedroDataCatalog by specifying only the configuration directory path. It is the recommended way to create catalog instances in application code.
The method creates a catalog with default settings, using an automatically configured OmegaConfigLoader to read YAML files from the directory.
Parameters
- path (str or pathlib.Path): Path to the configuration directory containing catalog definitions. The directory must have the required Kedro catalog structure with base/ and local/ subdirectories.
Returns
- KedroDataCatalog: A new catalog instance configured with datasets from the directory. The catalog is immediately ready to load and save data.
Raises
- FileNotFoundError: If the specified directory does not exist or lacks the required catalog structure (base/ and local/ directories with catalog.yml).
See Also
__init__: Constructor with additional configuration options.
init_catalog: Create the catalog directory structure.
Notes
This factory method is equivalent to calling the constructor with just the path parameter:
>>> catalog = KedroDataCatalog(path)
However, using in_directory is preferred because:
- It matches the DataCatalog protocol interface
- It provides better semantic clarity
- It enables polymorphism when using multiple catalog implementations
The method uses default configuration settings:
- Base environment: "base"
- Default run environment: "local"
- Configuration format: YAML
For advanced configuration needs (custom environments, config merging strategies, runtime parameters), use the constructor directly with a custom AbstractConfigLoader.
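The polymorphism argument can be made concrete with `typing.Protocol`. The sketch below is illustrative only: the `DataCatalog` protocol shown here is an assumed shape (the real definition lives in adc_toolkit.data.abs and may differ), and `InMemoryCatalog` is a hypothetical second implementation used to show that pipeline code depends only on the shared interface.

```python
from typing import Any, Protocol


class DataCatalog(Protocol):
    """Assumed shape of the catalog protocol, for illustration only."""

    @classmethod
    def in_directory(cls, path: str) -> "DataCatalog": ...

    def load(self, name: str, **query_args: Any) -> Any: ...

    def save(self, name: str, data: Any) -> None: ...


class InMemoryCatalog:
    """Hypothetical catalog that keeps datasets in a dict instead of on disk."""

    def __init__(self, path: str) -> None:
        self.path = str(path)
        self._store: dict[str, Any] = {}

    @classmethod
    def in_directory(cls, path: str) -> "InMemoryCatalog":
        return cls(path)

    def load(self, name: str, **query_args: Any) -> Any:
        return self._store[name]

    def save(self, name: str, data: Any) -> None:
        self._store[name] = data


def run_pipeline(catalog: DataCatalog) -> None:
    """Pipeline code that only relies on load() and save()."""
    data = catalog.load("input")
    catalog.save("output", [value * 2 for value in data])


catalog = InMemoryCatalog.in_directory("config/catalog")
catalog.save("input", [1, 2, 3])
run_pipeline(catalog)
output = catalog.load("output")
```

Because both implementations are constructed through `in_directory`, a caller can swap the catalog class (for example in tests) without touching `run_pipeline`.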
Examples
Basic usage:
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("training_data")
Using pathlib.Path:
>>> from pathlib import Path
>>> config_dir = Path("config") / "catalog"
>>> catalog = KedroDataCatalog.in_directory(config_dir)
Load and save in a pipeline:
>>> catalog = KedroDataCatalog.in_directory("./config")
>>> raw = catalog.load("raw_data")
>>> processed = transform(raw)
>>> catalog.save("processed_data", processed)
Polymorphic usage with DataCatalog protocol:
>>> def run_pipeline(catalog: DataCatalog) -> None:
...     data = catalog.load("input")
...     result = process(data)
...     catalog.save("output", result)
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> run_pipeline(catalog)
    @classmethod
    def init_catalog(
        cls,
        path: str | Path,
        *,
        overwrite: bool = False,
        include_globals: bool = True,
        include_catalog: bool = True,
        include_credentials: bool = True,
    ) -> ScaffoldResult:
        """Create Kedro catalog directory structure with template configuration files."""
        return create_catalog_folder_structure(
            path,
            overwrite=overwrite,
            include_globals=include_globals,
            include_catalog=include_catalog,
            include_credentials=include_credentials,
        )
Create Kedro catalog directory structure with template configuration files.
This class method scaffolds a complete Kedro catalog configuration directory with the required folder structure and template YAML files. It is intended for initializing new projects or adding catalog functionality to existing projects.
The method creates a directory structure following Kedro conventions:
- path/base/catalog.yml: Base dataset definitions
- path/base/globals.yml: Global variables and parameters
- path/local/catalog.yml: Local environment overrides
- path/local/credentials.yml: Credentials (should be gitignored)
Template files include helpful comments and examples to guide configuration.
Parameters
- path (str or pathlib.Path): Root path for the configuration directory. The directory will be created if it doesn't exist. Subdirectories base/ and local/ will be created within this path.
- overwrite (bool, default=False): If True, overwrite existing files at the destination paths. If False, existing files are preserved and reported in skipped_files. Use with caution to avoid losing custom configurations.
- include_globals (bool, default=True): If True, create base/globals.yml with template global variables. Global variables can be referenced in catalog.yml using ${variable} syntax for parameterization.
- include_catalog (bool, default=True): If True, create base/catalog.yml and local/catalog.yml with template dataset definitions. These files are essential for catalog operation.
- include_credentials (bool, default=True): If True, create local/credentials.yml for storing credentials. This file should be added to .gitignore to prevent committing secrets.
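The ${variable} references mentioned for globals.yml can be pictured with a small stand-in. This is only a sketch: it uses Python's string.Template, which happens to accept the same ${name} spelling, and is not how Kedro's OmegaConfigLoader resolves values. The keys base_path and sep and the catalog entry are hypothetical.

```python
from string import Template

# Hypothetical values that would live in base/globals.yml.
globals_cfg = {"base_path": "data/01_raw", "sep": ";"}

# Hypothetical catalog entry that references those globals.
catalog_entry = {
    "type": "pandas.CSVDataset",
    "filepath": "${base_path}/customers.csv",
    "load_args": {"sep": "${sep}"},
}


def resolve(value, variables):
    """Recursively substitute ${name} references inside strings."""
    if isinstance(value, str):
        # substitute() raises KeyError when a variable is undefined.
        return Template(value).substitute(variables)
    if isinstance(value, dict):
        return {key: resolve(item, variables) for key, item in value.items()}
    return value


resolved = resolve(catalog_entry, globals_cfg)
print(resolved["filepath"])  # data/01_raw/customers.csv
```

Keeping paths and separators in one place means an environment can change them without touching individual dataset definitions.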
Returns
- ScaffoldResult: A result object containing:
  - created_files: List of Path objects for files that were created
  - skipped_files: List of Path objects for files that already existed
  - created_directories: List of Path objects for directories created
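How overwrite interacts with created_files and skipped_files can be sketched as follows. Everything here is illustrative: SketchResult and scaffold_sketch are hypothetical names, the file contents are placeholders, and the real create_catalog_folder_structure may behave differently in its details.

```python
import tempfile
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class SketchResult:
    """Hypothetical stand-in for ScaffoldResult."""

    created_files: list[Path] = field(default_factory=list)
    skipped_files: list[Path] = field(default_factory=list)
    created_directories: list[Path] = field(default_factory=list)


def scaffold_sketch(root: Path, overwrite: bool = False) -> SketchResult:
    """Create base/ and local/ with placeholder files, skipping existing ones."""
    result = SketchResult()
    targets = [
        "base/catalog.yml",
        "base/globals.yml",
        "local/catalog.yml",
        "local/credentials.yml",
    ]
    for relative in targets:
        target = root / relative
        if not target.parent.exists():
            target.parent.mkdir(parents=True)
            result.created_directories.append(target.parent)
        if target.exists() and not overwrite:
            # Preserve the existing file and report it as skipped.
            result.skipped_files.append(target)
            continue
        target.write_text("# placeholder template\n", encoding="utf-8")
        result.created_files.append(target)
    return result


with tempfile.TemporaryDirectory() as tmp:
    first = scaffold_sketch(Path(tmp) / "catalog")
    second = scaffold_sketch(Path(tmp) / "catalog")
    first_created = len(first.created_files)
    second_skipped = len(second.skipped_files)
    created_dir_names = [d.name for d in first.created_directories]
```

In this sketch a second run without overwrite creates nothing and reports all four files as skipped, which is the behavior the overwrite parameter description implies.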
See Also
in_directory: Factory method to create catalog from existing config.
adc_toolkit.data.catalogs.kedro.scaffold.create_catalog_folder_structure: Underlying scaffolding function.
Notes
The scaffolded directory structure follows Kedro best practices:
- base/: Contains base configurations shared across environments
- local/: Contains local overrides and credentials (gitignored)
Template catalog.yml includes examples for common dataset types:
- CSV files with pandas.CSVDataset
- Parquet files with pandas.ParquetDataset
- Excel files with pandas.ExcelDataset
- Pickle files with pickle.PickleDataset
Template globals.yml includes examples for:
- Base paths (data directories)
- Common parameters (date formats, separators)
- Environment-specific settings
After running this method, you should:
- Review and customize the generated YAML files
- Add local/credentials.yml to .gitignore
- Define your project-specific datasets in base/catalog.yml
- Add environment-specific overrides in local/catalog.yml
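The .gitignore step above can be automated in a setup script. The helper below is a hypothetical sketch, not part of adc-toolkit; it appends the entry only if it is not already listed, and the path config/catalog is borrowed from the examples in this section.

```python
import tempfile
from pathlib import Path


def ensure_gitignored(gitignore: Path, entry: str) -> bool:
    """Append entry to the .gitignore file unless it is already listed.

    Returns True if the entry was added, False if it was already present.
    """
    text = gitignore.read_text(encoding="utf-8") if gitignore.exists() else ""
    if entry in (line.strip() for line in text.splitlines()):
        return False
    # Start on a fresh line if the file does not end with a newline.
    prefix = "" if not text or text.endswith("\n") else "\n"
    with gitignore.open("a", encoding="utf-8") as handle:
        handle.write(f"{prefix}{entry}\n")
    return True


with tempfile.TemporaryDirectory() as tmp:
    gitignore = Path(tmp) / ".gitignore"
    gitignore.write_text("__pycache__/", encoding="utf-8")  # no trailing newline
    entry = "config/catalog/local/credentials.yml"
    added_first = ensure_gitignored(gitignore, entry)
    added_again = ensure_gitignored(gitignore, entry)
    final_lines = gitignore.read_text(encoding="utf-8").splitlines()
```

Because the helper is idempotent, it is safe to call every time the setup script runs.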
This is equivalent to running the CLI command:
adc-toolkit init-catalog <path>
Examples
Initialize a catalog in a new project:
>>> result = KedroDataCatalog.init_catalog("./config/catalog")
>>> print(f"Created {len(result.created_files)} files")
Created 4 files
>>> print(f"Directories: {[d.name for d in result.created_directories]}")
Directories: ['base', 'local']
Initialize with selective templates:
>>> result = KedroDataCatalog.init_catalog(
...     "./config/catalog",
...     include_globals=True,
...     include_catalog=True,
...     include_credentials=False,  # No credentials needed
... )
Reinitialize with overwrite (use carefully):
>>> result = KedroDataCatalog.init_catalog(
...     "./config/catalog",
...     overwrite=True,  # Overwrites existing files
... )
Check what was created vs. skipped:
>>> result = KedroDataCatalog.init_catalog("./config/catalog")
>>> if result.skipped_files:
...     print(f"Skipped existing: {[f.name for f in result.skipped_files]}")
>>> if result.created_files:
...     print(f"Created new: {[f.name for f in result.created_files]}")
Use in project setup script:
>>> from pathlib import Path
>>> project_root = Path("./my_project")
>>> catalog_dir = project_root / "config" / "catalog"
>>> result = KedroDataCatalog.init_catalog(catalog_dir)
>>> assert catalog_dir.exists()
>>> assert (catalog_dir / "base" / "catalog.yml").exists()
def load(self, name: str, **query_args: Any) -> Data:
    """Load a dataset by name from the catalog."""
    if query_args:
        return self._load_with_dynamic_query(name, **query_args)
    return self._catalog.load(name)
Load a dataset by name from the catalog.
Retrieve a dataset using its registered name as defined in the catalog configuration files. The method handles all I/O operations, file format parsing, and type conversions based on the dataset's configuration.
For SQL-based datasets with parameterized queries, this method supports dynamic query parameter substitution through keyword arguments, enabling flexible data filtering and selection at load time.
Parameters
- name (str): The registered name of the dataset to load. This name must match a dataset definition in the catalog configuration files (catalog.yml).
- **query_args (Any): Keyword arguments for dynamic SQL query parameterization. Intended for SQL-based datasets whose query uses Python format string syntax with named placeholders (e.g., WHERE year={year}). Any non-empty query_args routes the call through _load_with_dynamic_query, so omit them for datasets that do not define a parameterized query.
Returns
- Data: The loaded dataset as a Data protocol-compatible object. The specific type depends on the dataset configuration (typically pandas.DataFrame, but can be Spark DataFrame, numpy array, or other types defined in the catalog).
Raises
- KeyError: If no dataset with the given name is registered in the catalog.
- FileNotFoundError: If the dataset's source file or database does not exist.
- ValueError: If the dataset cannot be loaded due to format errors, parsing errors, or invalid query_args for datasets that don't support queries.
- PermissionError: If the dataset's source file or database is not readable.
See Also
save: Save a dataset to the catalog.
_load_with_dynamic_query: Internal method for parameterized SQL queries.
Notes
Load behavior depends on the dataset type configured in catalog.yml:
File-based datasets (CSV, Parquet, JSON, Excel, Pickle, HDF5): Reads from the configured filepath using the specified load_args.
Database datasets (SQL, SQLQuery, SQLTable): Executes queries or reads tables from the configured database connection.
Versioned datasets: Loads the latest version unless a specific version is requested in the configuration.
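Partitioned datasets: With Kedro's PartitionedDataset, load returns a dictionary that maps each partition ID to a load function. A partition is read only when its function is called, and combining partitions is left to the caller.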
For SQL datasets with dynamic queries, the query string in catalog.yml should use Python format string syntax with named placeholders:
sales_data:
  type: pandas.SQLQueryDataset
  sql: SELECT * FROM sales WHERE year={year} AND region='{region}'
  credentials: database_creds
Query parameters are substituted at load time:
>>> df = catalog.load("sales_data", year=2024, region="EMEA")
The load operation is idempotent as long as the underlying source does not change: calling it multiple times with the same parameters returns equivalent data (though not necessarily the same object instance).
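The placeholder substitution described above can be illustrated with plain Python string formatting. This is a sketch under assumptions: render_query is a hypothetical helper, and the actual _load_with_dynamic_query may validate or substitute differently.

```python
from string import Formatter


def render_query(template: str, **query_args: object) -> str:
    """Fill named {placeholders}, failing early on missing or unexpected names."""
    # Formatter().parse yields (literal, field_name, format_spec, conversion).
    expected = {name for _, name, _, _ in Formatter().parse(template) if name}
    provided = set(query_args)
    missing = expected - provided
    unexpected = provided - expected
    if missing or unexpected:
        raise ValueError(
            f"missing={sorted(missing)}, unexpected={sorted(unexpected)}"
        )
    return template.format(**query_args)


template = "SELECT * FROM sales WHERE year={year} AND region='{region}'"
query = render_query(template, year=2024, region="EMEA")
print(query)  # SELECT * FROM sales WHERE year=2024 AND region='EMEA'
```

Plain string formatting does not escape values, so arguments that originate from untrusted input should be validated first or passed as bound parameters by the database driver.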
Performance Considerations
- Large datasets may take significant time and memory to load
- For large files, consider using chunking or lazy loading
- Cloud storage (S3, GCS, Azure) may incur network latency
- Partitioned datasets defer reading: each partition is loaded only when its load function is called, so partitions can be processed one at a time
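The chunking idea from the list above can be sketched with the standard library alone. The iter_chunks helper and the in-memory CSV are hypothetical. For pandas-backed datasets, pandas.read_csv accepts a chunksize argument that serves the same purpose.

```python
import csv
import io
from itertools import islice
from typing import Iterator


def iter_chunks(handle, chunk_size: int) -> Iterator[list[dict[str, str]]]:
    """Yield lists of at most chunk_size rows from an open CSV file."""
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


# In-memory stand-in for a large CSV file.
source = io.StringIO("id,amount\n1,10\n2,20\n3,30\n4,40\n5,50\n")

total = 0
chunk_sizes = []
for chunk in iter_chunks(source, chunk_size=2):
    # Only one chunk is held in memory at a time.
    chunk_sizes.append(len(chunk))
    total += sum(int(row["amount"]) for row in chunk)

print(chunk_sizes, total)  # [2, 2, 1] 150
```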
Examples
Load a simple dataset:
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("customer_data")
>>> df.shape
(10000, 8)
Load with dynamic SQL query parameters:
>>> # catalog.yml: sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'"
>>> sales_2024 = catalog.load("sales_data", year=2024, region="EMEA")
>>> sales_2024["year"].unique()
array([2024])
>>> sales_2024["region"].unique()
array(['EMEA'], dtype=object)
Load multiple datasets in sequence:
>>> raw = catalog.load("raw_data")
>>> features = catalog.load("feature_set")
>>> model = catalog.load("trained_model")
Handle missing datasets gracefully:
>>> try:
...     data = catalog.load("nonexistent_dataset")
... except KeyError as e:
...     print(f"Dataset not found: {e}")
...     # Use default data or prompt user
Dataset not found: 'nonexistent_dataset'
Load with different query parameters:
>>> emea_2024 = catalog.load("sales_data", year=2024, region="EMEA")
>>> apac_2024 = catalog.load("sales_data", year=2024, region="APAC")
def save(self, name: str, data: Data) -> None:
    """Save a dataset by name to the catalog."""
    self._catalog.save(name, data)
Save a dataset by name to the catalog.
Store a dataset using its registered name as defined in the catalog configuration files. The method handles all I/O operations, file format serialization, and storage operations based on the dataset's configuration.
Parameters
- name (str): The registered name of the dataset to save. This name must match a dataset definition in the catalog configuration files (catalog.yml). The dataset configuration determines the output location, format, and serialization parameters.
- data (Data): The dataset to save. Must be a Data protocol-compatible object (e.g., pandas.DataFrame, Spark DataFrame) that is compatible with the dataset type specified in the catalog configuration.
Returns
- None
Raises
- KeyError: If no dataset with the given name is registered in the catalog.
- TypeError: If the data type is incompatible with the dataset configuration (e.g., passing an object that is not a DataFrame to a pandas.CSVDataset).
- ValueError: If the dataset cannot be saved due to validation errors, format incompatibilities, or invalid configuration.
- PermissionError: If the target location is not writable due to filesystem permissions.
- OSError: If disk space is insufficient or other I/O errors occur during save.
See Also
load: Load a dataset from the catalog.
Notes
Save behavior depends on the dataset type configured in catalog.yml:
File-based datasets (CSV, Parquet, JSON, Excel, Pickle, HDF5): Writes to the configured filepath using the specified save_args. Parent directories are created automatically if they don't exist.
Database datasets (SQL, SQLTable): Writes to the configured database table using the specified connection and save parameters.
Versioned datasets: Creates a new timestamped version rather than overwriting. Version format: filepath/YYYY-MM-DDThh.mm.ss.sssZ/filename
Partitioned datasets: Writes one file per partition. With Kedro's PartitionedDataset, data is expected to be a dictionary that maps partition IDs to the objects to save.
The save operation typically overwrites existing data at the target location unless versioning is enabled. For versioned datasets, each save creates a new version without removing previous versions.
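The versioned layout described above can be reproduced with a few lines of Python. This sketch only mirrors the documented path format; version_string and versioned_path are hypothetical helpers and do not call Kedro.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def version_string(moment: datetime) -> str:
    """Format a timestamp as YYYY-MM-DDThh.mm.ss.sssZ in UTC."""
    stamp = moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H.%M.%S.%fZ")
    # %f yields six digits; drop the last three to keep milliseconds only.
    return stamp[:-4] + "Z"


def versioned_path(filepath: str, moment: datetime) -> PurePosixPath:
    """Build filepath/<version>/<filename> for a given save time."""
    base = PurePosixPath(filepath)
    return base / version_string(moment) / base.name


moment = datetime(2024, 1, 15, 10, 30, 45, 123000, tzinfo=timezone.utc)
path = versioned_path("data/output.csv", moment)
print(path)  # data/output.csv/2024-01-15T10.30.45.123Z/output.csv
```

Because every save lands in a new timestamped directory, earlier versions stay on disk until they are removed explicitly.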
Format-Specific Behavior
Different formats have different save characteristics:
- CSV: Human-readable, widely compatible, larger file size
- Parquet: Columnar format, compressed, efficient for analytics
- Pickle: Python-specific, preserves exact objects, version-sensitive
- JSON: Human-readable, good for nested structures
- HDF5: Binary format, good for large numerical arrays
Atomicity and Error Handling
The atomicity of save operations depends on the underlying dataset type and storage backend:
- Local filesystem writes are generally not atomic; an interrupted write can leave a truncated file
- Cloud storage (S3, GCS, Azure) may use multi-part uploads for large objects
- Database writes depend on transaction support
- Partitioned saves may be partially successful
If a save operation fails partway through, partial data may be written. For critical applications, consider implementing save-to-temporary-then-move patterns or using versioned datasets.
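A minimal sketch of the save-to-temporary-then-move pattern for a local file follows. The atomic_write_text helper is hypothetical and works outside the catalog; it relies on os.replace, which is atomic when the temporary file and the target are on the same filesystem.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(target: Path, text: str) -> None:
    """Write text to a temporary file next to target, then rename it into place."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temporary file in the target directory so the final
    # rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, target)
    except BaseException:
        # Remove the temporary file; any previous target content is untouched.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as tmp:
    output = Path(tmp) / "out" / "report.csv"
    atomic_write_text(output, "id,value\n1,a\n")
    atomic_write_text(output, "id,value\n2,b\n")
    final_text = output.read_text(encoding="utf-8")
    leftovers = [p.name for p in output.parent.iterdir()]
```

Readers of the target path see either the old content or the new content, never a partially written file.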
Performance Considerations
- Large datasets may take significant time to serialize and write
- Cloud storage uploads may have network latency and bandwidth limits
- Compression (enabled in save_args) trades CPU time for disk space
- Partitioned datasets write one file per partition, so save time grows with the number of partitions
Examples
Save a processed dataset:
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> processed_df = process_data(raw_df)
>>> catalog.save("processed_data", processed_df)
Save multiple datasets in a pipeline:
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> raw = catalog.load("raw_data")
>>> cleaned = clean_data(raw)
>>> catalog.save("cleaned_data", cleaned)
>>> features = engineer_features(cleaned)
>>> catalog.save("features", features)
>>> predictions = model.predict(features)
>>> catalog.save("predictions", predictions)
Save with versioning (configured in catalog.yml):
>>> # catalog.yml:
>>> # versioned_output:
>>> #   type: pandas.CSVDataset
>>> #   filepath: data/output.csv
>>> #   versioned: true
>>> catalog.save("versioned_output", result_df)
>>> # Saves to: data/output.csv/2024-01-15T10.30.45.123Z/output.csv
Handle save errors:
>>> try:
...     catalog.save("output_data", large_df)
... except PermissionError as e:
...     print(f"Cannot write to output location: {e}")
... except OSError as e:
...     print(f"I/O error during save: {e}")
Save to different formats (configured per dataset):
>>> # Same data, different formats for different use cases
>>> catalog.save("output_csv", df) # Human-readable
>>> catalog.save("output_parquet", df) # Efficient storage
>>> catalog.save("output_json", df) # API consumption