Source code for online_retail_simulator.enrich.enrichment_library

"""Library of predefined treatment effect functions for catalog enrichment."""

import copy
from datetime import datetime

import numpy as np
import pandas as pd


def quantity_boost(metrics: list, **kwargs) -> tuple:
    """
    Boost ordered units by a percentage for enriched products.

    A seeded random subset of products is marked as enriched. For every
    record on or after the enrichment start date the counterfactual treated
    revenue Y(1) is computed; the boost is only written back into the record
    when its product is in the enriched subset.

    Args:
        metrics: List of metric record dictionaries. Each record is read for
            "product_id", "date" (formatted "%Y-%m-%d") and "revenue";
            records on or after the start date are also read for
            "ordered_units" and "unit_price" (falling back to "price").
        **kwargs: Parameters including:
            - effect_size: Percentage increase in ordered units
              (default: 0.5 for 50% boost)
            - enrichment_fraction: Fraction of products to enrich
              (default: 0.3)
            - enrichment_start: Start date of enrichment
              (default: "2024-11-15")
            - seed: Random seed for product selection (default: 42)
            - min_units: Lower bound applied to the boosted unit count, so
              products with zero sales still get units (default: 1)

    Returns:
        Tuple of (treated_metrics, potential_outcomes_df):
            - treated_metrics: Deep copies of the input records with an
              "enriched" flag added and, for enriched products on or after
              the start date, "ordered_units" and "revenue" replaced
            - potential_outcomes_df: DataFrame with columns
              product_identifier, date, Y0_revenue and Y1_revenue, one row
              per distinct (product_id, date) pair; the columns are present
              even when `metrics` is empty

    Raises:
        KeyError: If a record lacks one of the required keys.
        ValueError: If a date does not match "%Y-%m-%d", or if
            enrichment_fraction asks for more products than exist.
    """
    effect_size = kwargs.get("effect_size", 0.5)
    enrichment_fraction = kwargs.get("enrichment_fraction", 0.3)
    enrichment_start = kwargs.get("enrichment_start", "2024-11-15")
    seed = kwargs.get("seed", 42)
    min_units = kwargs.get("min_units", 1)

    rng = np.random.default_rng(seed)

    # Iterating a set of strings depends on per-process hash randomization,
    # which would make the "seeded" selection differ between runs.
    # Deduplicate in first-appearance order and sort when the ids are
    # mutually comparable so the candidate order is a function of the data.
    unique_products = list(dict.fromkeys(record["product_id"] for record in metrics))
    try:
        unique_products = sorted(unique_products)
    except TypeError:
        # Mixed, non-comparable id types: keep first-appearance order.
        pass

    n_enriched = int(len(unique_products) * enrichment_fraction)
    if n_enriched:
        # Sample positions rather than the ids themselves: passing the ids to
        # NumPy would coerce them into one array dtype (e.g. ints become
        # strings next to string ids) and break the membership test below.
        chosen_positions = rng.choice(len(unique_products), size=n_enriched, replace=False)
        enriched_product_ids = {unique_products[pos] for pos in chosen_positions}
    else:
        enriched_product_ids = set()

    treated_metrics = []
    potential_outcomes = {}  # {(product_id, date): {'Y0_revenue': x, 'Y1_revenue': y}}
    start_date = datetime.strptime(enrichment_start, "%Y-%m-%d")

    for record in metrics:
        # Work on a copy so the caller's records are never mutated.
        record_copy = copy.deepcopy(record)
        product_id = record_copy["product_id"]
        record_date_str = record_copy["date"]
        record_date = datetime.strptime(record_date_str, "%Y-%m-%d")
        in_treatment_window = record_date >= start_date

        is_enriched = product_id in enriched_product_ids
        record_copy["enriched"] = is_enriched

        # Y(0) - baseline revenue (no treatment)
        y0_revenue = record_copy["revenue"]

        # Y(1) - revenue if treated (computed for ALL products)
        unit_price = record_copy.get("unit_price", record_copy.get("price"))
        if in_treatment_window:
            original_quantity = record_copy["ordered_units"]
            # int() truncates fractional units before the floor is applied.
            boosted_quantity = max(min_units, int(original_quantity * (1 + effect_size)))
            y1_revenue = round(boosted_quantity * unit_price, 2)
        else:
            # Before treatment start, Y(1) = Y(0)
            y1_revenue = y0_revenue

        # Store potential outcomes for ALL products
        potential_outcomes[(product_id, record_date_str)] = {
            "Y0_revenue": y0_revenue,
            "Y1_revenue": y1_revenue,
        }

        # Apply factual outcome (only for treated products)
        if is_enriched and in_treatment_window:
            record_copy["ordered_units"] = boosted_quantity
            record_copy["revenue"] = y1_revenue

        treated_metrics.append(record_copy)

    # Explicit columns keep the schema stable when there are no rows.
    potential_outcomes_df = pd.DataFrame(
        [
            {
                "product_identifier": pid,
                "date": d,
                "Y0_revenue": v["Y0_revenue"],
                "Y1_revenue": v["Y1_revenue"],
            }
            for (pid, d), v in potential_outcomes.items()
        ],
        columns=["product_identifier", "date", "Y0_revenue", "Y1_revenue"],
    )

    return treated_metrics, potential_outcomes_df
def probability_boost(metrics: list, **kwargs) -> tuple:
    """
    Boost sale probability, approximated by an ordered-units increase.

    This is a thin alias: the call is forwarded unchanged to
    quantity_boost, so both functions accept the same keyword parameters
    and produce the same result.

    Args:
        metrics: List of metric record dictionaries
        **kwargs: Same parameters as quantity_boost

    Returns:
        Tuple of (treated_metrics, potential_outcomes_df) - exactly what
        quantity_boost returns for the same arguments
    """
    boosted_result = quantity_boost(metrics, **kwargs)
    return boosted_result
def product_detail_boost(metrics: list, **kwargs) -> tuple:
    """
    Product detail regeneration and metrics boost for enrichment experiments.

    Selects a seeded random fraction of products for treatment, optionally
    regenerates and saves their product details, and applies a unit boost
    that ramps up linearly over `ramp_days` after the enrichment start date.

    Args:
        metrics: List of metric record dictionaries. Each record is read for
            "product_id" (falling back to "product_identifier"), "date"
            (formatted "%Y-%m-%d") and "revenue"; records on or after the
            start date are also read for "ordered_units" and "unit_price"
            (falling back to "price").
        **kwargs: Parameters including:
            - job_info: Object exposing save_df(name, df), used to save the
              original and enriched product details (optional; without it
              nothing is saved or regenerated)
            - products: List of product dictionaries (optional; when given,
              treatment candidates come from these instead of from metrics)
            - effect_size: Percentage increase in ordered units (default: 0.5)
            - ramp_days: Number of days for ramp-up period (default: 7);
              a value <= 0 applies the full effect immediately
            - enrichment_fraction: Fraction of products to enrich
              (default: 0.3)
            - enrichment_start: Start date of enrichment
              (default: "2024-11-15")
            - seed: Random seed for product selection (default: 42)
            - prompt_path: Path to custom prompt template file (optional)
            - backend: Backend to use for regeneration ("mock" or "ollama",
              default: "mock")
            - quality_boost: Extra quality score passed through to the
              product-detail regeneration step (default: 0.0)

    Returns:
        Tuple of (treated_metrics, potential_outcomes_df):
            - treated_metrics: Deep copies of the input records with an
              "enriched" flag added and, for treated products on or after
              the start date, "ordered_units" and "revenue" replaced
            - potential_outcomes_df: DataFrame with columns
              product_identifier, date, Y0_revenue and Y1_revenue, one row
              per distinct (product id, date) pair; the columns are present
              even when `metrics` is empty

    Raises:
        KeyError: If a record lacks "date", "revenue" or (on or after the
            start date) "ordered_units".
        ValueError: If a date does not match "%Y-%m-%d", or if
            enrichment_fraction asks for more products than exist.
    """
    job_info = kwargs.get("job_info")
    products = kwargs.get("products")
    effect_size = kwargs.get("effect_size", 0.5)
    ramp_days = kwargs.get("ramp_days", 7)
    enrichment_fraction = kwargs.get("enrichment_fraction", 0.3)
    enrichment_start = kwargs.get("enrichment_start", "2024-11-15")
    seed = kwargs.get("seed", 42)
    prompt_path = kwargs.get("prompt_path")
    backend = kwargs.get("backend", "mock")
    quality_boost = kwargs.get("quality_boost", 0.0)

    rng = np.random.default_rng(seed)

    def _metric_product_id(row: dict):
        # Single accessor so candidate selection and the per-record loop
        # agree on which key identifies the product.
        return row.get("product_id", row.get("product_identifier"))

    # 1. Save original product details
    if job_info and products:
        job_info.save_df("product_details_original", pd.DataFrame(products))

    # 2. Select treatment products
    if products:
        candidate_ids = (p.get("product_identifier", p.get("product_id")) for p in products)
    else:
        candidate_ids = (_metric_product_id(record) for record in metrics)

    # Iterating a set of strings depends on per-process hash randomization,
    # which would make the "seeded" selection differ between runs.
    # Deduplicate in first-appearance order and sort when the ids are
    # mutually comparable so the candidate order is a function of the data.
    unique_product_ids = list(dict.fromkeys(candidate_ids))
    try:
        unique_product_ids = sorted(unique_product_ids)
    except TypeError:
        # Mixed, non-comparable id types: keep first-appearance order.
        pass

    n_treatment = int(len(unique_product_ids) * enrichment_fraction)
    if n_treatment:
        # Sample positions rather than the ids themselves: passing the ids to
        # NumPy would coerce them into one array dtype and break the
        # membership tests against the original Python values.
        chosen_positions = rng.choice(len(unique_product_ids), size=n_treatment, replace=False)
        treatment_ids = {unique_product_ids[pos] for pos in chosen_positions}
    else:
        treatment_ids = set()

    # 3. Regenerate product details for treatment products
    if products and job_info:
        updated_products = _regenerate_product_details(
            products, treatment_ids, prompt_path, backend, seed, quality_boost
        )
        job_info.save_df("product_details_enriched", pd.DataFrame(updated_products))

    # 4. Apply metrics boost effect and calculate potential outcomes
    treated_metrics = []
    potential_outcomes = {}  # {(product_id, date): {'Y0_revenue': x, 'Y1_revenue': y}}
    start_date = datetime.strptime(enrichment_start, "%Y-%m-%d")

    for record in metrics:
        # Work on a copy so the caller's records are never mutated.
        record_copy = copy.deepcopy(record)
        product_id = _metric_product_id(record_copy)
        record_date_str = record_copy["date"]
        record_date = datetime.strptime(record_date_str, "%Y-%m-%d")
        in_treatment_window = record_date >= start_date

        is_enriched = product_id in treatment_ids
        record_copy["enriched"] = is_enriched

        # Y(0) - baseline revenue (no treatment)
        y0_revenue = record_copy["revenue"]

        # Y(1) - revenue if treated (for ALL products, with ramp-up)
        unit_price = record_copy.get("unit_price", record_copy.get("price"))
        if in_treatment_window:
            days_since_start = (record_date - start_date).days
            # Linear ramp from 0 on the start date to the full effect after
            # ramp_days; a non-positive ramp means no ramp at all.
            ramp_factor = 1.0 if ramp_days <= 0 else min(1.0, days_since_start / ramp_days)
            adjusted_effect = effect_size * ramp_factor

            original_quantity = record_copy["ordered_units"]
            # int() truncates fractional units.
            boosted_quantity = int(original_quantity * (1 + adjusted_effect))
            y1_revenue = round(boosted_quantity * unit_price, 2)
        else:
            # Before treatment start, Y(1) = Y(0)
            y1_revenue = y0_revenue

        # Store potential outcomes for ALL products
        potential_outcomes[(product_id, record_date_str)] = {
            "Y0_revenue": y0_revenue,
            "Y1_revenue": y1_revenue,
        }

        # Apply factual outcome (only for treated products)
        if is_enriched and in_treatment_window:
            record_copy["ordered_units"] = boosted_quantity
            record_copy["revenue"] = y1_revenue

        treated_metrics.append(record_copy)

    # Explicit columns keep the schema stable when there are no rows.
    potential_outcomes_df = pd.DataFrame(
        [
            {
                "product_identifier": pid,
                "date": d,
                "Y0_revenue": v["Y0_revenue"],
                "Y1_revenue": v["Y1_revenue"],
            }
            for (pid, d), v in potential_outcomes.items()
        ],
        columns=["product_identifier", "date", "Y0_revenue", "Y1_revenue"],
    )

    return treated_metrics, potential_outcomes_df
def _regenerate_product_details( products: list, treatment_ids: set, prompt_path: str, backend: str, seed: int, quality_boost: float, ) -> list: """ Regenerate product details for treatment products. Preserves: brand, category, price Regenerates: title, description, features Updates: quality_score (recalculated + optional boost) Args: products: List of product dictionaries treatment_ids: Set of product IDs to treat prompt_path: Path to custom prompt template backend: Backend to use ("mock" or "ollama") seed: Random seed quality_boost: Additional quality score boost for treated products (0.0-1.0) """ control_products = [] treatment_products = [] for product in products: product_copy = copy.deepcopy(product) product_id = product_copy.get("product_identifier", product_copy.get("product_id")) if product_id in treatment_ids: product_copy["enriched"] = True treatment_products.append(product_copy) else: product_copy["enriched"] = False control_products.append(product_copy) if treatment_products: treatment_df = pd.DataFrame(treatment_products) if backend == "ollama": from ..simulate.product_details_ollama import simulate_product_details_ollama regenerated_df = simulate_product_details_ollama(treatment_df, prompt_path=prompt_path) else: # mock from ..simulate.product_details_mock import simulate_product_details_mock regenerated_df = simulate_product_details_mock( treatment_df, seed=seed, prompt_path=prompt_path, treatment_mode=True ) regenerated_df["enriched"] = True # Apply quality boost for treated products (0.0 = no boost) regenerated_df["quality_score"] = regenerated_df["quality_score"].apply(lambda x: min(x + quality_boost, 1.0)) treatment_products = regenerated_df.to_dict("records") return control_products + treatment_products