{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Covariate Stratification\n", "\n", "This notebook demonstrates **subclassification (stratification)** for impact estimation, using [pandas](https://pandas.pydata.org/) `qcut()` for quantile binning and [NumPy](https://numpy.org/) `np.average()` for weighted aggregation.\n", "\n", "Subclassification groups observations into strata based on covariate quantiles, computes a treatment effect within each stratum, and aggregates the per-stratum effects via a weighted average.\n", "\n", "## Workflow overview\n", "\n", "1. User provides `products.csv`\n", "2. User configures `DATA.ENRICHMENT` for treatment assignment\n", "3. User calls `measure_impact(config.yaml)`\n", "4. Engine handles everything internally (adapter, enrichment, model)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initial setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "\n", "import pandas as pd\n", "from impact_engine_measure import measure_impact, load_results\n", "from impact_engine_measure.core.validation import load_config\n", "from impact_engine_measure.models.factory import get_model_adapter\n", "from online_retail_simulator import enrich, simulate" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 1 — Product catalog\n", "\n", "In production, this would be your actual product catalog." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output_path = Path(\"output/demo_subclassification\")\n", "output_path.mkdir(parents=True, exist_ok=True)\n", "\n", "catalog_job = simulate(\"configs/demo_subclassification_catalog.yaml\", job_id=\"catalog\")\n", "products = catalog_job.load_df(\"products\")\n", "\n", "print(f\"Generated {len(products)} products\")\n", "print(f\"Products catalog: {catalog_job.get_store().full_path('products.csv')}\")\n", "products.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 2 — Engine configuration\n", "\n", "The engine is configured via a YAML file with the following sections:\n", "- `ENRICHMENT` — Treatment assignment via quality boost (50/50 split)\n", "- `MODEL` — `subclassification` with price as covariate\n", "\n", "A single-day simulation (`start_date = end_date`) produces the cross-sectional data that subclassification requires." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "config_path = \"configs/demo_subclassification.yaml\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 3 — Impact evaluation\n", "\n", "A single call to `measure_impact()` runs the full pipeline:\n", "- Engine creates `CatalogSimulatorAdapter`\n", "- Adapter simulates metrics (single-day, cross-sectional)\n", "- Adapter applies enrichment (treatment assignment + revenue boost)\n", "- `SubclassificationAdapter` stratifies on price, computes per-stratum effects" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_info = measure_impact(config_path, str(output_path), job_id=\"results\")\n", "print(f\"Job ID: {job_info.job_id}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 4 — Review results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "result = load_results(job_info)\n", "\n", "data = 
result.impact_results[\"data\"]\n", "estimates = data[\"impact_estimates\"]\n", "summary = data[\"model_summary\"]\n", "\n", "print(\"=\" * 60)\n", "print(\"SUBCLASSIFICATION IMPACT ESTIMATION RESULTS\")\n", "print(\"=\" * 60)\n", "\n", "print(f\"\\nModel Type: {result.model_type}\")\n", "print(f\"Estimand: {summary['estimand']}\")\n", "\n", "print(\"\\n--- Impact Estimates ---\")\n", "print(f\"Treatment Effect: {estimates['treatment_effect']:.4f}\")\n", "print(f\"Strata Used: {estimates['n_strata']}\")\n", "print(f\"Strata Dropped: {estimates['n_strata_dropped']}\")\n", "\n", "print(\"\\n--- Model Summary ---\")\n", "print(f\"Observations: {summary['n_observations']}\")\n", "print(f\"Treated: {summary['n_treated']}\")\n", "print(f\"Control: {summary['n_control']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Per-stratum details from model artifacts\n", "stratum_df = result.model_artifacts[\"stratum_details\"]\n", "\n", "print(\"--- Per-Stratum Breakdown ---\")\n", "print(\"-\" * 70)\n", "print(f\"{'Stratum':<10} {'Treated':<10} {'Control':<10} {'Mean T':<12} {'Mean C':<12} {'Effect':<12}\")\n", "print(\"-\" * 70)\n", "for _, row in stratum_df.iterrows():\n", "    print(\n", "        f\"{row['stratum']:<10} {row['n_treated']:<10} {row['n_control']:<10} \"\n", "        f\"{row['mean_treated']:<12.2f} {row['mean_control']:<12.2f} {row['effect']:<12.2f}\"\n", "    )\n", "\n", "print(\"\\n\" + \"=\" * 60)\n", "print(\"Demo Complete!\")\n", "print(\"=\" * 60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Step 5 — Model validation\n", "\n", "Compare the model's estimate against the **true causal effect** computed from counterfactual vs factual data." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def calculate_true_effect(\n", "    baseline_metrics: pd.DataFrame,\n", "    enriched_metrics: pd.DataFrame,\n", ") -> dict:\n", "    \"\"\"Calculate TRUE ATT by comparing per-product revenue for treated products.\"\"\"\n", "    treated_ids = enriched_metrics[enriched_metrics[\"enriched\"]][\"product_id\"].unique()\n", "\n", "    enriched_treated = enriched_metrics[enriched_metrics[\"product_id\"].isin(treated_ids)]\n", "    baseline_treated = baseline_metrics[baseline_metrics[\"product_id\"].isin(treated_ids)]\n", "\n", "    enriched_mean = enriched_treated.groupby(\"product_id\")[\"revenue\"].mean().mean()\n", "    baseline_mean = baseline_treated.groupby(\"product_id\")[\"revenue\"].mean().mean()\n", "    treatment_effect = enriched_mean - baseline_mean\n", "\n", "    return {\n", "        \"enriched_mean\": float(enriched_mean),\n", "        \"baseline_mean\": float(baseline_mean),\n", "        \"treatment_effect\": float(treatment_effect),\n", "    }" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "baseline_metrics = catalog_job.load_df(\"metrics\").rename(columns={\"product_identifier\": \"product_id\"})\n", "\n", "enrich(\"configs/demo_subclassification_enrichment.yaml\", catalog_job)\n", "enriched_metrics = catalog_job.load_df(\"enriched\").rename(columns={\"product_identifier\": \"product_id\"})\n", "\n", "print(f\"Baseline records: {len(baseline_metrics)}\")\n", "print(f\"Enriched records: {len(enriched_metrics)}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "true_effect = calculate_true_effect(baseline_metrics, enriched_metrics)\n", "\n", "true_te = true_effect[\"treatment_effect\"]\n", "model_te = estimates[\"treatment_effect\"]\n", "\n", "if true_te != 0:\n", "    recovery_accuracy = (1 - abs(1 - model_te / true_te)) * 100\n", "else:\n", "    recovery_accuracy = 100 if model_te == 0 else 0\n", "\n", "print(\"=\" * 
60)\n", "print(\"TRUTH RECOVERY VALIDATION\")\n", "print(\"=\" * 60)\n", "print(f\"True treatment effect: {true_te:.4f}\")\n", "print(f\"Model estimate: {model_te:.4f}\")\n", "print(f\"Recovery accuracy: {max(0, recovery_accuracy):.1f}%\")\n", "print(\"=\" * 60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Convergence analysis\n", "\n", "How does the estimate converge to the true effect as sample size increases?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sample_sizes = [20, 50, 100, 200, 300, 500, 1500]\n", "estimates_list = []\n", "truth_list = []\n", "\n", "parsed = load_config(config_path)\n", "measurement_config = parsed[\"MEASUREMENT\"]\n", "all_product_ids = enriched_metrics[\"product_id\"].unique()\n", "\n", "for n in sample_sizes:\n", "    subset_ids = all_product_ids[:n]\n", "    enriched_sub = enriched_metrics[enriched_metrics[\"product_id\"].isin(subset_ids)]\n", "    baseline_sub = baseline_metrics[baseline_metrics[\"product_id\"].isin(subset_ids)]\n", "\n", "    true_sub = calculate_true_effect(baseline_sub, enriched_sub)\n", "    truth_list.append(true_sub[\"treatment_effect\"])\n", "\n", "    model = get_model_adapter(\"subclassification\")\n", "    model.connect(measurement_config[\"PARAMS\"])\n", "    fit_result = model.fit(data=enriched_sub)  # avoid shadowing `result` from Step 4\n", "    estimates_list.append(fit_result.data[\"impact_estimates\"][\"treatment_effect\"])\n", "\n", "print(\"Convergence analysis complete.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from notebook_support import plot_convergence\n", "\n", "plot_convergence(\n", "    sample_sizes,\n", "    estimates_list,\n", "    truth_list,\n", "    xlabel=\"Number of Products\",\n", "    ylabel=\"Treatment Effect\",\n", "    title=\"Subclassification: Convergence of Estimate to True Effect\",\n", ")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { 
"codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 4 }