adc_toolkit.data.catalogs.kedro

Kedro-based data catalog implementation for the adc-toolkit.

This module provides a production-ready data catalog implementation using Kedro's DataCatalog as the underlying I/O engine. It enables configuration-driven dataset management through YAML files, supporting diverse data formats, storage backends, versioning, partitioning, and dynamic SQL queries.

The KedroDataCatalog wraps Kedro's native catalog with a simplified interface that integrates seamlessly with the adc-toolkit's validation and processing pipeline. It provides factory methods for instantiation and utilities for scaffolding new catalog configurations.

Classes

KedroDataCatalog
    Main Kedro catalog implementation with methods for loading and saving datasets based on YAML configuration files. Provides the in_directory() factory method and the init_catalog() scaffolding utility.

Submodules

kedro_catalog
    Core KedroDataCatalog implementation.
kedro_configs
    Configuration loader utilities for Kedro OmegaConfigLoader.
scaffold
    Utilities for creating catalog directory structures with template files.
templates
    Template YAML files for catalog, globals, and credentials configurations.

See Also

adc_toolkit.data.abs.DataCatalog: Protocol definition for catalogs.
adc_toolkit.data.catalog.ValidatedDataCatalog: Catalog with automatic validation.
kedro.io.DataCatalog: Underlying Kedro catalog implementation.
kedro.config.OmegaConfigLoader: Configuration loader for YAML files.

Notes

The Kedro data catalog system uses YAML configuration files to define datasets, including file paths, formats, load/save parameters, and storage backends. This declarative approach separates data I/O concerns from business logic and enables environment-specific configurations without code changes.

Expected Directory Structure

The catalog expects a specific configuration directory structure::

config_path/
    base/
        catalog.yml      # Base dataset definitions shared across environments
        globals.yml      # Global variables (e.g., base_path, bucket_name)
    local/
        catalog.yml      # Local overrides for development (gitignored)
        credentials.yml  # Credentials for databases/cloud (gitignored)

The base/ directory contains shared definitions, while local/ contains environment-specific overrides. Local files should be added to .gitignore to prevent committing credentials or environment-specific paths.
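As a rough illustration of the layout above, the sketch below builds it in a temporary directory and checks it with a hypothetical helper, `has_catalog_layout`. The helper and `build_layout` are assumptions made for illustration only; they are not the toolkit's own validation or scaffolding functions, and the check assumes that only base/catalog.yml and the local/ directory are required.

```python
import tempfile
from pathlib import Path

# Files shown in the directory tree above, relative to config_path.
LAYOUT = (
    "base/catalog.yml",
    "base/globals.yml",
    "local/catalog.yml",
    "local/credentials.yml",
)


def has_catalog_layout(config_path: Path) -> bool:
    """Hypothetical check: base/catalog.yml exists and local/ is a directory."""
    return (config_path / "base" / "catalog.yml").is_file() and (config_path / "local").is_dir()


def build_layout(config_path: Path) -> None:
    """Create the files from the tree above as empty placeholders."""
    for relative in LAYOUT:
        target = config_path / relative
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "config"
    before = has_catalog_layout(root)  # nothing has been created yet
    build_layout(root)
    after = has_catalog_layout(root)

print(before, after)  # False True
```

In a real project, KedroDataCatalog.init_catalog() is the documented way to create this structure; the sketch only makes the expected shape concrete.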

Configuration Format

Dataset definitions in catalog.yml follow Kedro's format:

# Simple CSV dataset
customer_data:
  type: pandas.CSVDataset
  filepath: data/raw/customers.csv
  load_args:
    sep: ","
    parse_dates: ["signup_date"]
  save_args:
    index: False

# Parquet dataset with versioning
processed_features:
  type: pandas.ParquetDataset
  filepath: data/processed/features.parquet
  versioned: true

# SQL dataset with dynamic query parameters
sales_query:
  type: pandas.SQLQueryDataset
  sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'"
  credentials: database_creds

# Cloud storage dataset
s3_data:
  type: pandas.ParquetDataset
  filepath: s3://my-bucket/data/dataset.parquet
  credentials: aws_credentials

Global variables in globals.yml can be referenced in catalog.yml using ${globals:variable_name} syntax for parameterization.
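To make the placeholder syntax concrete, here is a minimal sketch of how a `${globals:variable_name}` reference could be resolved. It uses a plain regular expression rather than Kedro's configuration loader, so `resolve_globals` and the sample dictionaries are hypothetical stand-ins, not the library's implementation.

```python
import re

# Hypothetical contents of globals.yml and one catalog.yml entry, as plain dicts.
globals_config = {"base_path": "data", "bucket_name": "my-bucket"}
catalog_entry = {
    "type": "pandas.CSVDataset",
    "filepath": "${globals:base_path}/raw/customers.csv",
}

# Matches ${globals:name} and captures the variable name.
PLACEHOLDER = re.compile(r"\$\{globals:([A-Za-z_][A-Za-z0-9_]*)\}")


def resolve_globals(value: str, global_values: dict[str, str]) -> str:
    """Replace each ${globals:name} placeholder; raises KeyError if the name is undefined."""
    return PLACEHOLDER.sub(lambda match: str(global_values[match.group(1)]), value)


resolved = {key: resolve_globals(value, globals_config) for key, value in catalog_entry.items()}
print(resolved["filepath"])  # data/raw/customers.csv
```

Keeping shared values such as base paths in globals.yml means a path change is made once, rather than in every dataset entry.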

Supported Features

  • File Formats: CSV, Parquet, JSON, Excel, Pickle, HDF5, Feather, ORC
  • Storage Backends: Local filesystem, S3, GCS, Azure Blob, HDFS
  • Versioning: Automatic timestamping of saved datasets
  • Partitioning: Split large datasets across multiple files
  • Dynamic Queries: SQL query parameterization at load time
  • Dataset Factories: Pattern-based dataset definitions for systematic naming
  • Credentials Management: Secure credential storage in local/credentials.yml

Thread Safety

KedroDataCatalog delegates to Kedro's DataCatalog, which is not thread-safe for concurrent writes to the same dataset. Concurrent reads are safe. Use external locking mechanisms if concurrent writes are required.
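One possible form of external locking is a thin wrapper that serializes saves per dataset name. The sketch below is an assumption-laden illustration: `LockedCatalog` and `InMemoryCatalog` are hypothetical names, the stand-in catalog replaces a real KedroDataCatalog so the example runs without Kedro, and the locks only coordinate threads inside a single process.

```python
import threading
from typing import Any


class InMemoryCatalog:
    """Stand-in for a real catalog so the sketch runs without Kedro."""

    def __init__(self) -> None:
        self.saved: dict[str, Any] = {}

    def load(self, name: str, **query_args: Any) -> Any:
        return self.saved[name]

    def save(self, name: str, data: Any) -> None:
        self.saved[name] = data


class LockedCatalog:
    """Hypothetical wrapper: at most one save per dataset name runs at a time."""

    def __init__(self, catalog: Any) -> None:
        self._catalog = catalog
        self._locks: dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()  # guards creation of per-dataset locks

    def load(self, name: str, **query_args: Any) -> Any:
        # Reads are passed through without locking.
        return self._catalog.load(name, **query_args)

    def save(self, name: str, data: Any) -> None:
        with self._registry_lock:
            lock = self._locks.setdefault(name, threading.Lock())
        with lock:
            self._catalog.save(name, data)


catalog = LockedCatalog(InMemoryCatalog())
threads = [threading.Thread(target=catalog.save, args=("results", i)) for i in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

final = catalog.load("results")  # one of 0..7, depending on scheduling
```

Writers in separate processes or on separate machines would need a different mechanism, such as a file lock or a coordination service.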

Examples

Create a catalog from an existing configuration directory:

>>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("customer_data")
>>> df.columns
Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')

Save processed data:

>>> processed_df = process_customers(df)
>>> catalog.save("processed_customers", processed_df)

Initialize a new catalog structure with template files:

>>> result = KedroDataCatalog.init_catalog(
...     "./my_project/config/catalog",
...     include_globals=True,
...     include_catalog=True,
...     include_credentials=True,
... )
>>> print(f"Created: {sorted({f.name for f in result.created_files})}")
Created: ['catalog.yml', 'credentials.yml', 'globals.yml']

Load data with dynamic SQL query parameters:

>>> # catalog.yml defines sales_query with:
>>> #   sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'"
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> sales_2024 = catalog.load("sales_query", year=2024, region="EMEA")
>>> sales_2023 = catalog.load("sales_query", year=2023, region="EMEA")
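One way to picture the substitution is ordinary Python string formatting. The sketch below assumes the `{year}` and `{region}` placeholders are filled with `str.format`-style substitution; the source does not state the mechanism, so `render_query` is a hypothetical helper rather than the toolkit's implementation.

```python
SQL_TEMPLATE = "SELECT * FROM sales WHERE year={year} AND region='{region}'"


def render_query(template: str, **query_args: object) -> str:
    """Fill named placeholders; raises KeyError if a placeholder has no argument."""
    return template.format(**query_args)


query = render_query(SQL_TEMPLATE, year=2024, region="EMEA")
print(query)
# SELECT * FROM sales WHERE year=2024 AND region='EMEA'

# Omitting a placeholder value fails loudly instead of producing a partial query.
try:
    render_query(SQL_TEMPLATE, year=2024)
except KeyError as missing:
    missing_name = missing.args[0]
    print(f"Missing query argument: {missing_name}")
```

Plain text substitution does not escape values, so under this assumption the arguments should come from trusted code rather than from user input.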

Use in a complete data pipeline:

>>> from adc_toolkit.data.catalogs.kedro import KedroDataCatalog
>>>
>>> def run_pipeline():
...     catalog = KedroDataCatalog.in_directory("config/production")
...
...     # Extract
...     raw_sales = catalog.load("raw_sales")
...     raw_customers = catalog.load("raw_customers")
...
...     # Transform
...     cleaned_sales = clean_data(raw_sales)
...     enriched = enrich_with_customer_data(cleaned_sales, raw_customers)
...
...     # Load (save results)
...     catalog.save("cleaned_sales", cleaned_sales)
...     catalog.save("enriched_sales", enriched)
>>> run_pipeline()
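Because a pipeline like the one above only calls load and save, it can be exercised against an in-memory stand-in. The sketch below is illustrative: `CatalogLike` is a hypothetical minimal protocol (the toolkit's own protocol lives in adc_toolkit.data.abs and may define more), and `InMemoryCatalog` is an assumed test double, not part of the package.

```python
from typing import Any, Protocol


class CatalogLike(Protocol):
    """Hypothetical minimal interface: only the two methods the pipeline calls."""

    def load(self, name: str, **query_args: Any) -> Any: ...

    def save(self, name: str, data: Any) -> None: ...


class InMemoryCatalog:
    """Test double that keeps datasets in a dictionary instead of on disk."""

    def __init__(self, datasets: dict[str, Any]) -> None:
        self._datasets = dict(datasets)

    def load(self, name: str, **query_args: Any) -> Any:
        return self._datasets[name]

    def save(self, name: str, data: Any) -> None:
        self._datasets[name] = data


def run_pipeline(catalog: CatalogLike) -> None:
    """Extract, transform, and save using only the catalog interface."""
    raw_sales = catalog.load("raw_sales")
    cleaned = [row for row in raw_sales if row["amount"] is not None]
    catalog.save("cleaned_sales", cleaned)


catalog = InMemoryCatalog({"raw_sales": [{"amount": 10}, {"amount": None}, {"amount": 5}]})
run_pipeline(catalog)
cleaned_sales = catalog.load("cleaned_sales")
print(cleaned_sales)  # [{'amount': 10}, {'amount': 5}]
```

The same `run_pipeline` function would accept a KedroDataCatalog instance, since it depends on the interface rather than on a concrete class.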

Create catalog with custom configuration loader:

>>> from kedro.config import OmegaConfigLoader
>>> loader = OmegaConfigLoader(
...     conf_source="config",
...     env="production",
...     base_env="base",
...     default_run_env="local",
... )
>>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

from .kedro_catalog import KedroDataCatalog


__all__ = ["KedroDataCatalog"]
class KedroDataCatalog:
    """
    Kedro-based implementation of the DataCatalog protocol.

    This class provides a production-ready data catalog using Kedro's DataCatalog
    as the underlying I/O engine. It supports configuration-driven dataset management
    through YAML files, enabling declarative definitions of data sources, file formats,
    load/save parameters, and storage backends.

    The catalog handles diverse data formats (CSV, Parquet, JSON, Excel, Pickle, HDF5),
    storage locations (local filesystem, S3, GCS, Azure Blob), and advanced features
    like versioning, partitioning, and dynamic SQL queries.

    Parameters
    ----------
    config_path : str or pathlib.Path
        Path to the configuration directory containing catalog YAML files.
        The directory should contain base/ and local/ subdirectories with
        catalog.yml, globals.yml, and optionally credentials.yml files.
    config_loader : kedro.config.AbstractConfigLoader or None, default=None
        Kedro configuration loader instance. If None, an OmegaConfigLoader
        will be created automatically to load YAML configurations from the
        config_path directory.

    Attributes
    ----------
    config_path : str
        Path to the configuration directory as a string.
    config_loader : kedro.config.AbstractConfigLoader
        The configuration loader instance used to read YAML files.
    _catalog : kedro.io.DataCatalog
        Internal Kedro DataCatalog instance handling actual I/O operations.

    Methods
    -------
    load(name, **query_args)
        Load a dataset by name from the catalog.
    save(name, data)
        Save a dataset by name to the catalog.
    in_directory(path)
        Factory method to create a catalog from a configuration directory.
    init_catalog(path, overwrite=False, include_globals=True, ...)
        Create the Kedro catalog folder structure with template files.

    Raises
    ------
    FileNotFoundError
        If the configuration directory does not exist or the catalog structure
        is incomplete (missing required base/ or local/ directories).

    See Also
    --------
    adc_toolkit.data.abs.DataCatalog : Protocol definition for data catalogs.
    adc_toolkit.data.validated_catalog.ValidatedDataCatalog : Catalog with validation.
    kedro.io.DataCatalog : Underlying Kedro catalog implementation.
    kedro.config.OmegaConfigLoader : Default configuration loader.

    Notes
    -----
    The catalog expects a specific directory structure::

        config_path/
            base/
                catalog.yml      # Base dataset definitions
                globals.yml      # Global variables and parameters
            local/
                catalog.yml      # Local overrides (gitignored)
                credentials.yml  # Credentials (gitignored)

    Dataset definitions in catalog.yml follow Kedro's format:

    .. code-block:: yaml

        dataset_name:
          type: pandas.CSVDataset
          filepath: data/raw/dataset.csv
          load_args:
            sep: ","
          save_args:
            index: False

    The catalog supports versioning, which automatically timestamps saved datasets
    and allows loading specific versions. Partitioning enables splitting large
    datasets into multiple files for parallel processing.

    For SQL datasets, the catalog supports dynamic query parameters that can be
    provided at load time using the query_args keyword arguments.

    Thread Safety
    -------------
    The KedroDataCatalog delegates to Kedro's DataCatalog, which is not thread-safe
    for concurrent writes to the same dataset. Concurrent reads are safe.

    References
    ----------
    .. [1] Kedro Documentation: Data Catalog
       https://docs.kedro.org/en/stable/data/data_catalog.html
    .. [2] Kedro Documentation: Configuration
       https://docs.kedro.org/en/stable/configuration/configuration_basics.html

    Examples
    --------
    Create a catalog using the factory method:

    >>> catalog = KedroDataCatalog.in_directory("config/catalog")
    >>> df = catalog.load("customer_data")
    >>> df.columns
    Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')

    Save processed data:

    >>> processed_df = process_customers(df)
    >>> catalog.save("processed_customers", processed_df)

    Load data with dynamic SQL query parameters:

    >>> # catalog.yml defines: SELECT * FROM sales WHERE year={year} AND region='{region}'
    >>> sales = catalog.load("sales_data", year=2024, region="EMEA")
    >>> sales.shape
    (15420, 8)

    Create a catalog with custom config loader:

    >>> from kedro.config import OmegaConfigLoader
    >>> loader = OmegaConfigLoader(conf_source="config", env="production", base_env="base", default_run_env="local")
    >>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

    Initialize a new catalog structure:

    >>> result = KedroDataCatalog.init_catalog(
    ...     "./my_project/config", include_globals=True, include_catalog=True, include_credentials=True
    ... )
    >>> print(f"Created: {sorted({f.name for f in result.created_files})}")
    Created: ['catalog.yml', 'credentials.yml', 'globals.yml']
    """

    def __init__(
        self,
        config_path: str | Path,
        config_loader: AbstractConfigLoader | None = None,
    ) -> None:
        """
        Initialize a Kedro data catalog from configuration files.

        This constructor creates a new catalog instance by reading dataset
        definitions from YAML configuration files in the specified directory.
        It validates that the required directory structure exists and creates
        the underlying Kedro DataCatalog.

        Parameters
        ----------
        config_path : str or pathlib.Path
            Path to the configuration directory containing catalog definitions.
            The directory must contain base/ and local/ subdirectories with
            the required YAML files (catalog.yml at minimum).
        config_loader : kedro.config.AbstractConfigLoader or None, default=None
            Kedro configuration loader for reading YAML files. If None, an
            OmegaConfigLoader will be created automatically with default settings
            (base_env="base", default_run_env="local").

        Raises
        ------
        FileNotFoundError
            If the configuration directory does not exist. The error message
            includes instructions for creating the directory structure using
            the CLI command or the ``init_catalog`` class method.
        FileNotFoundError
            If the catalog structure is incomplete (missing base/ or local/
            directories or required catalog.yml files). The error message
            includes instructions for creating the complete structure.

        See Also
        --------
        in_directory : Factory method for creating catalog instances.
        init_catalog : Class method for scaffolding catalog directory structure.

        Notes
        -----
        The constructor performs the following steps:

        1. Validates that the config_path directory exists
        2. Checks for required catalog structure (base/ and local/ directories)
        3. Creates or uses the provided config_loader
        4. Loads catalog configuration and creates the Kedro DataCatalog

        The catalog structure validation requires:

        - base/ directory with catalog.yml
        - local/ directory (can be empty initially)

        Credentials are optional and should be placed in local/credentials.yml
        to prevent accidental commits to version control.

        Examples
        --------
        Create a catalog with default configuration loader:

        >>> catalog = KedroDataCatalog("config/catalog")
        >>> catalog.config_path
        'config/catalog'

        Create a catalog with custom configuration loader:

        >>> from kedro.config import OmegaConfigLoader
        >>> loader = OmegaConfigLoader(conf_source="config", env="staging", base_env="base", default_run_env="local")
        >>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

        Handle missing configuration directory:

        >>> try:
        ...     catalog = KedroDataCatalog("nonexistent/path")
        ... except FileNotFoundError:
        ...     print("Directory not found. Run init_catalog to create it.")
        Directory not found. Run init_catalog to create it.
        """
        self.config_path = str(config_path)
        path = Path(config_path)

        if not path.exists():
            raise FileNotFoundError(
                f"Configuration directory not found: {config_path}\n"
                "To create the catalog folder structure, run:\n"
                f"  adc-toolkit init-catalog {config_path}\n"
                "Or use the class method:\n"
                f"  KedroDataCatalog.init_catalog('{config_path}')"
            )

        if not catalog_structure_exists(config_path, require_credentials=False):
            raise FileNotFoundError(
                f"Catalog structure is incomplete at: {path}\n"
                "Missing required files in base/ or local/.\n"
                "To create the catalog folder structure, run:\n"
                f"  adc-toolkit init-catalog {config_path}\n"
                "Or use the class method:\n"
                f"  KedroDataCatalog.init_catalog('{config_path}')"
            )

        # Compare against None explicitly: Kedro config loaders are dict-like,
        # so a supplied loader with no stored entries would evaluate as falsy
        # and be silently replaced by the default loader.
        self.config_loader = config_loader
        if self.config_loader is None:
            self.config_loader = create_omega_config_loader(self.config_path)
        self._catalog = create_catalog(self.config_loader)

    @classmethod
    def in_directory(cls, path: str | Path) -> "KedroDataCatalog":
        """
        Create a catalog instance from a configuration directory.

        This factory method provides a convenient way to instantiate a
        KedroDataCatalog by specifying only the configuration directory path.
        It is the recommended way to create catalog instances in application code.

        The method creates a catalog with default settings, using an automatically
        configured OmegaConfigLoader to read YAML files from the directory.

        Parameters
        ----------
        path : str or pathlib.Path
            Path to the configuration directory containing catalog definitions.
            The directory must have the required Kedro catalog structure with
            base/ and local/ subdirectories.

        Returns
        -------
        KedroDataCatalog
            A new catalog instance configured with datasets from the directory.
            The catalog is immediately ready to load and save data.

        Raises
        ------
        FileNotFoundError
            If the specified directory does not exist or lacks the required
            catalog structure (base/ and local/ directories with catalog.yml).

        See Also
        --------
        __init__ : Constructor with additional configuration options.
        init_catalog : Create the catalog directory structure.

        Notes
        -----
        This factory method is equivalent to calling the constructor with only
        the path argument, i.e. ``KedroDataCatalog(path)``.

        However, using ``in_directory`` is preferred because:

        - It matches the DataCatalog protocol interface
        - It provides better semantic clarity
        - It enables polymorphism when using multiple catalog implementations

        The method uses default configuration settings:

        - Base environment: "base"
        - Default run environment: "local"
        - Configuration format: YAML

        For advanced configuration needs (custom environments, config merging
        strategies, runtime parameters), use the constructor directly with a
        custom AbstractConfigLoader.

        Examples
        --------
        Basic usage:

        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
        >>> df = catalog.load("training_data")

        Using pathlib.Path:

        >>> from pathlib import Path
        >>> config_dir = Path("config") / "catalog"
        >>> catalog = KedroDataCatalog.in_directory(config_dir)

        Load and save in a pipeline:

        >>> catalog = KedroDataCatalog.in_directory("./config")
        >>> raw = catalog.load("raw_data")
        >>> processed = transform(raw)
        >>> catalog.save("processed_data", processed)

        Polymorphic usage with DataCatalog protocol:

        >>> from adc_toolkit.data.abs import DataCatalog
        >>> def run_pipeline(catalog: DataCatalog) -> None:
        ...     data = catalog.load("input")
        ...     result = process(data)
        ...     catalog.save("output", result)
        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
        >>> run_pipeline(catalog)
        """
        return cls(path)

406    @classmethod
407    def init_catalog(
408        cls,
409        path: str | Path,
410        *,
411        overwrite: bool = False,
412        include_globals: bool = True,
413        include_catalog: bool = True,
414        include_credentials: bool = True,
415    ) -> ScaffoldResult:
416        """
417        Create Kedro catalog directory structure with template configuration files.
418
419        This class method scaffolds a complete Kedro catalog configuration directory
420        with the required folder structure and template YAML files. It is intended
421        for initializing new projects or adding catalog functionality to existing
422        projects.
423
424        The method creates a directory structure following Kedro conventions:
425
426        - path/base/catalog.yml: Base dataset definitions
427        - path/base/globals.yml: Global variables and parameters
428        - path/local/catalog.yml: Local environment overrides
429        - path/local/credentials.yml: Credentials (should be gitignored)
430
431        Template files include helpful comments and examples to guide configuration.
432
433        Parameters
434        ----------
435        path : str or pathlib.Path
436            Root path for the configuration directory. The directory will be
437            created if it doesn't exist. Subdirectories base/ and local/ will
438            be created within this path.
439        overwrite : bool, default=False
440            If True, overwrite existing files at the destination paths. If False,
441            existing files are preserved and reported in skipped_files. Use with
442            caution to avoid losing custom configurations.
443        include_globals : bool, default=True
444            If True, create base/globals.yml with template global variables.
445            Global variables can be referenced in catalog.yml using ${variable}
446            syntax for parameterization.
447        include_catalog : bool, default=True
448            If True, create base/catalog.yml and local/catalog.yml with template
449            dataset definitions. These files are essential for catalog operation.
450        include_credentials : bool, default=True
451            If True, create local/credentials.yml for storing credentials. This
452            file should be added to .gitignore to prevent committing secrets.
453
454        Returns
455        -------
456        ScaffoldResult
457            A result object containing:
458            - created_files: List of Path objects for files that were created
459            - skipped_files: List of Path objects for files that already existed
460            - created_directories: List of Path objects for directories created
461
462        See Also
463        --------
464        in_directory : Factory method to create catalog from existing config.
465        adc_toolkit.data.catalogs.kedro.scaffold.create_catalog_folder_structure :
466            Underlying scaffolding function.
467
468        Notes
469        -----
470        The scaffolded directory structure follows Kedro best practices:
471
472        - **base/**: Contains base configurations shared across environments
473        - **local/**: Contains local overrides and credentials (gitignored)
474
475        Template catalog.yml includes examples for common dataset types:
476        - CSV files with pandas.CSVDataset
477        - Parquet files with pandas.ParquetDataset
478        - Excel files with pandas.ExcelDataset
479        - Pickle files with pickle.PickleDataset
480
481        Template globals.yml includes examples for:
482        - Base paths (data directories)
483        - Common parameters (date formats, separators)
484        - Environment-specific settings
485
486        After running this method, you should:
487        1. Review and customize the generated YAML files
488        2. Add local/credentials.yml to .gitignore
489        3. Define your project-specific datasets in base/catalog.yml
490        4. Add environment-specific overrides in local/catalog.yml
491
492        This is equivalent to running the CLI command:
493        ``adc-toolkit init-catalog <path>``
494
495        Examples
496        --------
497        Initialize a catalog in a new project:
498
499        >>> result = KedroDataCatalog.init_catalog("./config/catalog")
500        >>> print(f"Created {len(result.created_files)} files")
501        Created 4 files
502        >>> print(f"Directories: {[d.name for d in result.created_directories]}")
503        Directories: ['base', 'local']
504
505        Initialize with selective templates:
506
507        >>> result = KedroDataCatalog.init_catalog(
508        ...     "./config/catalog",
509        ...     include_globals=True,
510        ...     include_catalog=True,
511        ...     include_credentials=False,  # No credentials needed
512        ... )
513
514        Reinitialize with overwrite (use carefully):
515
516        >>> result = KedroDataCatalog.init_catalog(
517        ...     "./config/catalog",
518        ...     overwrite=True,  # Overwrites existing files
519        ... )
520
521        Check what was created vs. skipped:
522
523        >>> result = KedroDataCatalog.init_catalog("./config/catalog")
524        >>> if result.skipped_files:
525        ...     print(f"Skipped existing: {[f.name for f in result.skipped_files]}")
526        >>> if result.created_files:
527        ...     print(f"Created new: {[f.name for f in result.created_files]}")
528
529        Use in project setup script:
530
531        >>> from pathlib import Path
532        >>> project_root = Path("./my_project")
533        >>> catalog_dir = project_root / "config" / "catalog"
534        >>> result = KedroDataCatalog.init_catalog(catalog_dir)
535        >>> assert catalog_dir.exists()
536        >>> assert (catalog_dir / "base" / "catalog.yml").exists()
537        """
538        return create_catalog_folder_structure(
539            path,
540            overwrite=overwrite,
541            include_globals=include_globals,
542            include_catalog=include_catalog,
543            include_credentials=include_credentials,
544        )
545
546    def load(self, name: str, **query_args: Any) -> Data:
547        """
548        Load a dataset by name from the catalog.
549
550        Retrieve a dataset using its registered name as defined in the catalog
551        configuration files. The method handles all I/O operations, file format
552        parsing, and type conversions based on the dataset's configuration.
553
554        For SQL-based datasets with parameterized queries, this method supports
555        dynamic query parameter substitution through keyword arguments, enabling
556        flexible data filtering and selection at load time.
557
558        Parameters
559        ----------
560        name : str
561            The registered name of the dataset to load. This name must match
562            a dataset definition in the catalog configuration files (catalog.yml).
563        **query_args : Any
564            Keyword arguments for dynamic SQL query parameterization. Only
565            applicable to SQL-based datasets with parameterized queries using
566            Python format string syntax (e.g., WHERE year={year}). Passing
567            query_args for a dataset without a query raises ValueError.
568
569        Returns
570        -------
571        Data
572            The loaded dataset as a Data protocol-compatible object. The specific
573            type depends on the dataset configuration (typically pandas.DataFrame,
574            but can be Spark DataFrame, numpy array, or other types defined in
575            the catalog).
576
577        Raises
578        ------
579        KeyError
580            If no dataset with the given name is registered in the catalog.
581        FileNotFoundError
582            If the dataset's source file or database does not exist.
583        ValueError
584            If the dataset cannot be loaded due to format errors, parsing errors,
585            or invalid query_args for datasets that don't support queries.
586        PermissionError
587            If the dataset's source file or database is not readable.
588
589        See Also
590        --------
591        save : Save a dataset to the catalog.
592        _load_with_dynamic_query : Internal method for parameterized SQL queries.
593
594        Notes
595        -----
596        Load behavior depends on the dataset type configured in catalog.yml:
597
598        - **File-based datasets** (CSV, Parquet, JSON, Excel, Pickle, HDF5):
599          Reads from the configured filepath using the specified load_args.
600
601        - **Database datasets** (SQL, SQLQuery, SQLTable): Executes queries
602          or reads tables from the configured database connection.
603
604        - **Versioned datasets**: Loads the latest version unless a specific
605          version is requested in the configuration.
606
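607        - **Partitioned datasets**: With Kedro's PartitionedDataset, returns a dictionary mapping each partition ID to a load function; a partition is read when its function is called.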
608
609        For SQL datasets with dynamic queries, the query string in catalog.yml
610        should use Python format string syntax with named placeholders:
611
612        .. code-block:: yaml
613
614            sales_data:
615              type: pandas.SQLQueryDataset
616              sql: SELECT * FROM sales WHERE year={year} AND region='{region}'
617              credentials: database_creds
618
619        Query parameters are substituted at load time:
620
621        >>> df = catalog.load("sales_data", year=2024, region="EMEA")
622
623        The load operation is idempotent: calling it multiple times with the
624        same parameters returns equivalent data (though not necessarily the
625        same object instance).
626
627        Performance Considerations
628        --------------------------
629        - Large datasets may take significant time and memory to load
630        - For large files, consider using chunking or lazy loading
631        - Cloud storage (S3, GCS, Azure) may incur network latency
632        - Partitioned datasets defer reading each partition until its load function is called
633
634        Examples
635        --------
636        Load a simple dataset:
637
638        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
639        >>> df = catalog.load("customer_data")
640        >>> df.shape
641        (10000, 8)
642
643        Load with dynamic SQL query parameters:
644
645        >>> # catalog.yml: sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'"
646        >>> sales_2024 = catalog.load("sales_data", year=2024, region="EMEA")
647        >>> sales_2024["year"].unique()
648        array([2024])
649        >>> sales_2024["region"].unique()
650        array(['EMEA'], dtype=object)
651
652        Load multiple datasets in sequence:
653
654        >>> raw = catalog.load("raw_data")
655        >>> features = catalog.load("feature_set")
656        >>> model = catalog.load("trained_model")
657
658        Handle missing datasets gracefully:
659
660        >>> try:
661        ...     data = catalog.load("nonexistent_dataset")
662        ... except KeyError as e:
663        ...     print(f"Dataset not found: {e}")
664        ...     # Use default data or prompt user
665        Dataset not found: 'nonexistent_dataset'
666
667        Load with different query parameters:
668
669        >>> emea_2024 = catalog.load("sales_data", year=2024, region="EMEA")
670        >>> apac_2024 = catalog.load("sales_data", year=2024, region="APAC")
671        """
672        if query_args:
673            return self._load_with_dynamic_query(name, **query_args)
674        return self._catalog.load(name)
675
676    def save(self, name: str, data: Data) -> None:
677        """
678        Save a dataset by name to the catalog.
679
680        Store a dataset using its registered name as defined in the catalog
681        configuration files. The method handles all I/O operations, file format
682        serialization, and storage operations based on the dataset's configuration.
683
684        Parameters
685        ----------
686        name : str
687            The registered name of the dataset to save. This name must match
688            a dataset definition in the catalog configuration files (catalog.yml).
689            The dataset configuration determines the output location, format,
690            and serialization parameters.
691        data : Data
692            The dataset to save. Must be a Data protocol-compatible object
693            (e.g., pandas.DataFrame, Spark DataFrame) that is compatible with
694            the dataset type specified in the catalog configuration.
695
696        Returns
697        -------
698        None
699
700        Raises
701        ------
702        KeyError
703            If no dataset with the given name is registered in the catalog.
704        TypeError
705            If the data type is incompatible with the dataset configuration
706            (e.g., attempting to save a DataFrame to a PickleDataset expecting
707            a different object type).
708        ValueError
709            If the dataset cannot be saved due to validation errors, format
710            incompatibilities, or invalid configuration.
711        PermissionError
712            If the target location is not writable due to filesystem permissions.
713        OSError
714            If disk space is insufficient or other I/O errors occur during save.
715
716        See Also
717        --------
718        load : Load a dataset from the catalog.
719
720        Notes
721        -----
722        Save behavior depends on the dataset type configured in catalog.yml:
723
724        - **File-based datasets** (CSV, Parquet, JSON, Excel, Pickle, HDF5):
725          Writes to the configured filepath using the specified save_args.
726          Parent directories are created automatically if they don't exist.
727
728        - **Database datasets** (SQL, SQLTable): Writes to the configured
729          database table using the specified connection and save parameters.
730
731        - **Versioned datasets**: Creates a new timestamped version rather
732          than overwriting. Version format: filepath/YYYY-MM-DDTHH.mm.ss.sssZ/
733
734        - **Partitioned datasets**: Expects a dictionary mapping partition IDs
735          to data and writes each entry to its own file.
736
737        The save operation typically overwrites existing data at the target
738        location unless versioning is enabled. For versioned datasets, each
739        save creates a new version without removing previous versions.
740
741        Format-Specific Behavior
742        ------------------------
743        Different formats have different save characteristics:
744
745        - **CSV**: Human-readable, widely compatible, larger file size
746        - **Parquet**: Columnar format, compressed, efficient for analytics
747        - **Pickle**: Python-specific, preserves exact objects, version-sensitive
748        - **JSON**: Human-readable, good for nested structures
749        - **HDF5**: Binary format, good for large numerical arrays
750
751        Atomicity and Error Handling
752        -----------------------------
753        The atomicity of save operations depends on the underlying dataset type
754        and storage backend:
755
756        - Local filesystem writes may be atomic for small files
757        - Cloud storage (S3, GCS, Azure) uses multi-part uploads
758        - Database writes depend on transaction support
759        - Partitioned saves may be partially successful
760
761        If a save operation fails partway through, partial data may be written.
762        For critical applications, consider implementing save-to-temporary-then-move
763        patterns or using versioned datasets.
764
765        Performance Considerations
766        --------------------------
767        - Large datasets may take significant time to serialize and write
768        - Cloud storage uploads may have network latency and bandwidth limits
769        - Compression (enabled in save_args) trades CPU time for disk space
770        - Partitioned datasets write one file per partition, so save time grows with the partition count
771
772        Examples
773        --------
774        Save a processed dataset:
775
776        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
777        >>> processed_df = process_data(raw_df)
778        >>> catalog.save("processed_data", processed_df)
779
780        Save multiple datasets in a pipeline:
781
782        >>> catalog = KedroDataCatalog.in_directory("config/catalog")
783        >>> raw = catalog.load("raw_data")
784        >>> cleaned = clean_data(raw)
785        >>> catalog.save("cleaned_data", cleaned)
786        >>> features = engineer_features(cleaned)
787        >>> catalog.save("features", features)
788        >>> predictions = model.predict(features)
789        >>> catalog.save("predictions", predictions)
790
791        Save with versioning (configured in catalog.yml):
792
793        >>> # catalog.yml:
794        >>> #   versioned_output:
795        >>> #     type: pandas.CSVDataset
796        >>> #     filepath: data/output.csv
797        >>> #     versioned: true
798        >>> catalog.save("versioned_output", result_df)
799        >>> # Saves to: data/output.csv/2024-01-15T10.30.45.123Z/output.csv
800
801        Handle save errors:
802
803        >>> try:
804        ...     catalog.save("output_data", large_df)
805        ... except PermissionError as e:
806        ...     print(f"Cannot write to output location: {e}")
807        ... except OSError as e:
808        ...     print(f"I/O error during save: {e}")
809
810        Save to different formats (configured per dataset):
811
812        >>> # Same data, different formats for different use cases
813        >>> catalog.save("output_csv", df)  # Human-readable
814        >>> catalog.save("output_parquet", df)  # Efficient storage
815        >>> catalog.save("output_json", df)  # API consumption
816        """
817        self._catalog.save(name, data)
818
819    def _load_with_dynamic_query(self, name: str, **query_args: Any) -> Data:
820        """
821        Load data from the catalog with dynamic SQL query parameterization.
822
823        This internal method implements dynamic query parameter substitution for
824        SQL-based datasets. It temporarily modifies the dataset's query string
825        by substituting format placeholders with provided arguments, executes
826        the load operation, then restores the original query template.
827
828        Parameters
829        ----------
830        name : str
831            The registered name of the SQL dataset to load. The dataset must
832            be configured with a parameterized query using Python format string
833            syntax (e.g., SELECT * FROM table WHERE id={id}).
834        **query_args : Any
835            Keyword arguments providing values for query parameters. Keys must
836            match the placeholder names in the query template. Values are
837            substituted using Python's str.format() method.
838
839        Returns
840        -------
841        Data
842            The loaded dataset with the parameterized query applied. The return
843            type depends on the dataset configuration (typically pandas.DataFrame
844            for SQL datasets).
845
846        Raises
847        ------
848        ValueError
849            If the dataset does not support queries (i.e., the dataset
850            configuration does not include a 'query' parameter in load_args),
851            or if query parameter substitution fails due to format string errors.
852        KeyError
853            If query_args are missing required parameters referenced in the
854            query template, or if the dataset name is not registered.
855
856        See Also
857        --------
858        load : Public method that delegates to this internal method when query_args provided.
859
860        Notes
861        -----
862        This method directly manipulates the internal Kedro DataCatalog's dataset
863        configuration by:
864        1. Accessing the dataset's _load_args dictionary
865        2. Extracting the query template string
866        3. Substituting placeholders with provided arguments using str.format()
867        4. Loading data with the substituted query
868        5. Restoring the original query template for subsequent loads
869
870        The query restoration ensures that the catalog remains stateless and
871        multiple calls with different parameters don't interfere with each other.
872
873        Query Parameterization Format
874        ------------------------------
875        The query template in catalog.yml should use Python format string syntax:
876
877        - Named placeholders: {parameter_name}
878        - String parameters need explicit quotes: WHERE name='{name}'
879        - Numeric parameters don't need quotes: WHERE id={id}
880
881        Example catalog.yml configuration:
882
883        .. code-block:: yaml
884
885            parameterized_sales:
886              type: pandas.SQLQueryDataset
887              sql: |
888                SELECT * FROM sales
889                WHERE year={year}
890                  AND region='{region}'
891                  AND revenue > {min_revenue}
892              credentials: db_credentials
893
894        Security Considerations
895        -----------------------
896        This method uses Python's str.format() for parameter substitution, which
897        does NOT provide SQL injection protection. Use this method only with:
898
899        - Trusted parameter values from application code
900        - Validated and sanitized user inputs
901        - Internal pipeline parameters
902
903        For user-provided inputs, consider using database-specific parameter
904        binding mechanisms instead of string formatting.
905
906        Examples
907        --------
908        Internal usage by the load method:
909
910        >>> # User calls load with query parameters
911        >>> df = catalog.load("sales_data", year=2024, region="EMEA")
912        >>> # Internally delegates to _load_with_dynamic_query
913
914        Query template in catalog.yml:
915
916        .. code-block:: yaml
917
918            sales_data:
919              type: pandas.SQLQueryDataset
920              sql: SELECT * FROM sales WHERE year={year} AND region='{region}'
921              credentials: database_creds
922
923        Equivalent direct call (not recommended for users):
924
925        >>> data = catalog._load_with_dynamic_query("sales_data", year=2024, region="EMEA")
926
927        Error when dataset doesn't support queries:
928
929        >>> try:
930        ...     catalog._load_with_dynamic_query("csv_dataset", param=123)
931        ... except ValueError as e:
932        ...     print(e)
933        Data set `csv_dataset` does not support queries.
934        """
935        load_args = self._catalog._datasets[name]._load_args
936        if "query" not in load_args:
937            raise ValueError(f"Data set `{name}` does not support queries.")
938
939        raw_query = load_args["query"]
940        load_args["query"] = raw_query.format(**query_args)
941        try:
942            return self._catalog.load(name)
943        finally:
944            load_args["query"] = raw_query  # restore the template even if the load fails
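
The Security Considerations in the `_load_with_dynamic_query` docstring above recommend parameter binding for untrusted input. The following is a minimal sketch of the difference, using the standard-library `sqlite3` module and an in-memory table. The table, its rows, and the `hostile` value are illustrative assumptions and are not part of adc-toolkit or Kedro.

```python
import sqlite3

# In-memory database with a tiny, made-up sales table (illustrative only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (year INTEGER, region TEXT)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [(2024, "EMEA"), (2024, "APAC"), (2023, "EMEA")],
)

template = "SELECT * FROM sales WHERE region='{region}'"
hostile = "x' OR '1'='1"  # a value an attacker might supply

# str.format splices the value into the SQL text, so the quote characters
# in `hostile` change the structure of the WHERE clause.
formatted_rows = conn.execute(template.format(region=hostile)).fetchall()

# A bound parameter is passed separately from the SQL text and is always
# treated as a literal value.
bound_rows = conn.execute(
    "SELECT * FROM sales WHERE region = ?", (hostile,)
).fetchall()

print(len(formatted_rows))  # every row matches the rewritten WHERE clause
print(len(bound_rows))      # no row has that literal region
conn.close()
```

How bound parameters are passed through a Kedro SQL dataset depends on the dataset type and driver, so treat this only as a demonstration of the underlying risk.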

Kedro-based implementation of the DataCatalog protocol.

This class provides a production-ready data catalog using Kedro's DataCatalog as the underlying I/O engine. It supports configuration-driven dataset management through YAML files, enabling declarative definitions of data sources, file formats, load/save parameters, and storage backends.

The catalog handles diverse data formats (CSV, Parquet, JSON, Excel, Pickle, HDF5), storage locations (local filesystem, S3, GCS, Azure Blob), and advanced features like versioning, partitioning, and dynamic SQL queries.

Parameters
  • config_path (str or pathlib.Path): Path to the configuration directory containing catalog YAML files. The directory should contain base/ and local/ subdirectories with catalog.yml, globals.yml, and optionally credentials.yml files.
  • config_loader (kedro.config.AbstractConfigLoader or None, default=None): Kedro configuration loader instance. If None, an OmegaConfigLoader will be created automatically to load YAML configurations from the config_path directory.
Attributes
  • config_path (str): Path to the configuration directory as a string.
  • config_loader (kedro.config.AbstractConfigLoader): The configuration loader instance used to read YAML files.
  • _catalog (kedro.io.DataCatalog): Internal Kedro DataCatalog instance handling actual I/O operations.
Methods

  • load(name, **query_args): Load a dataset by name from the catalog.
  • save(name, data): Save a dataset by name to the catalog.
  • in_directory(path): Factory method to create a catalog from a configuration directory.
  • init_catalog(path, overwrite=False, include_globals=True, ...): Create the Kedro catalog folder structure with template files.

Raises
  • FileNotFoundError: If the configuration directory does not exist or the catalog structure is incomplete (missing required base/ or local/ directories).
See Also

adc_toolkit.data.abs.DataCatalog: Protocol definition for data catalogs.
adc_toolkit.data.validated_catalog.ValidatedDataCatalog: Catalog with validation.
kedro.io.DataCatalog: Underlying Kedro catalog implementation.
kedro.config.OmegaConfigLoader: Default configuration loader.

Notes

The catalog expects a specific directory structure:

config_path/
    base/
        catalog.yml      # Base dataset definitions
        globals.yml      # Global variables and parameters
    local/
        catalog.yml      # Local overrides (gitignored)
        credentials.yml  # Credentials (gitignored)

Dataset definitions in catalog.yml follow Kedro's format:

dataset_name:
  type: pandas.CSVDataset
  filepath: data/raw/dataset.csv
  load_args:
    sep: ","
  save_args:
    index: False

The catalog supports versioning, which automatically timestamps saved datasets and allows loading specific versions. Partitioning enables splitting large datasets into multiple files for parallel processing.
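
As a rough illustration of the timestamped layout, the sketch below builds a version string in the `YYYY-MM-DDTHH.mm.ss.sssZ` shape mentioned in the `save` docstring and derives the resulting path. `make_version` and `versioned_path` are hypothetical helpers written for this example; they are not the functions Kedro uses internally.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def make_version(now: datetime) -> str:
    """Format a timestamp as YYYY-MM-DDTHH.mm.ss.sssZ in UTC (millisecond precision)."""
    utc = now.astimezone(timezone.utc)
    millis = utc.microsecond // 1000
    return utc.strftime("%Y-%m-%dT%H.%M.%S") + f".{millis:03d}Z"


def versioned_path(filepath: str, version: str) -> PurePosixPath:
    """Place the file under <filepath>/<version>/<filename>."""
    base = PurePosixPath(filepath)
    return base / version / base.name


stamp = make_version(datetime(2024, 1, 15, 10, 30, 45, 123000, tzinfo=timezone.utc))
print(versioned_path("data/output.csv", stamp))
```

Because each save lands in its own version directory, earlier versions remain on disk until they are removed explicitly.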

For SQL datasets, the catalog supports dynamic query parameters that can be provided at load time using the query_args keyword arguments.
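
The substitution can be sketched with plain `str.format`, which is what the source of `_load_with_dynamic_query` calls on the query template. The template below mirrors the `sales_data` example; `render_query` is a hypothetical helper used only for illustration.

```python
def render_query(template: str, **query_args: object) -> str:
    """Fill named placeholders in a query template using str.format."""
    return template.format(**query_args)


template = "SELECT * FROM sales WHERE year={year} AND region='{region}'"

# All placeholders supplied: numeric values are inserted as-is, while string
# values rely on the quotes written in the template.
rendered = render_query(template, year=2024, region="EMEA")
print(rendered)

# Extra keyword arguments that match no placeholder are silently ignored.
same = render_query(template, year=2024, region="EMEA", quarter=1)

# A placeholder without a matching argument raises KeyError.
try:
    render_query(template, year=2024)
except KeyError as exc:
    missing = exc.args[0]
    print(f"missing parameter: {missing}")
```

In practice this means every placeholder in the template must be supplied on each call, and misspelled argument names fail only when they leave a placeholder unfilled.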

Thread Safety

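The KedroDataCatalog delegates to Kedro's DataCatalog, which is not thread-safe for concurrent writes to the same dataset. Concurrent reads are safe, except for loads that pass query_args: these temporarily modify the dataset's shared load_args and should not run concurrently on the same dataset.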
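
One way to guard against concurrent writes to the same dataset is to serialize saves with a per-dataset lock. The sketch below is an illustration under assumptions: `LockedCatalog` and `InMemoryCatalog` are hypothetical classes, and the in-memory stand-in only mimics the `load`/`save` interface so the example runs without Kedro.

```python
import threading
from collections import defaultdict
from typing import Any


class InMemoryCatalog:
    """Stand-in exposing load/save, used only to make the sketch runnable."""

    def __init__(self) -> None:
        self._store = {}

    def load(self, name: str) -> Any:
        return self._store[name]

    def save(self, name: str, data: Any) -> None:
        self._store[name] = data


class LockedCatalog:
    """Wrap a catalog so that saves to the same dataset name never overlap."""

    def __init__(self, catalog: Any) -> None:
        self._catalog = catalog
        self._guard = threading.Lock()  # protects the lock table itself
        self._locks = defaultdict(threading.Lock)  # one lock per dataset name

    def _lock_for(self, name: str):
        with self._guard:
            return self._locks[name]

    def load(self, name: str) -> Any:
        return self._catalog.load(name)

    def save(self, name: str, data: Any) -> None:
        with self._lock_for(name):
            self._catalog.save(name, data)


catalog = LockedCatalog(InMemoryCatalog())
threads = [
    threading.Thread(target=catalog.save, args=("metrics", i)) for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# One of the eight writes is the final value; which one depends on scheduling.
final_value = catalog.load("metrics")
print(final_value in range(8))
```

A lock of this kind only coordinates threads within one process; separate processes writing to the same target would need a different mechanism.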

Examples

Create a catalog using the factory method:

>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("customer_data")
>>> df.columns
Index(['customer_id', 'name', 'email', 'signup_date'], dtype='object')

Save processed data:

>>> processed_df = process_customers(df)
>>> catalog.save("processed_customers", processed_df)

Load data with dynamic SQL query parameters:

>>> # catalog.yml defines: SELECT * FROM sales WHERE year={year} AND region='{region}'
>>> sales = catalog.load("sales_data", year=2024, region="EMEA")
>>> sales.shape
(15420, 8)

Create a catalog with custom config loader:

>>> from kedro.config import OmegaConfigLoader
>>> loader = OmegaConfigLoader(conf_source="config", env="production", base_env="base", default_run_env="local")
>>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

Initialize a new catalog structure:

>>> result = KedroDataCatalog.init_catalog(
...     "./my_project/config", include_globals=True, include_catalog=True, include_credentials=True
... )
>>> print(f"Created: {[f.name for f in result.created_files]}")
Created: ['catalog.yml', 'globals.yml', 'credentials.yml']

KedroDataCatalog(config_path: str | pathlib.Path, config_loader: kedro.config.abstract_config.AbstractConfigLoader | None = None)
214    def __init__(
215        self,
216        config_path: str | Path,
217        config_loader: AbstractConfigLoader | None = None,
218    ) -> None:
219        """
220        Initialize a Kedro data catalog from configuration files.
221
222        This constructor creates a new catalog instance by reading dataset
223        definitions from YAML configuration files in the specified directory.
224        It validates that the required directory structure exists and creates
225        the underlying Kedro DataCatalog.
226
227        Parameters
228        ----------
229        config_path : str or pathlib.Path
230            Path to the configuration directory containing catalog definitions.
231            The directory must contain base/ and local/ subdirectories with
232            the required YAML files (catalog.yml at minimum).
233        config_loader : kedro.config.AbstractConfigLoader or None, default=None
234            Kedro configuration loader for reading YAML files. If None, an
235            OmegaConfigLoader will be created automatically with default settings
236            (base_env="base", default_run_env="local").
237
238        Raises
239        ------
240        FileNotFoundError
241            If the configuration directory does not exist. The error message
242            includes instructions for creating the directory structure using
243            the CLI command or the ``init_catalog`` class method.
244        FileNotFoundError
245            If the catalog structure is incomplete (missing base/ or local/
246            directories or required catalog.yml files). The error message
247            includes instructions for creating the complete structure.
248
249        See Also
250        --------
251        in_directory : Factory method for creating catalog instances.
252        init_catalog : Class method for scaffolding catalog directory structure.
253
254        Notes
255        -----
256        The constructor performs the following steps:
257        1. Validates that the config_path directory exists
258        2. Checks for required catalog structure (base/ and local/ directories)
259        3. Creates or uses the provided config_loader
260        4. Loads catalog configuration and creates the Kedro DataCatalog
261
262        The catalog structure validation requires:
263        - base/ directory with catalog.yml
264        - local/ directory (can be empty initially)
265
266        Credentials are optional and should be placed in local/credentials.yml
267        to prevent accidental commits to version control.
268
269        Examples
270        --------
271        Create a catalog with default configuration loader:
272
273        >>> catalog = KedroDataCatalog("config/catalog")
274        >>> catalog.config_path
275        'config/catalog'
276
277        Create a catalog with custom configuration loader:
278
279        >>> from kedro.config import OmegaConfigLoader
280        >>> loader = OmegaConfigLoader(conf_source="config", env="staging", base_env="base", default_run_env="local")
281        >>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)
282
283        Handle missing configuration directory:
284
285        >>> try:
286        ...     catalog = KedroDataCatalog("nonexistent/path")
287        ... except FileNotFoundError as e:
288        ...     print("Directory not found. Run init_catalog to create it.")
289        Directory not found. Run init_catalog to create it.
290        """
291        self.config_path = str(config_path)
292        path = Path(config_path)
293
294        if not path.exists():
295            raise FileNotFoundError(
296                f"Configuration directory not found: {config_path}\n"
297                f"To create the catalog folder structure, run:\n"
298                f"  adc-toolkit init-catalog {config_path}\n"
299                f"Or use the class method:\n"
300                f"  KedroDataCatalog.init_catalog('{config_path}')"
301            )
302
303        if not catalog_structure_exists(config_path, require_credentials=False):
304            raise FileNotFoundError(
305                f"Catalog structure is incomplete at: {path}\n"
306                f"Missing required files in base/ or local/.\n"
307                f"To create the catalog folder structure, run:\n"
308                f"  adc-toolkit init-catalog {config_path}\n"
309                f"Or use the class method:\n"
310                f"  KedroDataCatalog.init_catalog('{config_path}')"
311            )
312
313        self.config_loader = config_loader
314        if self.config_loader is None:
315            self.config_loader = create_omega_config_loader(self.config_path)
316        self._catalog = create_catalog(self.config_loader)

Initialize a Kedro data catalog from configuration files.

This constructor creates a new catalog instance by reading dataset definitions from YAML configuration files in the specified directory. It validates that the required directory structure exists and creates the underlying Kedro DataCatalog.

Parameters
  • config_path (str or pathlib.Path): Path to the configuration directory containing catalog definitions. The directory must contain base/ and local/ subdirectories with the required YAML files (catalog.yml at minimum).
  • config_loader (kedro.config.AbstractConfigLoader or None, default=None): Kedro configuration loader for reading YAML files. If None, an OmegaConfigLoader will be created automatically with default settings (base_env="base", default_run_env="local").
Raises
  • FileNotFoundError: If the configuration directory does not exist. The error message includes instructions for creating the directory structure using the CLI command or the init_catalog class method.
  • FileNotFoundError: If the catalog structure is incomplete (missing base/ or local/ directories or required catalog.yml files). The error message includes instructions for creating the complete structure.
See Also

in_directory: Factory method for creating catalog instances.
init_catalog: Class method for scaffolding catalog directory structure.

Notes

The constructor performs the following steps:

  1. Validates that the config_path directory exists
  2. Checks for required catalog structure (base/ and local/ directories)
  3. Creates or uses the provided config_loader
  4. Loads catalog configuration and creates the Kedro DataCatalog

The catalog structure validation requires:

  • base/ directory with catalog.yml
  • local/ directory (can be empty initially)
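
The validation rules above can be sketched with `pathlib`. `has_catalog_structure` is a hypothetical helper for illustration; the library's own check is `catalog_structure_exists`, whose implementation is not shown in this section and may differ.

```python
import tempfile
from pathlib import Path
from typing import Union


def has_catalog_structure(config_path: Union[str, Path]) -> bool:
    """Return True if base/catalog.yml exists and local/ is a directory."""
    root = Path(config_path)
    return (root / "base" / "catalog.yml").is_file() and (root / "local").is_dir()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    before = has_catalog_structure(root)  # empty directory: incomplete

    # Create the minimal layout: base/catalog.yml plus an empty local/.
    (root / "base").mkdir()
    (root / "base" / "catalog.yml").write_text("# dataset definitions\n")
    (root / "local").mkdir()
    after = has_catalog_structure(root)

print(before, after)
```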

Credentials are optional and should be placed in local/credentials.yml to prevent accidental commits to version control.

Examples

Create a catalog with default configuration loader:

>>> catalog = KedroDataCatalog("config/catalog")
>>> catalog.config_path
'config/catalog'

Create a catalog with custom configuration loader:

>>> from kedro.config import OmegaConfigLoader
>>> loader = OmegaConfigLoader(conf_source="config", env="staging", base_env="base", default_run_env="local")
>>> catalog = KedroDataCatalog("config/catalog", config_loader=loader)

Handle missing configuration directory:

>>> try:
...     catalog = KedroDataCatalog("nonexistent/path")
... except FileNotFoundError as e:
...     print("Directory not found. Run init_catalog to create it.")
Directory not found. Run init_catalog to create it.
config_path
config_loader
@classmethod
def in_directory(cls, path: str | pathlib.Path) -> KedroDataCatalog:
318    @classmethod
319    def in_directory(cls, path: str | Path) -> "KedroDataCatalog":
        return cls(path)

Create a catalog instance from a configuration directory.

This factory method provides a convenient way to instantiate a KedroDataCatalog by specifying only the configuration directory path. It is the recommended way to create catalog instances in application code.

The method creates a catalog with default settings, using an automatically configured OmegaConfigLoader to read YAML files from the directory.

Parameters
  • path (str or pathlib.Path): Path to the configuration directory containing catalog definitions. The directory must have the required Kedro catalog structure with base/ and local/ subdirectories.
Returns
  • KedroDataCatalog: A new catalog instance configured with datasets from the directory. The catalog is immediately ready to load and save data.
Raises
  • FileNotFoundError: If the specified directory does not exist or lacks the required catalog structure (base/ and local/ directories with catalog.yml).
See Also

__init__: Constructor with additional configuration options.
init_catalog: Create the catalog directory structure.

Notes

This factory method is equivalent to calling the constructor with just the path parameter:

>>> catalog = KedroDataCatalog(path)

However, using in_directory is preferred because:

  • It matches the DataCatalog protocol interface
  • It provides better semantic clarity
  • It enables polymorphism when using multiple catalog implementations

The method uses default configuration settings:

  • Base environment: "base"
  • Default run environment: "local"
  • Configuration format: YAML

For advanced configuration needs (custom environments, config merging strategies, runtime parameters), use the constructor directly with a custom AbstractConfigLoader.
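
Because in_directory raises FileNotFoundError when the layout is incomplete, it can help to check the directory first and report exactly which files are missing. The helper below is a hypothetical sketch, not part of adc-toolkit. It assumes the required layout is base/catalog.yml and local/catalog.yml, as described under Raises.

```python
from __future__ import annotations

from pathlib import Path

# Assumption: these are the files the catalog needs, per the Raises section.
REQUIRED_RELATIVE_PATHS = ("base/catalog.yml", "local/catalog.yml")


def missing_catalog_paths(config_dir: str | Path) -> list[Path]:
    """Return the required files that do not exist under config_dir (hypothetical helper)."""
    root = Path(config_dir)
    return [root / rel for rel in REQUIRED_RELATIVE_PATHS if not (root / rel).is_file()]
```

An empty list means the files this sketch checks for are present. Otherwise the caller can print the missing paths or run init_catalog before calling in_directory.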

Examples

Basic usage:

>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("training_data")

Using pathlib.Path:

>>> from pathlib import Path
>>> config_dir = Path("config") / "catalog"
>>> catalog = KedroDataCatalog.in_directory(config_dir)

Load and save in a pipeline:

>>> catalog = KedroDataCatalog.in_directory("./config")
>>> raw = catalog.load("raw_data")
>>> processed = transform(raw)
>>> catalog.save("processed_data", processed)

Polymorphic usage with DataCatalog protocol:

>>> def run_pipeline(catalog: DataCatalog) -> None:
...     data = catalog.load("input")
...     result = process(data)
...     catalog.save("output", result)
>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> run_pipeline(catalog)
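
The polymorphic usage can also be exercised without any configuration directory, which is useful in unit tests. The sketch below is illustrative only: CatalogLike and InMemoryCatalog are hypothetical names that stand in for the DataCatalog protocol and a second catalog implementation, and the doubling step is a placeholder for real processing.

```python
from __future__ import annotations

from typing import Any, Protocol


class CatalogLike(Protocol):
    """Hypothetical stand-in for the DataCatalog protocol: just load and save."""

    def load(self, name: str, **query_args: Any) -> Any: ...

    def save(self, name: str, data: Any) -> None: ...


class InMemoryCatalog:
    """Toy catalog that keeps datasets in a dict (illustration only)."""

    def __init__(self, initial: dict[str, Any] | None = None) -> None:
        self._store: dict[str, Any] = dict(initial or {})

    def load(self, name: str, **query_args: Any) -> Any:
        return self._store[name]  # raises KeyError for unknown names

    def save(self, name: str, data: Any) -> None:
        self._store[name] = data


def run_pipeline(catalog: CatalogLike) -> None:
    """Depends only on load/save, so any conforming catalog can be passed in."""
    data = catalog.load("input")
    catalog.save("output", [value * 2 for value in data])


catalog = InMemoryCatalog({"input": [1, 2, 3]})
run_pipeline(catalog)
print(catalog.load("output"))  # prints [2, 4, 6]
```

Since run_pipeline is typed against the protocol rather than a concrete class, a catalog created with KedroDataCatalog.in_directory could be passed in the same way.
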
@classmethod
def init_catalog(cls, path: str | pathlib.Path, *, overwrite: bool = False, include_globals: bool = True, include_catalog: bool = True, include_credentials: bool = True) -> adc_toolkit.data.catalogs.kedro.scaffold.ScaffoldResult:
    @classmethod
    def init_catalog(
        cls,
        path: str | Path,
        *,
        overwrite: bool = False,
        include_globals: bool = True,
        include_catalog: bool = True,
        include_credentials: bool = True,
    ) -> ScaffoldResult:
        return create_catalog_folder_structure(
            path,
            overwrite=overwrite,
            include_globals=include_globals,
            include_catalog=include_catalog,
            include_credentials=include_credentials,
        )

Create Kedro catalog directory structure with template configuration files.

This class method scaffolds a complete Kedro catalog configuration directory with the required folder structure and template YAML files. It is intended for initializing new projects or adding catalog functionality to existing projects.

The method creates a directory structure following Kedro conventions:

  • path/base/catalog.yml: Base dataset definitions
  • path/base/globals.yml: Global variables and parameters
  • path/local/catalog.yml: Local environment overrides
  • path/local/credentials.yml: Credentials (should be gitignored)

Template files include helpful comments and examples to guide configuration.
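
To make the layout and the skip-or-overwrite behavior concrete, here is a minimal standalone sketch of this kind of scaffolding. It is not the toolkit's create_catalog_folder_structure: scaffold_sketch, SketchResult, and the placeholder file contents are hypothetical, and only the four file paths and the three result fields are taken from this page.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path

# Assumption: placeholder contents; the real templates ship with adc-toolkit.
TEMPLATE_FILES = {
    "base/catalog.yml": "# base dataset definitions\n",
    "base/globals.yml": "# global variables\n",
    "local/catalog.yml": "# local overrides\n",
    "local/credentials.yml": "# credentials (keep out of version control)\n",
}


@dataclass
class SketchResult:
    """Hypothetical result type mirroring the documented ScaffoldResult fields."""

    created_files: list[Path] = field(default_factory=list)
    skipped_files: list[Path] = field(default_factory=list)
    created_directories: list[Path] = field(default_factory=list)


def scaffold_sketch(path: str | Path, *, overwrite: bool = False) -> SketchResult:
    """Create the four-file layout, skipping existing files unless overwrite is True."""
    root = Path(path)
    result = SketchResult()
    for relative, content in TEMPLATE_FILES.items():
        target = root / relative
        if not target.parent.exists():
            target.parent.mkdir(parents=True)
            result.created_directories.append(target.parent)
        if target.exists() and not overwrite:
            result.skipped_files.append(target)  # preserve the user's file
            continue
        target.write_text(content, encoding="utf-8")
        result.created_files.append(target)
    return result
```

Running the sketch twice against the same path reports four created files the first time and four skipped files the second time, which is the behavior described for overwrite=False.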

Parameters
  • path (str or pathlib.Path): Root path for the configuration directory. The directory will be created if it doesn't exist. Subdirectories base/ and local/ will be created within this path.
  • overwrite (bool, default=False): If True, overwrite existing files at the destination paths. If False, existing files are preserved and reported in skipped_files. Use with caution to avoid losing custom configurations.
  • include_globals (bool, default=True): If True, create base/globals.yml with template global variables. Global variables can be referenced in catalog.yml using ${variable} syntax for parameterization.
  • include_catalog (bool, default=True): If True, create base/catalog.yml and local/catalog.yml with template dataset definitions. These files are essential for catalog operation.
  • include_credentials (bool, default=True): If True, create local/credentials.yml for storing credentials. This file should be added to .gitignore to prevent committing secrets.
Returns
  • ScaffoldResult: A result object containing:
    • created_files: List of Path objects for files that were created
    • skipped_files: List of Path objects for files that already existed
    • created_directories: List of Path objects for directories created
See Also

in_directory: Factory method to create catalog from existing config.
adc_toolkit.data.catalogs.kedro.scaffold.create_catalog_folder_structure: Underlying scaffolding function.

Notes

The scaffolded directory structure follows Kedro best practices:

  • base/: Contains base configurations shared across environments
  • local/: Contains local overrides and credentials (gitignored)

Template catalog.yml includes examples for common dataset types:

  • CSV files with pandas.CSVDataset
  • Parquet files with pandas.ParquetDataset
  • Excel files with pandas.ExcelDataset
  • Pickle files with pickle.PickleDataset

Template globals.yml includes examples for:

  • Base paths (data directories)
  • Common parameters (date formats, separators)
  • Environment-specific settings

After running this method, you should:

  1. Review and customize the generated YAML files
  2. Add local/credentials.yml to .gitignore
  3. Define your project-specific datasets in base/catalog.yml
  4. Add environment-specific overrides in local/catalog.yml

This is equivalent to running the CLI command: adc-toolkit init-catalog <path>
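
Step 2 of the checklist can be automated in a setup script. The helper below is a hypothetical sketch, not something adc-toolkit provides. It assumes the .gitignore lives at the project root and that the entry is written relative to that root.

```python
from __future__ import annotations

from pathlib import Path


def ensure_gitignored(project_root: str | Path, entry: str) -> bool:
    """Append entry to .gitignore if absent; return True when the file was changed."""
    gitignore = Path(project_root) / ".gitignore"
    existing = gitignore.read_text(encoding="utf-8") if gitignore.exists() else ""
    if entry in (line.strip() for line in existing.splitlines()):
        return False  # already ignored, nothing to do
    # Keep the new entry on its own line even if the file lacks a trailing newline.
    separator = "" if existing == "" or existing.endswith("\n") else "\n"
    gitignore.write_text(f"{existing}{separator}{entry}\n", encoding="utf-8")
    return True
```

For a catalog scaffolded at ./config/catalog, the call would be ensure_gitignored(".", "config/catalog/local/credentials.yml"). Calling it again is harmless because the entry is only added once.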

Examples

Initialize a catalog in a new project:

>>> result = KedroDataCatalog.init_catalog("./config/catalog")
>>> print(f"Created {len(result.created_files)} files")
Created 4 files
>>> print(f"Directories: {[d.name for d in result.created_directories]}")
Directories: ['base', 'local']

Initialize with selective templates:

>>> result = KedroDataCatalog.init_catalog(
...     "./config/catalog",
...     include_globals=True,
...     include_catalog=True,
...     include_credentials=False,  # No credentials needed
... )

Reinitialize with overwrite (use carefully):

>>> result = KedroDataCatalog.init_catalog(
...     "./config/catalog",
...     overwrite=True,  # Overwrites existing files
... )

Check what was created vs. skipped:

>>> result = KedroDataCatalog.init_catalog("./config/catalog")
>>> if result.skipped_files:
...     print(f"Skipped existing: {[f.name for f in result.skipped_files]}")
>>> if result.created_files:
...     print(f"Created new: {[f.name for f in result.created_files]}")

Use in project setup script:

>>> from pathlib import Path
>>> project_root = Path("./my_project")
>>> catalog_dir = project_root / "config" / "catalog"
>>> result = KedroDataCatalog.init_catalog(catalog_dir)
>>> assert catalog_dir.exists()
>>> assert (catalog_dir / "base" / "catalog.yml").exists()
def load(self, name: str, **query_args: Any) -> adc_toolkit.data.abs.Data:
    def load(self, name: str, **query_args: Any) -> Data:
        if query_args:
            return self._load_with_dynamic_query(name, **query_args)
        return self._catalog.load(name)

Load a dataset by name from the catalog.

Retrieve a dataset using its registered name as defined in the catalog configuration files. The method handles all I/O operations, file format parsing, and type conversions based on the dataset's configuration.

For SQL-based datasets with parameterized queries, this method supports dynamic query parameter substitution through keyword arguments, enabling flexible data filtering and selection at load time.

Parameters
  • name (str): The registered name of the dataset to load. This name must match a dataset definition in the catalog configuration files (catalog.yml).
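  • **query_args (Any): Keyword arguments for dynamic SQL query parameterization. Only applicable to SQL-based datasets with parameterized queries using Python format string syntax (e.g., WHERE year={year}). Do not pass query_args for other dataset types: any keyword argument routes the call through _load_with_dynamic_query, and the Raises section lists ValueError for datasets that don't support queries.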
Returns
  • Data: The loaded dataset as a Data protocol-compatible object. The specific type depends on the dataset configuration (typically pandas.DataFrame, but can be Spark DataFrame, numpy array, or other types defined in the catalog).
Raises
  • KeyError: If no dataset with the given name is registered in the catalog.
  • FileNotFoundError: If the dataset's source file or database does not exist.
  • ValueError: If the dataset cannot be loaded due to format errors, parsing errors, or invalid query_args for datasets that don't support queries.
  • PermissionError: If the dataset's source file or database is not readable.
See Also

save: Save a dataset to the catalog.
_load_with_dynamic_query: Internal method for parameterized SQL queries.

Notes

Load behavior depends on the dataset type configured in catalog.yml:

  • File-based datasets (CSV, Parquet, JSON, Excel, Pickle, HDF5): Reads from the configured filepath using the specified load_args.

  • Database datasets (SQL, SQLQuery, SQLTable): Executes queries or reads tables from the configured database connection.

  • Versioned datasets: Loads the latest version unless a specific version is requested in the configuration.

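  • Partitioned datasets: Returns whatever the configured dataset's load produces. For Kedro's PartitionedDataset that is a dictionary mapping each partition ID to a load callable, so partitions are read lazily and must be combined by the caller.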

For SQL datasets with dynamic queries, the query string in catalog.yml should use Python format string syntax with named placeholders:

sales_data:
  type: pandas.SQLQueryDataset
  sql: SELECT * FROM sales WHERE year={year} AND region='{region}'
  credentials: database_creds

Query parameters are substituted at load time:

>>> df = catalog.load("sales_data", year=2024, region="EMEA")
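
The substitution itself is ordinary Python string formatting. The sketch below illustrates that mechanism only and is not the toolkit's _load_with_dynamic_query; render_query is a hypothetical helper that also checks for missing placeholders before formatting.

```python
from string import Formatter


def render_query(template: str, **query_args: object) -> str:
    """Fill named placeholders, failing early if any are missing (illustrative helper)."""
    # Formatter().parse yields (literal, field_name, format_spec, conversion) tuples.
    expected = {name for _, name, _, _ in Formatter().parse(template) if name}
    missing = expected - set(query_args)
    if missing:
        raise ValueError(f"Missing query parameters: {sorted(missing)}")
    return template.format(**query_args)


template = "SELECT * FROM sales WHERE year={year} AND region='{region}'"
print(render_query(template, year=2024, region="EMEA"))
# prints: SELECT * FROM sales WHERE year=2024 AND region='EMEA'
```

Plain string formatting performs no escaping, so the values should come from trusted code rather than from user input.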

The load operation is idempotent: as long as the underlying source is unchanged, calling it multiple times with the same parameters returns equivalent data (though not necessarily the same object instance).

Performance Considerations
  • Large datasets may take significant time and memory to load
  • For large files, consider using chunking or lazy loading
  • Cloud storage (S3, GCS, Azure) may incur network latency
  • Partitioned datasets are loaded in parallel when possible
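
As an illustration of the chunking idea, the following sketch reads a CSV source in fixed-size batches using only the standard library. It is a generic pattern rather than a catalog feature: iter_csv_chunks is a hypothetical helper, and how chunking is configured for a particular dataset type depends on that dataset's load_args.

```python
from __future__ import annotations

import csv
import io
from itertools import islice
from typing import Iterator, TextIO


def iter_csv_chunks(handle: TextIO, chunk_size: int) -> Iterator[list[dict[str, str]]]:
    """Yield lists of at most chunk_size rows so only one chunk is held at a time."""
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


sample = io.StringIO("id,value\n1,a\n2,b\n3,c\n")
print([len(chunk) for chunk in iter_csv_chunks(sample, chunk_size=2)])  # prints [2, 1]
```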
Examples

Load a simple dataset:

>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> df = catalog.load("customer_data")
>>> df.shape
(10000, 8)

Load with dynamic SQL query parameters:

>>> # catalog.yml: sql: "SELECT * FROM sales WHERE year={year} AND region='{region}'"
>>> sales_2024 = catalog.load("sales_data", year=2024, region="EMEA")
>>> sales_2024["year"].unique()
array([2024])
>>> sales_2024["region"].unique()
array(['EMEA'], dtype=object)

Load multiple datasets in sequence:

>>> raw = catalog.load("raw_data")
>>> features = catalog.load("feature_set")
>>> model = catalog.load("trained_model")

Handle missing datasets gracefully:

>>> try:
...     data = catalog.load("nonexistent_dataset")
... except KeyError as e:
...     print(f"Dataset not found: {e}")
...     # Use default data or prompt user
Dataset not found: 'nonexistent_dataset'
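
The fallback mentioned in the comment can be wrapped in a small helper. This is a sketch under the assumption that a missing dataset surfaces as KeyError, as listed under Raises; load_or_default and StubCatalog are hypothetical names used only for illustration.

```python
from __future__ import annotations

from typing import Any


class StubCatalog:
    """Hypothetical catalog exposing the same load(name) call, backed by a dict."""

    def __init__(self, datasets: dict[str, Any]) -> None:
        self._datasets = datasets

    def load(self, name: str, **query_args: Any) -> Any:
        return self._datasets[name]  # KeyError when the name is not registered


def load_or_default(catalog: Any, name: str, default: Any) -> Any:
    """Return the named dataset, or default when the catalog does not know the name."""
    try:
        return catalog.load(name)
    except KeyError:
        return default


stub = StubCatalog({"customer_data": [1, 2, 3]})
print(load_or_default(stub, "customer_data", default=[]))  # prints [1, 2, 3]
print(load_or_default(stub, "nonexistent_dataset", default=[]))  # prints []
```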

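Load with different query parameters (this assumes the configured query defines {quarter} and {year} placeholders, unlike the sales_data definition shown in the Notes):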

>>> q1_data = catalog.load("sales_data", quarter=1, year=2024)
>>> q2_data = catalog.load("sales_data", quarter=2, year=2024)
def save(self, name: str, data: adc_toolkit.data.abs.Data) -> None:
    def save(self, name: str, data: Data) -> None:
        self._catalog.save(name, data)

Save a dataset by name to the catalog.

Store a dataset using its registered name as defined in the catalog configuration files. The method handles all I/O operations, file format serialization, and storage operations based on the dataset's configuration.

Parameters
  • name (str): The registered name of the dataset to save. This name must match a dataset definition in the catalog configuration files (catalog.yml). The dataset configuration determines the output location, format, and serialization parameters.
  • data (Data): The dataset to save. Must be a Data protocol-compatible object (e.g., pandas.DataFrame, Spark DataFrame) that is compatible with the dataset type specified in the catalog configuration.
Returns
  • None
Raises
  • KeyError: If no dataset with the given name is registered in the catalog.
  • TypeError: If the data type is incompatible with the dataset configuration (e.g., passing an object that the configured dataset type cannot serialize).
  • ValueError: If the dataset cannot be saved due to validation errors, format incompatibilities, or invalid configuration.
  • PermissionError: If the target location is not writable due to filesystem permissions.
  • OSError: If disk space is insufficient or other I/O errors occur during save.
See Also

load: Load a dataset from the catalog.

Notes

Save behavior depends on the dataset type configured in catalog.yml:

  • File-based datasets (CSV, Parquet, JSON, Excel, Pickle, HDF5): Writes to the configured filepath using the specified save_args. Parent directories are created automatically if they don't exist.

  • Database datasets (SQL, SQLTable): Writes to the configured database table using the specified connection and save parameters.

  • Versioned datasets: Creates a new timestamped version rather than overwriting. Version layout: <filepath>/YYYY-MM-DDTHH.mm.ss.sssZ/<filename>

  • Partitioned datasets: Splits data across multiple files based on partitioning configuration.

The save operation typically overwrites existing data at the target location unless versioning is enabled. For versioned datasets, each save creates a new version without removing previous versions.
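
The versioned layout described above can be illustrated with a minimal standard-library sketch. It only mirrors the timestamp pattern and path shape shown in this section; the helper names `make_version` and `versioned_path` are hypothetical and are not part of KedroDataCatalog or Kedro.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath

# Assumed format string, chosen to match the YYYY-MM-DDTHH.mm.ss.sssZ pattern above.
VERSION_FORMAT = "%Y-%m-%dT%H.%M.%S.%fZ"


def make_version(now: datetime) -> str:
    """Format a timestamp in UTC with millisecond precision (hypothetical helper)."""
    stamp = now.astimezone(timezone.utc).strftime(VERSION_FORMAT)
    # %f yields six microsecond digits; drop the last three and re-append "Z".
    return stamp[:-4] + "Z"


def versioned_path(filepath: str, version: str) -> PurePosixPath:
    """Build <filepath>/<version>/<filename> (hypothetical helper)."""
    base = PurePosixPath(filepath)
    return base / version / base.name


version = make_version(datetime(2024, 1, 15, 10, 30, 45, 123000, tzinfo=timezone.utc))
path = versioned_path("data/output.csv", version)
print(path)
```

Because each save resolves to a new directory named after the timestamp, earlier versions remain untouched next to the new one.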

Format-Specific Behavior

Different formats have different save characteristics:

  • CSV: Human-readable, widely compatible, larger file size
  • Parquet: Columnar format, compressed, efficient for analytics
  • Pickle: Python-specific, preserves exact objects, version-sensitive
  • JSON: Human-readable, good for nested structures
  • HDF5: Binary format, good for large numerical arrays
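
As a rough illustration of these trade-offs, the sketch below round-trips the same records through CSV, JSON, and Pickle using only the standard library. It does not go through the catalog; the `records` sample is made up for the example, and real dataset types (for instance pandas-based ones) may restore types differently.

```python
import csv
import io
import json
import pickle

# Illustrative sample data, not taken from any real dataset.
records = [
    {"id": 1, "score": 0.5, "tags": ["a", "b"]},
    {"id": 2, "score": 1.25, "tags": []},
]

# CSV: flat text. Every value is read back as a string, and nested values
# are stored as their string representation.
csv_buffer = io.StringIO()
writer = csv.DictWriter(csv_buffer, fieldnames=["id", "score", "tags"])
writer.writeheader()
writer.writerows(records)
csv_rows = list(csv.DictReader(io.StringIO(csv_buffer.getvalue())))

# JSON: text that keeps nesting and basic types (numbers, strings, lists).
json_rows = json.loads(json.dumps(records))

# Pickle: binary and Python-specific; restores equal Python objects.
pickle_rows = pickle.loads(pickle.dumps(records))

print(type(csv_rows[0]["id"]).__name__)  # type of a value read back from CSV
print(json_rows == records, pickle_rows == records)
```
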
Atomicity and Error Handling

The atomicity of save operations depends on the underlying dataset type and storage backend:

  • Local filesystem writes are generally not atomic; an interrupted write can leave a truncated file
  • Cloud storage (S3, GCS, Azure) typically uses multi-part uploads for large objects
  • Database writes depend on transaction support
  • Partitioned saves may be partially successful

If a save operation fails partway through, partial data may be written. For critical applications, consider implementing save-to-temporary-then-move patterns or using versioned datasets.
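
The save-to-temporary-then-move pattern mentioned above can be sketched as follows for a local filesystem. This is a minimal illustration under stated assumptions, not something KedroDataCatalog does on its own: the helper name `atomic_write_bytes` is hypothetical, and the rename is only atomic when the temporary file and the target live on the same filesystem.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(target: Path, payload: bytes) -> None:
    """Write payload so that readers never observe a partially written target."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # Create the temporary file next to the target so the final rename
    # stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=str(target.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_name, target)  # swap the finished file into place
    except BaseException:
        # Clean up the temporary file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as workdir:
    out = Path(workdir) / "nested" / "output.csv"
    atomic_write_bytes(out, b"id,value\n1,10\n")
    atomic_write_bytes(out, b"id,value\n1,10\n2,20\n")  # replaces the first file
    final_bytes = out.read_bytes()
    leftovers = [p.name for p in out.parent.iterdir() if p.suffix == ".tmp"]
```

If the write fails midway, the previous file at the target path is left as it was, which is the property the plain overwrite lacks.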

Performance Considerations
  • Large datasets may take significant time to serialize and write
  • Cloud storage uploads may have network latency and bandwidth limits
  • Compression (enabled in save_args) trades CPU time for disk space
  • Partitioned datasets can write partitions in parallel
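
To make the compression trade-off concrete, the sketch below compresses the same payload with gzip at a low and a high level and records size and elapsed time. It uses only the standard library and a synthetic, highly repetitive payload, so the numbers are illustrative; how a given dataset type maps save_args to a compression setting depends on that dataset and is not shown here.

```python
import gzip
import time

# Synthetic CSV-like payload; real data will compress differently.
rows = "".join(f"{i},group_{i % 5},{i * 0.5}\n" for i in range(50_000))
payload = ("id,category,value\n" + rows).encode("utf-8")


def timed_compress(data: bytes, level: int) -> tuple[int, float]:
    """Return (compressed size in bytes, seconds spent) for one gzip level."""
    start = time.perf_counter()
    compressed = gzip.compress(data, compresslevel=level)
    return len(compressed), time.perf_counter() - start


fast_size, fast_seconds = timed_compress(payload, 1)
small_size, small_seconds = timed_compress(payload, 9)

print(f"raw: {len(payload)} bytes")
print(f"level 1: {fast_size} bytes in {fast_seconds:.4f}s")
print(f"level 9: {small_size} bytes in {small_seconds:.4f}s")
```

Higher levels usually produce smaller output at the cost of more CPU time, so the right setting depends on whether storage or save latency matters more.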
Examples

Save a processed dataset:

>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> processed_df = process_data(raw_df)
>>> catalog.save("processed_data", processed_df)

Save multiple datasets in a pipeline:

>>> catalog = KedroDataCatalog.in_directory("config/catalog")
>>> raw = catalog.load("raw_data")
>>> cleaned = clean_data(raw)
>>> catalog.save("cleaned_data", cleaned)
>>> features = engineer_features(cleaned)
>>> catalog.save("features", features)
>>> predictions = model.predict(features)
>>> catalog.save("predictions", predictions)

Save with versioning (configured in catalog.yml):

>>> # catalog.yml:
>>> #   versioned_output:
>>> #     type: pandas.CSVDataset
>>> #     filepath: data/output.csv
>>> #     versioned: true
>>> catalog.save("versioned_output", result_df)
>>> # Saves to: data/output.csv/2024-01-15T10.30.45.123Z/output.csv

Handle save errors:

>>> try:
...     catalog.save("output_data", large_df)
... except PermissionError as e:
...     print(f"Cannot write to output location: {e}")
... except OSError as e:
...     print(f"I/O error during save: {e}")

Save to different formats (configured per dataset):

>>> # Same data, different formats for different use cases
>>> catalog.save("output_csv", df)  # Human-readable
>>> catalog.save("output_parquet", df)  # Efficient storage
>>> catalog.save("output_json", df)  # API consumption