"""Load and access job results produced by evaluate_impact()."""
from dataclasses import dataclass
from typing import Any, Dict
import pandas as pd
from artifact_store import JobInfo
# Manifest keys of the fixed pipeline files; load_results() treats every
# other manifest key as a model-specific artifact.
_PIPELINE_KEYS = frozenset(
    (
        "config",
        "products",
        "business_metrics",
        "transformed_metrics",
        "impact_results",
    )
)
def _read_json(store, path):
    """Read *path* from *store* via its ``read_json`` method."""
    return store.read_json(path)


def _read_yaml(store, path):
    """Read *path* from *store* via its ``read_yaml`` method."""
    return store.read_yaml(path)


def _read_parquet(store, path):
    """Read *path* from *store* via its ``read_parquet`` method."""
    return store.read_parquet(path)


# Dispatch table: manifest ``format`` value -> callable(store, path).
_FORMAT_READERS = {
    "json": _read_json,
    "yaml": _read_yaml,
    "parquet": _read_parquet,
}
[docs]
@dataclass
class MeasureJobResult:
    """Bundle of every artifact loaded for one pipeline run.

    Attributes
    ----------
    job_id : str
        Unique identifier for the job.
    model_type : str
        Model identifier (e.g. ``"interrupted_time_series"``).
    created_at : str
        ISO-8601 timestamp of job creation.
    config : Dict[str, Any]
        The YAML configuration used for this run.
    impact_results : Dict[str, Any]
        The ``impact_results.json`` envelope (model_type, data, metadata).
    products : pd.DataFrame
        Product catalog DataFrame.
    business_metrics : pd.DataFrame
        Raw business metrics DataFrame.
    transformed_metrics : pd.DataFrame
        Transformed metrics DataFrame.
    model_artifacts : Dict[str, pd.DataFrame]
        Model-specific supplementary DataFrames, keyed by artifact name
        with the ``{model_type}__`` prefix stripped.
    """

    # Job identity / provenance.
    job_id: str
    model_type: str
    created_at: str

    # Dict-shaped artifacts.
    config: Dict[str, Any]
    impact_results: Dict[str, Any]

    # Tabular artifacts of the fixed pipeline.
    products: pd.DataFrame
    business_metrics: pd.DataFrame
    transformed_metrics: pd.DataFrame

    # Everything else listed in the manifest.
    model_artifacts: Dict[str, pd.DataFrame]
[docs]
def load_results(job_info: JobInfo) -> MeasureJobResult:
    """Load all artifacts from a completed pipeline run.

    Reads ``manifest.json`` to discover files, then loads each one using the
    format-appropriate reader. Model-specific artifacts (those not in the
    fixed pipeline set) are collected into ``model_artifacts`` with the
    ``{model_type}__`` prefix stripped from their keys; keys without that
    prefix are kept unchanged.

    Parameters
    ----------
    job_info : JobInfo
        ``JobInfo`` returned by :func:`evaluate_impact`. Its ``get_store()``
        and ``job_id`` are used here.

    Returns
    -------
    MeasureJobResult
        Typed container with every artifact.

    Raises
    ------
    FileNotFoundError
        If ``manifest.json`` is not found in the job's store.
    KeyError
        If the manifest lacks ``files``, ``model_type``, ``created_at``, or
        one of the fixed pipeline entries, or if a file entry lacks
        ``format`` / ``path``.
    ValueError
        If a manifest entry declares an unsupported format, or if two
        model-specific entries resolve to the same artifact name once the
        ``{model_type}__`` prefix is stripped.
    """
    # NOTE(review): earlier documentation promised a ValueError for an
    # incompatible manifest schema version, but no such check is performed
    # in this function -- confirm whether it lives elsewhere or is missing.
    store = job_info.get_store()
    if not store.exists("manifest.json"):
        raise FileNotFoundError(f"manifest.json not found in job directory: {store.full_path('manifest.json')}")

    manifest = store.read_json("manifest.json")
    files = manifest["files"]
    model_type = manifest["model_type"]

    # Load fixed pipeline artifacts.
    config = _load_file(store, files["config"])
    impact_results = _load_file(store, files["impact_results"])
    products = _load_file(store, files["products"])
    business_metrics = _load_file(store, files["business_metrics"])
    transformed_metrics = _load_file(store, files["transformed_metrics"])

    # Collect model-specific artifacts (everything not in the fixed set).
    model_artifacts: Dict[str, pd.DataFrame] = {}
    prefix = f"{model_type}__"
    for key, file_info in files.items():
        if key in _PIPELINE_KEYS:
            continue
        name = key[len(prefix):] if key.startswith(prefix) else key
        # A prefixed and an unprefixed key can collapse to the same name;
        # fail loudly instead of silently dropping one of the artifacts.
        if name in model_artifacts:
            raise ValueError(
                f"Manifest key '{key}' maps to artifact name '{name}', "
                "which is already used by another manifest entry"
            )
        model_artifacts[name] = _load_file(store, file_info)

    return MeasureJobResult(
        job_id=job_info.job_id,
        model_type=model_type,
        created_at=manifest["created_at"],
        config=config,
        impact_results=impact_results,
        products=products,
        business_metrics=business_metrics,
        transformed_metrics=transformed_metrics,
        model_artifacts=model_artifacts,
    )
def _load_file(store, file_info: Dict[str, str]) -> Any:
    """Read one manifest entry from *store* with the reader for its format.

    ``file_info`` must provide ``"format"`` and ``"path"``. The format
    selects a callable from ``_FORMAT_READERS``, which is invoked with the
    store and the path; its result is returned unchanged.

    Raises
    ------
    ValueError
        If no reader is registered for the declared format.
    """
    fmt, path = file_info["format"], file_info["path"]
    if fmt not in _FORMAT_READERS:
        raise ValueError(f"Unsupported format '{fmt}' for file '{path}'")
    return _FORMAT_READERS[fmt](store, path)