Source code for impact_engine_measure.storage.manager

"""
Storage Manager for coordinating storage operations.

Design: Uses dependency injection to receive storage adapter from factory.
This decouples coordination logic from adapter selection, enabling:
- Easy unit testing with mock adapters
- Adapter selection controlled by configuration, not hardcoded
"""

from typing import Any, Dict, Optional

import pandas as pd

from .base import StorageInterface


[docs] class StorageManager: """Central coordinator for storage management. Uses dependency injection - the storage adapter is passed in via constructor, making the manager easy to test with mock implementations. """
[docs] def __init__( self, storage_config: Dict[str, Any], adapter: StorageInterface, ): """Initialize the StorageManager with injected storage adapter. Parameters ---------- storage_config : dict Storage configuration (storage_url, prefix, etc.). adapter : StorageInterface The storage implementation to use for persistence. """ self.storage_config = storage_config self.adapter = adapter # Connect the injected adapter if not self.adapter.connect(storage_config): raise ConnectionError("Failed to connect to storage")
[docs] def write_json(self, path: str, data: Dict[str, Any]) -> None: """Write JSON data to storage. Parameters ---------- path : str Relative path within the storage location. data : dict Dictionary to serialize as JSON. """ self.adapter.write_json(path, data)
[docs] def write_csv(self, path: str, df: pd.DataFrame) -> None: """Write DataFrame to CSV in storage. Parameters ---------- path : str Relative path within the storage location. df : pd.DataFrame DataFrame to write. """ self.adapter.write_csv(path, df)
[docs] def write_yaml(self, path: str, data: Dict[str, Any]) -> None: """Write YAML data to storage. Parameters ---------- path : str Relative path within the storage location. data : dict Dictionary to serialize as YAML. """ self.adapter.write_yaml(path, data)
[docs] def write_parquet(self, path: str, df: pd.DataFrame) -> None: """Write DataFrame to Parquet in storage. Parameters ---------- path : str Relative path within the storage location. df : pd.DataFrame DataFrame to write. """ self.adapter.write_parquet(path, df)
[docs] def full_path(self, path: str) -> str: """Get the full path/URL for a relative path. Parameters ---------- path : str Relative path within the storage location. Returns ------- str Full path or URL to the resource. """ return self.adapter.full_path(path)
[docs] def get_current_config(self) -> Optional[Dict[str, Any]]: """Get the currently loaded configuration.""" return self.storage_config
[docs] def get_job(self) -> Any: """Get the underlying job object for artifact management. This is used for creating nested jobs (e.g., in metrics adapters). Returns ------- Any Job object or None if the adapter doesn't support jobs. """ return self.adapter.get_job()