"""
Catalog Simulator Adapter - adapts the online_retail_simulator package to MetricsInterface.
Integration is governed by contracts (schemas) and the config bridge (config translation).
"""
from typing import Any, Dict, Optional
import pandas as pd
from artifact_store import JobInfo, create_job
from ...core import ConfigBridge, MetricsSchema, ProductSchema
from ..base import MetricsInterface
from ..factory import METRICS_REGISTRY
@METRICS_REGISTRY.register_decorator("simulator")
class CatalogSimulatorAdapter(MetricsInterface):
    """Adapter for the catalog simulator that implements MetricsInterface.

    Drives the ``online_retail_simulator`` package through its job-aware API.
    Products and generated YAML configs are written into a simulation job, and
    the simulator is run against that job. The resulting metrics (optionally
    enriched) are then loaded back and passed through ``transform_inbound``.
    """

    # Fallback quality_score for products absent from the product-details tables.
    _DEFAULT_QUALITY_SCORE = 0.5
    # Directory prefix for the job that holds simulation artifacts.
    _SIMULATION_JOB_PREFIX = "job-catalog-simulator-simulation"

    def __init__(self):
        """Initialize the adapter in a disconnected state."""
        self.is_connected = False
        self.config = None
        self.parent_job: Optional[JobInfo] = None
        self.simulation_job: Optional[JobInfo] = None
        self.available_metrics = [
            "sales_volume",
            "revenue",
        ]

    def connect(self, config: Dict[str, Any]) -> bool:
        """Establish connection to the catalog simulator.

        Config is pre-validated with defaults merged via process_config(), so
        ``mode`` and ``seed`` are read directly (a missing key raises KeyError).

        Args:
            config: Adapter configuration. Keys read here:
                - ``mode``, ``seed`` (required).
                - ``parent_job`` (optional JobInfo to nest simulation jobs under).
                - ``storage_path`` (internal; defaults to ``"output"``).
                - ``ENRICHMENT`` (optional enrichment settings; stored only if truthy).

        Returns:
            Always ``True`` once the configuration has been stored.
        """
        self.parent_job = config.get("parent_job")
        # storage_path is an internal parameter (not in user config), fallback is appropriate
        storage_path = config.get("storage_path", "output")
        self.config = {
            "mode": config["mode"],
            "seed": config["seed"],
            "storage_path": storage_path,
        }
        # Store enrichment config if provided; a falsy value leaves enrichment disabled.
        enrichment = config.get("ENRICHMENT")
        if enrichment:
            self.config["ENRICHMENT"] = enrichment
        self.is_connected = True
        return True

    def retrieve_business_metrics(self, products: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame:
        """Retrieve business metrics using the catalog simulator's job-aware API.

        Args:
            products: Non-empty product DataFrame to simulate metrics for.
            start_date: Start of the simulation window (forwarded to transform_outbound).
            end_date: End of the simulation window (forwarded to transform_outbound).

        Returns:
            The simulator's metrics after ``transform_inbound``. When enrichment is
            configured, they are enriched and carry a ``quality_score`` column.

        Raises:
            ConnectionError: If connect() was not called, or the
                online_retail_simulator package cannot be imported.
            ValueError: If ``products`` is None or empty.
            RuntimeError: Wrapping any other failure during simulation, enrichment
                or transformation (the original exception is chained).
        """
        if not self.is_connected:
            raise ConnectionError("Not connected to simulator. Call connect() first.")
        if products is None or len(products) == 0:
            raise ValueError("Products DataFrame cannot be empty")
        try:
            from online_retail_simulator.simulate import simulate_metrics

            enrichment_enabled = bool(self.config.get("ENRICHMENT"))
            # Validate enrichment settings up front so a config error does not cost
            # a full simulation run before being reported.
            enrichment_date = self._enrichment_start_date() if enrichment_enabled else None

            self._create_simulation_job()
            transformed_input = self.transform_outbound(products, start_date, end_date)
            # simulate_metrics expects products in job directory
            self.simulation_job.save_df("products", transformed_input["product_characteristics"])
            simulator_config = {"RULE": transformed_input["rule_config"]}
            # Write config to job storage (cloud-compatible, preserved for debugging)
            store = self.simulation_job.get_store()
            store.write_yaml("simulator_config.yaml", simulator_config)
            config_path = store.full_path("simulator_config.yaml")
            simulate_metrics(self.simulation_job, config_path)
            sales_df = self.simulation_job.load_df("metrics")
            if enrichment_enabled:
                sales_df = self._apply_enrichment(sales_df, enrichment_date)
            return self.transform_inbound(sales_df)
        except ImportError as e:
            raise ConnectionError(f"online_retail_simulator package not available: {e}") from e
        except Exception as e:
            raise RuntimeError(f"Failed to retrieve metrics: {e}") from e

    def _create_simulation_job(self) -> None:
        """Create a job for simulation artifacts, nested under ``parent_job`` when one was given."""
        if self.parent_job is not None:
            # Create nested job inside parent job directory
            base_path = self.parent_job.full_path
        else:
            # Create standalone job using configured storage path
            base_path = self.config["storage_path"]
        self.simulation_job = create_job(base_path, prefix=self._SIMULATION_JOB_PREFIX)

    def _enrichment_start_date(self) -> pd.Timestamp:
        """Return ``ENRICHMENT.PARAMS.enrichment_start`` parsed with ``pd.to_datetime``.

        Raises:
            ValueError: If ``enrichment_start`` is missing or empty.
        """
        # ``or {}`` also covers an explicit ``PARAMS: null`` in the config.
        enrichment_params = self.config["ENRICHMENT"].get("PARAMS") or {}
        enrichment_start = enrichment_params.get("enrichment_start")
        if not enrichment_start:
            raise ValueError(
                "enrichment_start is required in ENRICHMENT.PARAMS when using enrichment. "
                "Specify the date (YYYY-MM-DD) when enrichment begins."
            )
        return pd.to_datetime(enrichment_start)

    def _apply_enrichment(
        self, metrics_df: pd.DataFrame, enrichment_date: Optional[pd.Timestamp] = None
    ) -> pd.DataFrame:
        """Apply enrichment and return enriched metrics with quality_score.

        Runs the enrichment pipeline, which boosts revenue for treated products
        and adds an ``enriched`` boolean column. It then overlays ``quality_score``
        from the original/enriched product details.

        Args:
            metrics_df: Not read here. The enrichment pipeline works on the
                artifacts already stored in ``self.simulation_job``.
            enrichment_date: Pre-validated enrichment start. If None, it is read
                from ``ENRICHMENT.PARAMS.enrichment_start``.

        Returns:
            The job's ``enriched`` metrics, with ``date`` converted to datetime
            and a ``quality_score`` column added.

        Raises:
            ValueError: If ``enrichment_start`` is not configured.
        """
        from online_retail_simulator.enrich import enrich
        from online_retail_simulator.simulate import simulate_product_details

        if enrichment_date is None:
            # Validate before running the pipeline so misconfiguration fails fast.
            enrichment_date = self._enrichment_start_date()

        # Generate product_details (required by enrich)
        product_details_config = {
            "PRODUCT_DETAILS": {
                "FUNCTION": "simulate_product_details_mock",
            }
        }
        # Write config to job storage (cloud-compatible, preserved for debugging)
        store = self.simulation_job.get_store()
        store.write_yaml("product_details_config.yaml", product_details_config)
        pd_config_path = store.full_path("product_details_config.yaml")
        self.simulation_job = simulate_product_details(self.simulation_job, pd_config_path)

        # Use config bridge to build IMPACT config
        impact_config = ConfigBridge.build_enrichment_config(self.config["ENRICHMENT"])
        # Write config to job storage (cloud-compatible, preserved for debugging)
        store.write_yaml("enrichment_config.yaml", impact_config)
        config_path = store.full_path("enrichment_config.yaml")
        # Apply enrichment (creates product_details_original, product_details_enriched,
        # and enriched metrics with boosted revenue + treatment indicator)
        self.simulation_job = enrich(config_path, self.simulation_job)

        # Load the enriched metrics (has boosted revenue + `enriched` column)
        result = self.simulation_job.load_df("enriched")
        products_original = self.simulation_job.load_df("product_details_original")
        products_enriched = self.simulation_job.load_df("product_details_enriched")
        return self._add_quality_score(result, products_original, products_enriched, enrichment_date)

    def _add_quality_score(
        self,
        result: pd.DataFrame,
        products_original: pd.DataFrame,
        products_enriched: pd.DataFrame,
        enrichment_date: pd.Timestamp,
    ) -> pd.DataFrame:
        """Add ``quality_score`` to ``result`` based on each row's date.

        Rows dated before ``enrichment_date`` take the score from
        ``products_original``. All other rows take it from ``products_enriched``.
        Products absent from the relevant table get ``_DEFAULT_QUALITY_SCORE``.
        ``result`` is modified in place (``date`` is converted to datetime) and
        returned.
        """
        # Quality lookups keyed by product_identifier.
        orig_quality = products_original.set_index("product_identifier")["quality_score"].to_dict()
        enr_quality = products_enriched.set_index("product_identifier")["quality_score"].to_dict()

        result["date"] = pd.to_datetime(result["date"])
        # Detect product ID column
        id_col = "product_identifier" if "product_identifier" in result.columns else "product_id"

        default = self._DEFAULT_QUALITY_SCORE
        product_ids = result[id_col]
        # Column-wise lookups stay Series-shaped even when ``result`` is empty,
        # unlike row-wise DataFrame.apply(axis=1), which returns a DataFrame then.
        before_scores = product_ids.map(lambda pid: orig_quality.get(pid, default))
        after_scores = product_ids.map(lambda pid: enr_quality.get(pid, default))
        is_before = result["date"] < enrichment_date
        result["quality_score"] = before_scores.where(is_before, after_scores)
        return result

    def validate_connection(self) -> bool:
        """Validate that the catalog simulator connection is active and functional.

        Returns:
            ``True`` if connect() succeeded and ``online_retail_simulator.core.RuleBackend``
            is importable, otherwise ``False``.
        """
        if not self.is_connected:
            return False
        try:
            from online_retail_simulator.core import RuleBackend  # noqa: F401

            return True
        except ImportError:
            return False