Source code for impact_engine_measure.engine

"""Impact analysis engine for the impact_engine_measure package."""

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from artifact_store import ArtifactStore, JobInfo

from .config import parse_config_file
from .core import apply_transform
from .metrics import create_metrics_manager
from .models import create_models_manager
from .normalize import MEASURE_RESULT_FILENAME, normalize_result
from .storage import create_storage_manager


def measure_impact(
    config_path: str,
    storage_url: str = "./data",
    job_id: Optional[str] = None,
) -> JobInfo:
    """Measure causal impact using the configured model and metrics.

    Evaluates impact using business metrics retrieved through the metrics
    layer and models layer for statistical analysis. Intermediate artifacts
    (config, products, business metrics, transformed metrics, normalized
    result) are written through the storage manager, and ``manifest.json``
    describing them is written last.

    Parameters
    ----------
    config_path : str
        Path to configuration file containing metrics and model settings.
        The parsed config must include ``DATA.SOURCE.CONFIG.path`` pointing
        to the products data file (e.g. a CSV) and a ``DATA.TRANSFORM``
        section passed to ``apply_transform``.
    storage_url : str
        Storage URL or path (e.g., "./data", "s3://bucket/prefix").
    job_id : str, optional
        Job ID for resuming existing jobs or using custom IDs.
        If not provided, a unique ID will be auto-generated.

    Returns
    -------
    JobInfo
        Job object for the completed run. Use ``load_results(job_info)`` to
        load all artifacts into a typed ``MeasureJobResult``.

    Raises
    ------
    KeyError
        If the parsed config lacks ``DATA.SOURCE.CONFIG.path`` or
        ``DATA.TRANSFORM``.
    """
    config = parse_config_file(config_path)
    source_config = config["DATA"]["SOURCE"]["CONFIG"]
    transform_config = config["DATA"]["TRANSFORM"]
    data_path = source_config["path"]

    # Create storage manager (storage_url allows tests to pass temp directories)
    storage_manager = create_storage_manager(storage_url, job_id=job_id)

    # Save artifacts for observability
    storage_manager.write_yaml("config.yaml", config)

    data_store, data_filename = ArtifactStore.from_file_path(data_path)
    products = data_store.read_data(data_filename)
    storage_manager.write_parquet("products.parquet", products)

    metrics_manager = create_metrics_manager(config, parent_job=storage_manager.get_job())
    models_manager = create_models_manager(config_path)

    business_metrics = metrics_manager.retrieve_metrics(products)
    storage_manager.write_parquet("business_metrics.parquet", business_metrics)

    transformed_metrics = apply_transform(business_metrics, transform_config)
    storage_manager.write_parquet("transformed_metrics.parquet", transformed_metrics)

    fit_output = models_manager.fit_model(data=transformed_metrics, storage=storage_manager)

    # Write normalized estimates so downstream consumers read a flat dict
    # instead of parsing model-specific schemas.
    # NOTE(review): results_path is read via the local filesystem (pathlib);
    # confirm this works when storage_url is a remote URL such as "s3://...".
    impact_results = json.loads(Path(fit_output.results_path).read_text(encoding="utf-8"))
    measure_result = normalize_result(impact_results)
    storage_manager.write_json(MEASURE_RESULT_FILENAME, measure_result)

    # Write manifest as the final step (R3: self-describing output), after
    # every artifact this function writes itself.
    storage_manager.write_json("manifest.json", _build_manifest(fit_output))
    return storage_manager.get_job()


def _build_manifest(fit_output) -> dict:
    """Build the manifest dict listing every artifact of a measure run.

    Parameters
    ----------
    fit_output
        Object returned by ``models_manager.fit_model``; only its
        ``model_type`` and ``artifact_paths`` attributes are read here.

    Returns
    -------
    dict
        ``{"model_type", "created_at" (UTC ISO-8601), "files"}`` where
        ``files`` maps artifact keys to ``{"path", "format"}`` entries.
    """
    pipeline_files = {
        "config": {"path": "config.yaml", "format": "yaml"},
        "products": {"path": "products.parquet", "format": "parquet"},
        "business_metrics": {"path": "business_metrics.parquet", "format": "parquet"},
        "transformed_metrics": {
            "path": "transformed_metrics.parquet",
            "format": "parquet",
        },
        # NOTE(review): assumes fit_model writes its results as
        # "impact_results.json" in the job root; fit_output.results_path is
        # not consulted for this entry — confirm the two stay in sync.
        "impact_results": {"path": "impact_results.json", "format": "json"},
        "measure_result": {"path": MEASURE_RESULT_FILENAME, "format": "json"},
    }

    # Model-specific artifacts: only the artifact names are used; the
    # manifest entry is always "<model_type>__<name>.parquet".
    for name in fit_output.artifact_paths:
        key = f"{fit_output.model_type}__{name}"
        pipeline_files[key] = {"path": f"{key}.parquet", "format": "parquet"}

    return {
        "model_type": fit_output.model_type,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "files": pipeline_files,
    }