"""
File Adapter - reads metrics data from CSV or Parquet files.
This adapter enables file-based workflows where upstream processes
produce data files that impact-engine consumes.
"""
import logging
from typing import Any, Dict, Optional
import pandas as pd
from artifact_store import ArtifactStore
from ..base import MetricsInterface
from ..factory import METRICS_REGISTRY
@METRICS_REGISTRY.register_decorator("file")
class FileAdapter(MetricsInterface):
    """Adapter for file-based data sources that implements MetricsInterface.

    Supports CSV and Parquet file formats, including partitioned Parquet
    directories. The file is expected to contain pre-processed data ready
    for impact analysis.

    Configuration::

        DATA:
          SOURCE:
            type: file
            CONFIG:
              path: path/to/data.csv           # Single CSV file
              path: path/to/data.parquet       # Single Parquet file
              path: path/to/partitioned_data/  # Partitioned Parquet directory
              # Optional parameters:
              date_column: date                # Column name for date filtering
              product_id_column: product_id    # Column name for product IDs
    """

    def __init__(self) -> None:
        """Initialize the FileAdapter in a disconnected state."""
        self.logger = logging.getLogger(__name__)
        self.is_connected = False
        self.config: Optional[Dict[str, Any]] = None
        self.data: Optional[pd.DataFrame] = None
        self.store: Optional[ArtifactStore] = None
        self.filename: Optional[str] = None

    def connect(self, config: Dict[str, Any]) -> bool:
        """Initialize adapter with configuration parameters.

        Parameters
        ----------
        config : dict
            Dictionary containing (lowercase keys, merged via process_config):

            - path: Path to the data file (required)
            - date_column: Column name for dates (optional)
            - product_id_column: Column name for product IDs
              (optional, default: ``product_id``)

        Returns
        -------
        bool
            True if initialization successful.

        Raises
        ------
        ValueError
            If required configuration is missing.
        FileNotFoundError
            If the specified file doesn't exist.
        """
        path = config.get("path")
        if not path:
            raise ValueError("'path' is required in file adapter configuration")
        # Use artifact store for cloud compatibility (supports local and S3 paths)
        self.store, self.filename = ArtifactStore.from_file_path(path)
        if not self.store.exists(self.filename):
            raise FileNotFoundError(f"Data file not found: {path}")
        # Keep only the keys this adapter understands; missing optional keys
        # become None (date_column) or the documented default (product_id_column).
        self.config = {
            "path": path,
            "date_column": config.get("date_column"),
            "product_id_column": config.get("product_id_column", "product_id"),
        }
        # Pre-load data so configuration problems surface at connect time
        # rather than on the first retrieval.
        self._load_data()
        self.is_connected = True
        self.logger.info("Connected to file source: %s", path)
        return True

    def _load_data(self) -> pd.DataFrame:
        """Load data from file (CSV or Parquet) via the artifact store.

        Returns
        -------
        pd.DataFrame
            DataFrame with loaded data (also cached on ``self.data``).

        Raises
        ------
        ValueError
            If file format is not supported.
        """
        path = self.config["path"]
        # Format detection (CSV vs. Parquet) is delegated to the artifact store.
        self.data = self.store.read_data(self.filename)
        self.logger.info("Loaded %d rows from %s", len(self.data), path)
        return self.data

    def retrieve_business_metrics(self, products: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame:
        """Retrieve business metrics from the loaded file.

        For file-based sources, the data is already loaded. This method
        optionally filters by date range and product IDs if configured.

        Parameters
        ----------
        products : pd.DataFrame
            DataFrame with product identifiers (can be empty for file sources).
        start_date : str
            Start date in YYYY-MM-DD format (used if a date column is configured).
        end_date : str
            End date in YYYY-MM-DD format (used if a date column is configured).

        Returns
        -------
        pd.DataFrame
            DataFrame with business metrics, passed through
            ``transform_inbound`` for interface-level normalization.

        Raises
        ------
        ConnectionError
            If adapter not connected.
        """
        if not self.is_connected:
            raise ConnectionError("Not connected to file source. Call connect() first.")
        # Work on a copy so repeated calls always start from the pristine load.
        result = self.data.copy()
        # Filter by date if date column is configured and actually present.
        date_col = self.config.get("date_column")
        if date_col and date_col in result.columns:
            result[date_col] = pd.to_datetime(result[date_col])
            start = pd.to_datetime(start_date)
            end = pd.to_datetime(end_date)
            # Inclusive on both ends of the range.
            result = result[(result[date_col] >= start) & (result[date_col] <= end)]
            self.logger.info(
                "Filtered to %d rows for date range %s to %s",
                len(result), start_date, end_date,
            )
        # Filter by products if provided and both sides expose the needed columns.
        if products is not None and len(products) > 0:
            id_col = self.config.get("product_id_column", "product_id")
            if id_col in result.columns and "product_id" in products.columns:
                product_ids = set(products["product_id"])
                result = result[result[id_col].isin(product_ids)]
                self.logger.info(
                    "Filtered to %d rows for %d products",
                    len(result), len(product_ids),
                )
        return self.transform_inbound(result)

    def validate_connection(self) -> bool:
        """Validate that the file source is accessible.

        Returns
        -------
        bool
            True if connected, the file still exists, and data is loaded.
        """
        if not self.is_connected or self.config is None or self.store is None:
            return False
        return self.store.exists(self.filename) and self.data is not None