Source code for impact_engine_measure.metrics.manager

"""
Metrics Manager for coordinating metrics operations.

Design: Uses dependency injection to receive metrics adapter from factory.
This decouples coordination logic from adapter selection, enabling:
- Easy unit testing with mock adapters
- Adapter selection controlled by configuration, not hardcoded
"""

from datetime import datetime
from typing import Any, Dict, Optional

import pandas as pd
from artifact_store import JobInfo

from .base import MetricsInterface


[docs] class MetricsManager: """Central coordinator for metrics management. Uses dependency injection - the metrics source is passed in via constructor, making the manager easy to test with mock implementations. Note: source_config is expected to be pre-validated via process_config(). """
[docs] def __init__( self, source_config: Dict[str, Any], metrics_source: MetricsInterface, source_type: str, parent_job: Optional[JobInfo] = None, ): """Initialize the MetricsManager with injected metrics source. Parameters ---------- source_config : dict SOURCE.CONFIG configuration block (pre-validated, with defaults merged). metrics_source : MetricsInterface The metrics implementation to use for data retrieval. source_type : str The type of metrics source (e.g., "simulator", "file"). parent_job : JobInfo, optional Optional parent job for artifact management. """ self.source_config = source_config self.metrics_source = metrics_source self.source_type = source_type self.parent_job = parent_job # Connect the injected metrics source connection_config = self._build_connection_config() if not self.metrics_source.connect(connection_config): raise ConnectionError("Failed to connect to metrics source")
def _build_connection_config(self) -> Dict[str, Any]: """Build connection configuration from SOURCE.CONFIG. Config is pre-validated with defaults merged, so direct access is safe. Passes through all source config to support different adapter types. """ # Start with full source config to support all adapter types (file, simulator, etc.) config = dict(self.source_config) # Add parent_job for artifact management config["parent_job"] = self.parent_job return config
[docs] def retrieve_metrics(self, products: pd.DataFrame) -> pd.DataFrame: """Retrieve business metrics for specified products using SOURCE.CONFIG date range.""" if products is None or len(products) == 0: raise ValueError("Products DataFrame cannot be empty") # Get date range from SOURCE.CONFIG start_date = self.source_config["start_date"] end_date = self.source_config["end_date"] result = self.metrics_source.retrieve_business_metrics( products=products, start_date=start_date, end_date=end_date ) # Add metadata fields (centralized here instead of in each adapter) result["metrics_source"] = self.source_type result["retrieval_timestamp"] = datetime.now() return result
[docs] def get_current_config(self) -> Optional[Dict[str, Any]]: """Get the currently loaded configuration.""" return self.source_config