# Source code for impact_engine_measure.storage.base

"""Base interfaces and common classes for the storage layer."""

from abc import ABC, abstractmethod
from typing import Any, Dict

import pandas as pd


class StorageInterface(ABC):
    """Abstract base class defining the contract for all storage implementations.

    Required methods (must override):

    - connect: Initialize adapter with configuration
    - write_json: Write JSON data to storage
    - write_csv: Write DataFrame to CSV
    - write_yaml: Write YAML data to storage
    - write_parquet: Write DataFrame to Parquet
    - full_path: Get full path/URL for a relative path

    Optional methods (have sensible defaults):

    - validate_connection: Check if connection is active (defaults to True)
    - get_job: Get the underlying job object (defaults to None)
    """

    @abstractmethod
    def connect(self, config: Dict[str, Any]) -> bool:
        """Initialize storage with configuration.

        Parameters
        ----------
        config : dict
            Dictionary containing storage configuration
            (e.g., storage_url, prefix).

        Returns
        -------
        bool
            True if initialization successful, False otherwise.
        """

    @abstractmethod
    def write_json(self, path: str, data: Dict[str, Any]) -> None:
        """Write JSON data to storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        data : dict
            Dictionary to serialize as JSON.
        """

    @abstractmethod
    def write_csv(self, path: str, df: pd.DataFrame) -> None:
        """Write DataFrame to CSV in storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        df : pd.DataFrame
            DataFrame to write.
        """

    @abstractmethod
    def write_yaml(self, path: str, data: Dict[str, Any]) -> None:
        """Write YAML data to storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        data : dict
            Dictionary to serialize as YAML.
        """

    @abstractmethod
    def write_parquet(self, path: str, df: pd.DataFrame) -> None:
        """Write DataFrame to Parquet in storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        df : pd.DataFrame
            DataFrame to write.
        """

    @abstractmethod
    def full_path(self, path: str) -> str:
        """Get the full path/URL for a relative path.

        Parameters
        ----------
        path : str
            Relative path within the storage location.

        Returns
        -------
        str
            Full path or URL to the resource.
        """

    def validate_connection(self) -> bool:
        """Validate that the storage connection is active and functional.

        Default implementation returns True. Override for custom validation.

        Returns
        -------
        bool
            True if connection is valid, False otherwise.
        """
        return True

    def get_job(self) -> Any:
        """Get the underlying job object for artifact management.

        This is used for creating nested jobs or accessing job metadata.
        Default implementation returns None. Override for adapters that
        support job-based artifact management.

        Returns
        -------
        Any
            Job object or None if not applicable.
        """
        return None