API Reference

This section provides auto-generated API documentation from the Impact Engine source code.

Engine

Impact analysis engine for the impact_engine_measure package.

impact_engine_measure.engine.measure_impact(config_path, storage_url='./data', job_id=None)[source]

Measure causal impact using the configured model and metrics.

Retrieves business metrics through the metrics layer, then passes them to the models layer for statistical analysis.

Parameters:
  • config_path (str) – Path to configuration file containing metrics and model settings. The config must include DATA.SOURCE.CONFIG.PATH pointing to a products CSV file.

  • storage_url (str) – Storage URL or path (e.g., “./data”, “s3://bucket/prefix”).

  • job_id (str, optional) – Job ID for resuming existing jobs or using custom IDs. If not provided, a unique ID will be auto-generated.

Returns:

Job object for the completed run. Use load_results(job_info) to load all artifacts into a typed MeasureJobResult.

Return type:

JobInfo
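A minimal illustrative configuration for measure_impact(), assuming the key layout shown by the adapter configuration fragments elsewhere in this reference. The authoritative schema is defined by process_config() and config_defaults.yaml, so treat every key below as a sketch rather than the canonical schema:

```yaml
# Illustrative only: key names follow the config fragments in this reference,
# not a verified schema.
DATA:
    SOURCE:
        type: file
        CONFIG:
            path: path/to/products.csv   # DATA.SOURCE.CONFIG.PATH, per the docstring above
MEASUREMENT:
    MODEL: "interrupted_time_series"
    PARAMS:
        intervention_date: "2024-02-01"
```

A run would then be started with measure_impact("config.yaml", storage_url="./data"), and the returned JobInfo passed to load_results().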

Results

Load and access job results produced by measure_impact().

class impact_engine_measure.results.MeasureJobResult(job_id, model_type, created_at, config, impact_results, products, business_metrics, transformed_metrics, model_artifacts)[source]

Bases: object

Typed container for all artifacts produced by a single pipeline run.

Parameters:
  • job_id (str)

  • model_type (str)

  • created_at (str)

  • config (Dict[str, Any])

  • impact_results (Dict[str, Any])

  • products (DataFrame)

  • business_metrics (DataFrame)

  • transformed_metrics (DataFrame)

  • model_artifacts (Dict[str, DataFrame])

job_id
Type:

Unique identifier for the job.

model_type
Type:

Model identifier (e.g. "interrupted_time_series").

created_at
Type:

ISO-8601 timestamp of job creation.

config
Type:

The YAML configuration used for this run.

impact_results
Type:

The impact_results.json envelope (model_type, data, metadata).

products
Type:

Product catalog DataFrame.

business_metrics
Type:

Raw business metrics DataFrame.

transformed_metrics
Type:

Transformed metrics DataFrame.

model_artifacts
Type:

Model-specific supplementary DataFrames, keyed by artifact name with the {model_type}__ prefix stripped.

__init__(job_id, model_type, created_at, config, impact_results, products, business_metrics, transformed_metrics, model_artifacts)
Parameters:
  • job_id (str)

  • model_type (str)

  • created_at (str)

  • config (Dict[str, Any])

  • impact_results (Dict[str, Any])

  • products (DataFrame)

  • business_metrics (DataFrame)

  • transformed_metrics (DataFrame)

  • model_artifacts (Dict[str, DataFrame])

Return type:

None

impact_engine_measure.results.load_results(job_info)[source]

Load all artifacts from a completed pipeline run.

Reads manifest.json to discover files, then loads each one using the format-appropriate reader. Model-specific artifacts (those not in the fixed pipeline set) are collected into model_artifacts with the {model_type}__ prefix stripped from their keys.

Parameters:

job_info (JobInfo) – JobInfo returned by measure_impact().

Returns:

Typed container with every artifact.

Return type:

MeasureJobResult

Raises:
  • FileNotFoundError – If the job directory or manifest is missing.

  • ValueError – If the manifest’s major schema version is incompatible.
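The prefix-stripping rule for model_artifacts can be shown with a small stand-alone sketch; the manifest keys below are invented for illustration:

```python
# Stand-in sketch of load_results()'s documented key handling: artifacts that
# are not in the fixed pipeline set carry a "{model_type}__" prefix, which is
# stripped when building model_artifacts. All names here are illustrative.
model_type = "interrupted_time_series"
manifest_keys = [
    "products", "business_metrics", "transformed_metrics",  # fixed pipeline set
    "interrupted_time_series__counterfactual",              # model-specific
    "interrupted_time_series__per_product_effects",
]
fixed = {"products", "business_metrics", "transformed_metrics"}
prefix = f"{model_type}__"
model_artifacts = {key[len(prefix):]: key for key in manifest_keys if key not in fixed}
print(sorted(model_artifacts))  # ['counterfactual', 'per_product_effects']
```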

Metrics Layer

Manager

Metrics Manager for coordinating metrics operations.

Design: Uses dependency injection to receive the metrics adapter from a factory. This decouples coordination logic from adapter selection, enabling:
  • Easy unit testing with mock adapters

  • Adapter selection controlled by configuration, not hardcoded

class impact_engine_measure.metrics.manager.MetricsManager(source_config, metrics_source, source_type, parent_job=None)[source]

Bases: object

Central coordinator for metrics management.

Uses dependency injection - the metrics source is passed in via constructor, making the manager easy to test with mock implementations.

Note: source_config is expected to be pre-validated via process_config().

__init__(source_config, metrics_source, source_type, parent_job=None)[source]

Initialize the MetricsManager with injected metrics source.

Parameters:
  • source_config (dict) – SOURCE.CONFIG configuration block (pre-validated, with defaults merged).

  • metrics_source (MetricsInterface) – The metrics implementation to use for data retrieval.

  • source_type (str) – The type of metrics source (e.g., “simulator”, “file”).

  • parent_job (JobInfo, optional) – Optional parent job for artifact management.

retrieve_metrics(products)[source]

Retrieve business metrics for specified products using SOURCE.CONFIG date range.

Parameters:

products (DataFrame)

Return type:

DataFrame

get_current_config()[source]

Get the currently loaded configuration.

Return type:

Dict[str, Any] | None
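The dependency-injection design makes unit testing straightforward. The sketch below mirrors the documented constructor and retrieve_metrics() shape with stand-in classes; it does not import the package, and the SOURCE.CONFIG keys are illustrative:

```python
import pandas as pd

# Stand-in sketch of the dependency-injection pattern: the manager receives
# its metrics source via the constructor, so tests can pass a mock. Class and
# method bodies are illustrative, not the package's implementation.
class MockMetricsSource:
    def connect(self, config):
        return True

    def retrieve_business_metrics(self, products, start_date, end_date):
        # Return canned metrics instead of hitting a real source.
        return pd.DataFrame({"product_id": products["product_id"], "revenue": 100.0})

class MetricsManagerSketch:
    def __init__(self, source_config, metrics_source, source_type, parent_job=None):
        self.source_config = source_config
        self.metrics_source = metrics_source

    def retrieve_metrics(self, products):
        cfg = self.source_config  # SOURCE.CONFIG date range, per the docs
        return self.metrics_source.retrieve_business_metrics(
            products, cfg["start_date"], cfg["end_date"]
        )

manager = MetricsManagerSketch(
    {"start_date": "2024-01-01", "end_date": "2024-03-31"},
    MockMetricsSource(),
    source_type="mock",
)
metrics = manager.retrieve_metrics(pd.DataFrame({"product_id": ["A", "B"]}))
print(len(metrics))  # 2
```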

Base Interface

Base interfaces and common classes for the metrics layer.

class impact_engine_measure.metrics.base.MetricsInterface[source]

Bases: ABC

Abstract base class defining the contract for all metrics implementations.

Required methods (must override):
  • connect: Initialize adapter with configuration

  • retrieve_business_metrics: Fetch metrics data

Optional methods (have sensible defaults):
  • validate_connection: Check if connection is active

  • transform_outbound: Transform data to external format

  • transform_inbound: Transform data from external format

abstractmethod connect(config)[source]

Establish connection to the metrics source.

Parameters:

config (Dict[str, Any])

Return type:

bool

abstractmethod retrieve_business_metrics(products, start_date, end_date)[source]

Retrieve business metrics for specified products and time range.

Parameters:
  • products (pd.DataFrame) – DataFrame with product identifiers and characteristics.

  • start_date (str) – Start date in YYYY-MM-DD format.

  • end_date (str) – End date in YYYY-MM-DD format.

Returns:

DataFrame with business metrics for the specified products.

Return type:

pd.DataFrame

validate_connection()[source]

Validate that the metrics source connection is active and functional.

Default implementation returns True. Override for custom validation.

Returns:

True if connection is valid, False otherwise.

Return type:

bool

transform_outbound(products, start_date, end_date)[source]

Transform impact engine format to external system format.

Default implementation is pass-through. Override for adapters that need data transformation.

Parameters:
  • products (pd.DataFrame) – DataFrame with product identifiers and characteristics.

  • start_date (str) – Start date in YYYY-MM-DD format.

  • end_date (str) – End date in YYYY-MM-DD format.

Returns:

Dictionary with parameters formatted for the external system.

Return type:

dict

transform_inbound(external_data)[source]

Transform external system response to impact engine format.

Default implementation returns data as-is if DataFrame, otherwise raises. Override for adapters that need result transformation.

Parameters:

external_data (Any) – Raw data from the external system.

Returns:

DataFrame with standardized business metrics format.

Return type:

pd.DataFrame
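A custom adapter only has to supply connect() and retrieve_business_metrics(); the optional methods fall back to their defaults. The stand-in below mirrors that contract without importing the package (in real code, subclass impact_engine_measure.metrics.base.MetricsInterface):

```python
from abc import ABC, abstractmethod

import pandas as pd

# Minimal stand-in mirroring the documented MetricsInterface contract:
# connect() and retrieve_business_metrics() are abstract, while
# validate_connection() keeps its permissive default. Illustrative only.
class MetricsInterfaceSketch(ABC):
    @abstractmethod
    def connect(self, config): ...

    @abstractmethod
    def retrieve_business_metrics(self, products, start_date, end_date): ...

    def validate_connection(self):
        return True  # sensible default, per the docs

class InMemoryAdapter(MetricsInterfaceSketch):
    def connect(self, config):
        self._rows = config["rows"]
        return True

    def retrieve_business_metrics(self, products, start_date, end_date):
        df = pd.DataFrame(self._rows)
        # YYYY-MM-DD strings sort lexicographically, so string comparison works.
        return df[(df["date"] >= start_date) & (df["date"] <= end_date)]

adapter = InMemoryAdapter()
adapter.connect({"rows": [
    {"product_id": "A", "date": "2024-01-15", "revenue": 10.0},
    {"product_id": "A", "date": "2024-06-15", "revenue": 12.0},
]})
out = adapter.retrieve_business_metrics(None, "2024-01-01", "2024-03-31")
print(len(out))  # 1
```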

Catalog Simulator Adapter

Catalog Simulator Adapter - adapts online_retail_simulator package to MetricsInterface.

Integration is governed by contracts (schemas) and config bridge (config translation).

class impact_engine_measure.metrics.catalog_simulator.adapter.CatalogSimulatorAdapter[source]

Bases: MetricsInterface

Adapter for catalog simulator that implements MetricsInterface.

__init__()[source]

Initialize the CatalogSimulatorAdapter.

connect(config)[source]

Establish connection to the catalog simulator.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (Dict[str, Any])

Return type:

bool

retrieve_business_metrics(products, start_date, end_date)[source]

Retrieve business metrics using catalog simulator’s job-aware API.

Parameters:
  • products (DataFrame)

  • start_date (str)

  • end_date (str)

Return type:

DataFrame

validate_connection()[source]

Validate that the catalog simulator connection is active and functional.

Return type:

bool

transform_outbound(products, start_date, end_date)[source]

Transform impact engine format to catalog simulator format using contracts.

Parameters:
  • products (DataFrame)

  • start_date (str)

  • end_date (str)

Return type:

Dict[str, Any]

transform_inbound(external_data)[source]

Transform catalog simulator response to impact engine format using contracts.

Parameters:

external_data (Any)

Return type:

DataFrame

File Adapter

File Adapter - reads metrics data from CSV or Parquet files.

This adapter enables file-based workflows where upstream processes produce data files that impact-engine consumes.

class impact_engine_measure.metrics.file.adapter.FileAdapter[source]

Bases: MetricsInterface

Adapter for file-based data sources that implements MetricsInterface.

Supports CSV and Parquet file formats, including partitioned Parquet directories. The file is expected to contain pre-processed data ready for impact analysis.

Configuration:

DATA:
    SOURCE:
        type: file
        CONFIG:
            path: path/to/data.csv              # Single CSV file
            path: path/to/data.parquet          # Single Parquet file
            path: path/to/partitioned_data/     # Partitioned Parquet directory
            # Optional parameters:
            date_column: date        # Column name for date filtering
            product_id_column: product_id  # Column name for product IDs
__init__()[source]

Initialize the FileAdapter.

connect(config)[source]

Initialize adapter with configuration parameters.

Parameters:

config (dict) – Dictionary containing (lowercase keys, merged via process_config()):

  • path – Path to the data file (required)

  • date_column – Column name for dates (optional)

  • product_id_column – Column name for product IDs (optional, default: product_id)

Returns:

True if initialization successful.

Return type:

bool

retrieve_business_metrics(products, start_date, end_date)[source]

Retrieve business metrics from the loaded file.

For file-based sources, the data is already loaded. This method optionally filters by date range and product IDs if configured.

Parameters:
  • products (pd.DataFrame) – DataFrame with product identifiers (can be empty for file sources).

  • start_date (str) – Start date in YYYY-MM-DD format (used if DATE_COLUMN configured).

  • end_date (str) – End date in YYYY-MM-DD format (used if DATE_COLUMN configured).

Returns:

DataFrame with business metrics.

Return type:

pd.DataFrame

Raises:

ConnectionError – If adapter not connected.

validate_connection()[source]

Validate that the file source is accessible.

Returns:

True if file exists and data is loaded.

Return type:

bool

transform_outbound(products, start_date, end_date)[source]

Transform impact engine format to file adapter format.

For file-based sources, this is a pass-through since the file already contains the data in the expected format.

Parameters:
  • products (pd.DataFrame) – DataFrame with product identifiers.

  • start_date (str) – Start date.

  • end_date (str) – End date.

Returns:

Dictionary with query parameters.

Return type:

dict

transform_inbound(external_data)[source]

Transform file data to impact engine format.

For file-based sources, this adds metadata fields and ensures proper column naming.

Parameters:

external_data (Any) – DataFrame read from file.

Returns:

DataFrame with standardized format.

Return type:

pd.DataFrame

Models Layer

Manager

Models manager for coordinating model operations.

class impact_engine_measure.models.manager.FitOutput(results_path, artifact_paths=<factory>, model_type='')[source]

Bases: object

Structured output from fit_model().

Provides programmatic access to the results path and all artifact paths, so callers do not need to reconstruct file paths from model internals.

results_path
Type:

Full path/URL to impact_results.json.

artifact_paths
Type:

Mapping of artifact name to full path/URL.

model_type
Type:

The model type that produced this output.

__init__(results_path, artifact_paths=<factory>, model_type='')
Return type:

None

class impact_engine_measure.models.manager.ModelsManager(measurement_config, model)[source]

Bases: object

Central coordinator for model management.

Uses dependency injection - the model is passed in via constructor, making the manager easy to test with mock implementations.

Note: measurement_config is expected to be pre-validated via process_config().

__init__(measurement_config, model)[source]

Initialize the ModelsManager with injected model.

Parameters:
  • measurement_config (dict) – MEASUREMENT configuration block (pre-validated, with defaults merged).

  • model (ModelInterface) – The model implementation to use for fitting.

fit_model(data, storage=None, **overrides)[source]

Fit model using configuration parameters.

All PARAMS from config are forwarded as kwargs to validate_params() and fit(). Callers can override any config param via **overrides.

Parameters:
  • data (pd.DataFrame) – DataFrame containing data for model fitting.

  • storage (StorageManager) – Storage backend for artifacts.

  • **overrides – Override any MEASUREMENT.PARAMS value (e.g., intervention_date, dependent_variable).

Returns:

FitOutput with paths to all persisted files.

Return type:

FitOutput

get_current_config()[source]

Get the currently loaded configuration.

Return type:

Dict[str, Any] | None
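The override semantics of fit_model() reduce to a dict merge in which caller-supplied keys win. A dependency-free sketch (the param names are illustrative):

```python
# Sketch of the documented override behaviour: MEASUREMENT.PARAMS from config
# are forwarded as kwargs, and any of them can be overridden per call.
config_params = {"intervention_date": "2024-02-01", "dependent_variable": "revenue"}

def fit_model_sketch(data, **overrides):
    params = {**config_params, **overrides}  # caller overrides win
    return params

merged = fit_model_sketch(None, intervention_date="2024-03-01")
print(merged["intervention_date"])   # 2024-03-01
print(merged["dependent_variable"])  # revenue
```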

Base Interface

Base interface for impact models.

class impact_engine_measure.models.base.ModelResult(model_type, data, metadata=<factory>, artifacts=<factory>)[source]

Bases: object

Standardized model result container.

All models return this structure, allowing the manager to handle storage uniformly while models remain storage-agnostic.

The data dict must use three standardized keys:
  • model_params: Input parameters used (formula, intervention_date, etc.)

  • impact_estimates: The treatment effect measurements

  • model_summary: Fit diagnostics, sample sizes, configuration echo

model_type
Type:

Identifier for the model that produced this result.

data
Type:

Primary result data with keys: model_params, impact_estimates, model_summary.

metadata
Type:

Metadata about the model run (populated by the manager).

artifacts
Type:

Supplementary DataFrames to persist (e.g., per-product details). Keys are format-agnostic names; the manager prefixes with model_type and appends the file extension.

to_dict()[source]

Convert to dictionary for storage/serialization.

Returns an envelope with model_type, data, and metadata. The data key contains the model-specific payload (nested, not spread).

Return type:

Dict[str, Any]

__init__(model_type, data, metadata=<factory>, artifacts=<factory>)
Return type:

None
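The envelope that to_dict() produces, sketched as a plain dict with illustrative values:

```python
# Sketch of the documented ModelResult envelope: data keeps the three
# standardized keys and stays nested under "data", not spread. Values are
# illustrative.
result = {
    "model_type": "interrupted_time_series",
    "data": {
        "model_params": {"intervention_date": "2024-02-01"},
        "impact_estimates": {"absolute_effect": 123.4},
        "model_summary": {"n_pre": 30, "n_post": 30},
    },
    "metadata": {},  # populated by the manager
}
assert set(result["data"]) == {"model_params", "impact_estimates", "model_summary"}
```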

class impact_engine_measure.models.base.ModelInterface[source]

Bases: ABC

Abstract base class for impact models.

Defines the unified interface that all impact models must implement. This ensures consistent behavior across different modeling approaches (interrupted time series, causal inference, metrics approximation, etc.).

Required methods (must override):
  • connect: Initialize model with configuration

  • fit: Fit model to data

  • validate_params: Validate model-specific parameters before fitting

Optional methods (have sensible defaults):
  • validate_connection: Check if model is ready

  • validate_data: Check if input data is valid

  • get_required_columns: Return list of required columns

  • transform_outbound: Transform data to external format

  • transform_inbound: Transform results from external format

abstractmethod connect(config)[source]

Initialize model with configuration parameters.

Parameters:

config (dict) – Dictionary containing model configuration parameters.

Returns:

True if initialization successful, False otherwise.

Return type:

bool

abstractmethod fit(data, **kwargs)[source]

Fit the model to the provided data.

Parameters:
  • data (pd.DataFrame) – DataFrame containing data for model fitting.

  • **kwargs – Model-specific parameters (e.g., intervention_date, dependent_variable).

Returns:

Model-specific results (Dict, str path, etc.)

Return type:

Any

Raises:
  • ValueError – If data validation fails or required columns are missing.

  • RuntimeError – If model fitting fails.

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Default implementation returns True. Override for custom validation.

Returns:

True if model is ready, False otherwise.

Return type:

bool

validate_data(data)[source]

Validate that the input data meets model requirements.

Default implementation checks if data is non-empty. Override for custom validation.

Parameters:

data (pd.DataFrame) – DataFrame to validate.

Returns:

True if data is valid, False otherwise.

Return type:

bool

get_required_columns()[source]

Get the list of required columns for this model.

Default implementation returns empty list. Override if model requires specific columns.

Returns:

Column names that must be present in input data.

Return type:

list of str

abstractmethod validate_params(params)[source]

Validate model-specific parameters before fitting.

This method is called by ModelsManager before fit() to perform early validation of required parameters. All model implementations MUST override this method to validate their specific parameters.

Centralized config validation (process_config) handles known models, but this method ensures custom/user-defined models also validate.

Parameters:

params (dict) – Dictionary containing parameters that will be passed to fit(). Typical keys: intervention_date, dependent_variable.

Raises:

ValueError – If required parameters are missing or invalid.

Return type:

None

get_fit_params(params)[source]

Filter parameters to only those accepted by this adapter’s fit().

Called by ModelsManager before fit() to prevent cross-model param pollution. Default returns all params (backward compatible). Built-in adapters override.

Parameters:

params (dict) – Full params dict (config PARAMS merged with caller overrides).

Returns:

Filtered dict for fit().

Return type:

dict

transform_outbound(data, **kwargs)[source]

Transform impact engine format to model library format.

Default implementation is pass-through. Override for models that need data transformation.

Parameters:
  • data (pd.DataFrame) – DataFrame with impact engine standardized format.

  • **kwargs – Additional model-specific parameters.

Returns:

Dictionary with parameters formatted for the model library.

Return type:

dict

transform_inbound(model_results)[source]

Transform model library results to impact engine format.

Default implementation returns results as-is (or wrapped in dict). Override for models that need result transformation.

Parameters:

model_results (Any) – Raw results from the model library.

Returns:

Dictionary with standardized impact analysis results.

Return type:

dict
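A minimal custom model satisfying the three required methods. The abstract base below is a stand-in for ModelInterface, and the difference-in-means model is purely illustrative, not one of the package's adapters:

```python
from abc import ABC, abstractmethod

import pandas as pd

# Stand-in for the documented ModelInterface contract: connect(), fit(), and
# validate_params() must be overridden.
class ModelInterfaceSketch(ABC):
    @abstractmethod
    def connect(self, config): ...

    @abstractmethod
    def fit(self, data, **kwargs): ...

    @abstractmethod
    def validate_params(self, params): ...

class MeanDifferenceModel(ModelInterfaceSketch):
    def connect(self, config):
        self.config = config
        return True

    def validate_params(self, params):
        # Early validation, as ModelsManager would call it before fit().
        if "treatment_column" not in params:
            raise ValueError("treatment_column is required")

    def fit(self, data, treatment_column, outcome_column="revenue", **kwargs):
        treated = data.loc[data[treatment_column] == 1, outcome_column].mean()
        control = data.loc[data[treatment_column] == 0, outcome_column].mean()
        return {"impact_estimates": {"effect": treated - control}}

df = pd.DataFrame({"treated": [1, 1, 0, 0], "revenue": [12.0, 14.0, 10.0, 10.0]})
model = MeanDifferenceModel()
model.connect({})
model.validate_params({"treatment_column": "treated"})
print(model.fit(df, treatment_column="treated"))  # effect: 3.0
```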

Interrupted Time Series Adapter

Interrupted Time Series Model Adapter - adapts SARIMAX to ModelInterface.

class impact_engine_measure.models.interrupted_time_series.adapter.TransformedInput(y, exog, data, dependent_variable, intervention_date, order, seasonal_order)[source]

Bases: object

Container for transformed model input data.

This dataclass eliminates hidden state by explicitly passing all necessary data between transformation and result formatting.

Parameters:
  • y (ndarray)

  • exog (DataFrame)

  • data (DataFrame)

  • dependent_variable (str)

  • intervention_date (str)

  • order (Tuple[int, int, int])

  • seasonal_order (Tuple[int, int, int, int])
__init__(y, exog, data, dependent_variable, intervention_date, order, seasonal_order)
Return type:

None

class impact_engine_measure.models.interrupted_time_series.adapter.InterruptedTimeSeriesAdapter[source]

Bases: ModelInterface

Estimates causal impact of an intervention using time series analysis.

Constraints:
  • Data must be ordered chronologically with a ‘date’ column

  • intervention_date parameter required in MEASUREMENT.PARAMS

  • Requires sufficient pre- and post-intervention observations (minimum 3 total)

__init__()[source]

Initialize the InterruptedTimeSeriesAdapter.

connect(config)[source]

Initialize model with configuration parameters.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (Dict[str, Any])

Return type:

bool

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Return type:

bool

validate_params(params)[source]

Validate ITS-specific parameters.

Parameters:

params (dict) – Parameters dict with intervention_date, dependent_variable, etc.

Raises:

ValueError – If intervention_date is missing.

Return type:

None

get_fit_params(params)[source]

ITS accepts intervention_date, dependent_variable, order, seasonal_order.

Parameters:

params (Dict[str, Any])

Return type:

Dict[str, Any]

fit(data, **kwargs)[source]

Fit the interrupted time series model and return results.

Parameters:
  • data (pd.DataFrame) – DataFrame containing time series data with ‘date’ column and dependent variable column.

  • **kwargs – Model parameters:

      • intervention_date (str) – Date (YYYY-MM-DD) when the intervention occurred. Required.

      • dependent_variable (str) – Column to model (default: “revenue”).

      • order (tuple) – SARIMAX order (p, d, q).

      • seasonal_order (tuple) – SARIMAX seasonal order (P, D, Q, s).

Returns:

Standardized result container (storage handled by manager).

Return type:

ModelResult

Raises:
  • ValueError – If data validation fails or required columns are missing.

  • RuntimeError – If model fitting fails.
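The intervention enters the SARIMAX fit through an exogenous regressor. The sketch below builds the simplest such design, a post-intervention step dummy; the adapter's actual exog construction may differ:

```python
import pandas as pd

# Step dummy that switches on at intervention_date, of the kind an
# interrupted-time-series design feeds to SARIMAX as exog. Dates are
# illustrative.
dates = pd.date_range("2024-01-01", periods=6, freq="D")
intervention_date = "2024-01-04"
exog = pd.DataFrame({"post": (dates >= intervention_date).astype(int)}, index=dates)
print(exog["post"].tolist())  # [0, 0, 0, 1, 1, 1]
```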

validate_data(data)[source]

Validate that the input data meets model requirements.

Parameters:

data (pd.DataFrame) – DataFrame to validate.

Returns:

True if data is valid, False otherwise.

Return type:

bool

get_required_columns()[source]

Get the list of required columns for this model.

Returns:

Column names that must be present in input data.

Return type:

list of str

transform_outbound(data, intervention_date, **kwargs)[source]

Transform impact engine format to SARIMAX model format.

Note: This method is kept for interface compliance but internally uses _prepare_model_input for the actual transformation.

Parameters:
  • data (DataFrame)

  • intervention_date (str)

Return type:

Dict[str, Any]

transform_inbound(model_results)[source]

Transform SARIMAX results to impact engine format.

Note: This method requires transform_outbound to have been called first to set up necessary state. For stateless operation, use _format_results directly with a TransformedInput object.

Parameters:

model_results (Any)

Return type:

Dict[str, Any]

Experiment Adapter

Experiment Model Adapter - thin wrapper around statsmodels OLS with R-style formulas.

class impact_engine_measure.models.experiment.adapter.ExperimentAdapter[source]

Bases: ModelInterface

Estimates treatment effects via OLS regression with R-style formulas.

Constraints:
  • formula parameter required in MEASUREMENT.PARAMS

  • DataFrame must contain all variables referenced in the formula

__init__()[source]

Initialize the ExperimentAdapter.

connect(config)[source]

Initialize model with configuration parameters.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (Dict[str, Any])

Return type:

bool

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Return type:

bool

validate_params(params)[source]

Validate experiment-specific parameters.

Parameters:

params (dict) – Parameters dict with formula, etc.

Raises:

ValueError – If formula is missing.

Return type:

None

get_fit_params(params)[source]

Exclude known config keys, pass library kwargs through to statsmodels.

Parameters:

params (Dict[str, Any])

Return type:

Dict[str, Any]

fit(data, **kwargs)[source]

Fit OLS model using statsmodels formula API and return results.

Parameters:
  • data (pd.DataFrame) – DataFrame containing all variables referenced in the formula.

  • **kwargs – Passed through to statsmodels OLS .fit() (e.g., cov_type=’HC3’ for robust standard errors).

Returns:

Standardized result container.

Return type:

ModelResult

get_required_columns()[source]

Get required column names.

Returns empty list; statsmodels validates formula variables against the DataFrame natively.

Return type:

List[str]
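For a formula like "revenue ~ treated" with a binary regressor, the OLS coefficient on treated equals the treated-minus-control mean difference. A dependency-free check of that equivalence (the adapter itself delegates to statsmodels' formula API; values are illustrative):

```python
import numpy as np

# OLS via least squares on [intercept, treated]; with a binary regressor the
# slope equals the group-mean difference.
treated = np.array([1.0, 1.0, 0.0, 0.0])
revenue = np.array([12.0, 14.0, 10.0, 10.0])
X = np.column_stack([np.ones_like(treated), treated])
beta, *_ = np.linalg.lstsq(X, revenue, rcond=None)
print(beta[1])  # ≈ 3.0 (treated mean 13.0 minus control mean 10.0)
```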

Metrics Approximation Adapter

Metrics Approximation Adapter - approximates impact from metric changes.

This model approximates treatment impact by correlating metric changes (e.g., quality score improvements) with expected outcome changes via configurable response functions.

class impact_engine_measure.models.metrics_approximation.adapter.MetricsApproximationAdapter[source]

Bases: ModelInterface

Adapter for metrics-based impact approximation that implements ModelInterface.

This model takes enriched products with before/after metric values and baseline outcomes, then applies a response function to approximate the treatment impact.

Input DataFrame must contain:

  • metric_before_column: Pre-intervention metric value

  • metric_after_column: Post-intervention metric value

  • baseline_column: Baseline sales/revenue

Configuration:

MEASUREMENT:
    MODEL: "metrics_approximation"
    METRIC_BEFORE_COLUMN: "quality_before"
    METRIC_AFTER_COLUMN: "quality_after"
    BASELINE_COLUMN: "baseline_sales"
    RESPONSE:
        FUNCTION: "linear"
        PARAMS:
            coefficient: 0.5
__init__()[source]

Initialize the MetricsApproximationAdapter.

connect(config)[source]

Initialize model with configuration parameters.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (dict) – Dictionary containing model configuration:

  • metric_before_column – Column name for pre-intervention metric

  • metric_after_column – Column name for post-intervention metric

  • baseline_column – Column name for baseline outcome

  • response – Dict with FUNCTION name and optional PARAMS

Returns:

True if initialization successful.

Return type:

bool

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Return type:

bool

validate_params(params)[source]

Validate metrics approximation parameters.

Metrics approximation has no required fit-time parameters beyond what’s configured in connect(). This implementation satisfies the abstract method requirement while allowing all params.

Parameters:

params (dict) – Parameters dict (typically empty for this model).

Return type:

None

get_fit_params(params)[source]

Metrics approximation has no fit-time params from config.

All configuration (column names, response function, response params) is stored in self.config during connect().

Parameters:

params (Dict[str, Any])

Return type:

Dict[str, Any]

fit(data, **kwargs)[source]

Fit the metrics approximation model and return results.

For each product, computes:

delta_metric = metric_after - metric_before
approximated_impact = response_function(delta_metric, baseline, row_attributes)

Parameters:
  • data (pd.DataFrame) – DataFrame with enriched products (only treated products). Must contain metric_before, metric_after, and baseline columns. Additional columns are passed as row_attributes to response function.

  • **kwargs – Additional parameters passed to response function.

Returns:

Standardized result container (storage handled by manager).

Return type:

ModelResult

validate_data(data)[source]

Validate that the input data meets model requirements.

Parameters:

data (pd.DataFrame) – DataFrame to validate.

Returns:

True if data is valid, False otherwise.

Return type:

bool

get_required_columns()[source]

Get the list of required columns for this model.

Returns:

Column names that must be present in input data.

Return type:

list of str
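The per-product computation in fit(), sketched with a linear response function. The response semantics here (coefficient * delta_metric * baseline) are an assumption for illustration; the real functions are selected via RESPONSE.FUNCTION:

```python
import pandas as pd

# Sketch of the documented per-product arithmetic. The linear response below
# is an assumed illustration of a RESPONSE.FUNCTION, not the package's
# implementation; column names follow the configuration fragment above.
def linear_response(delta_metric, baseline, row_attributes, coefficient=0.5):
    return coefficient * delta_metric * baseline

df = pd.DataFrame({
    "quality_before": [1.0, 2.0],
    "quality_after": [1.5, 2.0],
    "baseline_sales": [1000.0, 500.0],
})
delta = df["quality_after"] - df["quality_before"]
df["approximated_impact"] = [
    linear_response(d, b, row)
    for d, b, row in zip(delta, df["baseline_sales"], df.to_dict("records"))
]
print(df["approximated_impact"].tolist())  # [250.0, 0.0]
```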

Nearest Neighbour Matching Adapter

Nearest neighbour matching estimator for treatment effects.

Thin wrapper around causalml’s NearestNeighborMatch. Matches treated and control units on observed covariates, then computes ATT, ATC, and ATE from mean outcome differences in the matched sample. Covariate balance (SMD before/after) is stored as an artifact.

class impact_engine_measure.models.nearest_neighbour_matching.adapter.NearestNeighbourMatchingAdapter[source]

Bases: ModelInterface

Estimates treatment effects via nearest neighbour matching on covariates.

Constraints:
  • Data must contain a binary treatment column

  • One or more covariate columns must be specified

  • treatment_column and covariate_columns are required in MEASUREMENT.PARAMS

  • When replace=False, only single-column matching is supported (causalml constraint)

__init__()[source]

Initialize the NearestNeighbourMatchingAdapter.

connect(config)[source]

Initialize model with configuration parameters.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (Dict[str, Any])

Return type:

bool

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Return type:

bool

validate_params(params)[source]

Validate nearest-neighbour-matching-specific parameters.

Parameters:

params (dict) – Parameters dict forwarded from config.

Raises:

ValueError – If required parameters are missing.

Return type:

None

get_fit_params(params)[source]

Nearest neighbour matching only uses dependent_variable from fit kwargs.

Parameters:

params (Dict[str, Any])

Return type:

Dict[str, Any]

fit(data, **kwargs)[source]

Fit the nearest neighbour matching model and return results.

Performs two matching passes (ATT and ATC) and computes ATE as the weighted combination.

Parameters:
  • data (pd.DataFrame) – DataFrame with treatment indicator, covariates, and outcome.

  • **kwargs – Filtered MEASUREMENT.PARAMS forwarded by the manager.

Returns:

Standardized result container.

Return type:

ModelResult

validate_data(data)[source]

Validate input data meets model requirements.

Parameters:

data (pd.DataFrame) – DataFrame to validate.

Returns:

True if data is valid, False otherwise.

Return type:

bool

get_required_columns()[source]

Get required column names.

Returns:

Column names that must be present in input data.

Return type:

list of str
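The docs state that ATE is computed as a weighted combination of the ATT and ATC passes. Group-size weighting is the standard choice and is assumed here; the numbers are illustrative:

```python
# ATT from matching controls to treated units, ATC from the reverse pass.
# Weighting by group size is an assumption about the adapter's arithmetic.
att, atc = 3.0, 2.0
n_treated, n_control = 40, 60
ate = (n_treated * att + n_control * atc) / (n_treated + n_control)
print(ate)  # 2.4
```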

Subclassification Adapter

Subclassification (stratification) estimator for treatment effects.

This model stratifies observations on covariates using propensity-score quantiles, computes within-stratum treated/control mean differences, and aggregates via weighted average to estimate ATT or ATE.

The underlying “library” is pandas groupby + numpy arithmetic — the algorithm is simple enough that wrapping an external causal-inference package would add dependency weight with no statistical benefit.

class impact_engine_measure.models.subclassification.adapter.SubclassificationAdapter[source]

Bases: ModelInterface

Estimates treatment effects via subclassification on covariates.

Constraints:
  • Data must contain a binary treatment column

  • One or more covariate columns must be specified

  • treatment_column and covariate_columns are required in MEASUREMENT.PARAMS

__init__()[source]

Initialize the SubclassificationAdapter.

connect(config)[source]

Initialize model with configuration parameters.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (Dict[str, Any])

Return type:

bool

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Return type:

bool

validate_params(params)[source]

Validate subclassification-specific parameters.

Parameters:

params (dict) – Parameters dict forwarded from config.

Raises:

ValueError – If required parameters are missing.

Return type:

None

get_fit_params(params)[source]

Subclassification only uses dependent_variable from fit kwargs.

Parameters:

params (Dict[str, Any])

Return type:

Dict[str, Any]

fit(data, **kwargs)[source]

Fit the subclassification model and return results.

Parameters:
  • data (pd.DataFrame) – DataFrame with treatment indicator, covariates, and outcome.

  • **kwargs – All MEASUREMENT.PARAMS forwarded by the manager.

Returns:

Standardized result container.

Return type:

ModelResult

Raises:

validate_data(data)[source]

Validate input data meets model requirements.

Parameters:

data (pd.DataFrame) – DataFrame to validate.

Returns:

True if data is valid, False otherwise.

Return type:

bool

get_required_columns()[source]

Get required column names.

Returns:

Column names that must be present in input data.

Return type:

list of str

Synthetic Control Adapter

Synthetic Control Model Adapter - thin wrapper around pysyncon’s Synth.

class impact_engine_measure.models.synthetic_control.adapter.SyntheticControlAdapter[source]

Bases: ModelInterface

Estimates causal impact using the synthetic control method via pysyncon.

Constraints:
  • Data must be in panel (long) format with unit, time, outcome, and treatment columns

  • treatment_time, treated_unit, and outcome_column are required in MEASUREMENT.PARAMS

  • Requires at least one treated unit and one control unit
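Panel (long) format means one row per (unit, time) pair. A minimal example of data satisfying the constraints above, with illustrative column names (the actual defaults come from config):

```python
import pandas as pd

# Three units observed over three periods; unit "A" is treated at time 3.
panel = pd.DataFrame({
    "unit":      ["A", "A", "A", "B", "B", "B", "C", "C", "C"],
    "time":      [1, 2, 3] * 3,
    "outcome":   [1.0, 1.1, 2.0, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4],
    "treatment": [0, 0, 1, 0, 0, 0, 0, 0, 0],
})
```

This satisfies the constraints: one treated unit ("A") and two control units ("B", "C") are available for constructing the synthetic control.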

__init__()[source]

Initialize the SyntheticControlAdapter.

connect(config)[source]

Initialize model with structural configuration parameters.

Config is pre-validated with defaults merged via process_config().

Parameters:

config (Dict[str, Any])

Return type:

bool

validate_connection()[source]

Validate that the model is properly initialized and ready to use.

Return type:

bool

validate_params(params)[source]

Validate synthetic control-specific parameters.

Only validates the three required params (those left null in config_defaults.yaml): treatment_time, treated_unit, and outcome_column.

Parameters:

params (dict) – Parameters dict with treatment_time, treated_unit, etc.

Raises:

ValueError – If required parameters are missing.

Return type:

None

get_fit_params(params)[source]

Synthetic control accepts treatment_time, treated_unit, column names, and optimizer params.

Parameters:

params (Dict[str, Any])

Return type:

Dict[str, Any]

fit(data, **kwargs)[source]

Fit the synthetic control model and return results.

Parameters:
  • data (pd.DataFrame) – Panel DataFrame with unit, time, outcome, and treatment columns.

  • **kwargs – Model parameters:
    - treatment_time: When the intervention occurred (index value). Required.
    - treated_unit (str): Name of the treated unit. Required.
    - outcome_column (str): Column with the outcome variable. Required.
    - unit_column (str): Column identifying units (default from config).
    - time_column (str): Column identifying time periods (default from config).
    - optim_method (str): Optimization method (default: “Nelder-Mead”).
    - optim_initial (str): Initial weight strategy (default: “equal”).

Returns:

Standardized result container.

Return type:

ModelResult

Raises:

validate_data(data)[source]

Validate that the input data meets panel data requirements.

Parameters:

data (pd.DataFrame) – DataFrame to validate.

Returns:

True if data is valid, False otherwise.

Return type:

bool

get_required_columns()[source]

Get required column names from config.

Returns:

Column names that must be present in input data.

Return type:

list of str

Storage Layer

Manager

Storage Manager for coordinating storage operations.

Design: Uses dependency injection to receive the storage adapter from a factory. This decouples coordination logic from adapter selection, enabling:
  • Easy unit testing with mock adapters

  • Adapter selection controlled by configuration, not hardcoded

class impact_engine_measure.storage.manager.StorageManager(storage_config, adapter)[source]

Bases: object

Central coordinator for storage management.

Uses dependency injection - the storage adapter is passed in via constructor, making the manager easy to test with mock implementations.

Parameters:
  • storage_config (dict)

  • adapter (StorageInterface)

__init__(storage_config, adapter)[source]

Initialize the StorageManager with injected storage adapter.

Parameters:
  • storage_config (dict) – Storage configuration (storage_url, prefix, etc.).

  • adapter (StorageInterface) – The storage implementation to use for persistence.
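The dependency-injection design described above can be illustrated with a simplified stand-in for the manager and an in-memory mock adapter. Class names here are hypothetical stand-ins for StorageManager and StorageInterface, not the package's real classes:

```python
class InMemoryAdapter:
    """Mock adapter that records writes in a dict instead of touching disk."""
    def __init__(self):
        self.writes = {}

    def write_json(self, path, data):
        self.writes[path] = data


class Manager:
    """Simplified manager: the adapter is injected, never constructed here."""
    def __init__(self, storage_config, adapter):
        self.config = storage_config
        self.adapter = adapter

    def write_json(self, path, data):
        # Coordination logic delegates persistence to whatever was injected.
        self.adapter.write_json(path, data)


mock = InMemoryAdapter()
mgr = Manager({"storage_url": "./data"}, mock)
mgr.write_json("impact_results.json", {"model_type": "synthetic_control"})
```

Because the manager never names a concrete adapter, tests can verify coordination logic by inspecting the mock's recorded writes, and production code can swap backends purely through configuration.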

write_json(path, data)[source]

Write JSON data to storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • data (dict) – Dictionary to serialize as JSON.

Return type:

None

write_csv(path, df)[source]

Write DataFrame to CSV in storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • df (pd.DataFrame) – DataFrame to write.

Return type:

None

write_yaml(path, data)[source]

Write YAML data to storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • data (dict) – Dictionary to serialize as YAML.

Return type:

None

write_parquet(path, df)[source]

Write DataFrame to Parquet in storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • df (pd.DataFrame) – DataFrame to write.

Return type:

None

full_path(path)[source]

Get the full path/URL for a relative path.

Parameters:

path (str) – Relative path within the storage location.

Returns:

Full path or URL to the resource.

Return type:

str

get_current_config()[source]

Get the currently loaded configuration.

Return type:

Dict[str, Any] | None

get_job()[source]

Get the underlying job object for artifact management.

This is used for creating nested jobs (e.g., in metrics adapters).

Returns:

Job object or None if the adapter doesn’t support jobs.

Return type:

Any

Base Interface

Base interfaces and common classes for the storage layer.

class impact_engine_measure.storage.base.StorageInterface[source]

Bases: ABC

Abstract base class defining the contract for all storage implementations.

Required methods (must override):
  • connect: Initialize adapter with configuration

  • write_json: Write JSON data to storage

  • write_csv: Write DataFrame to CSV

  • write_yaml: Write YAML data to storage

  • write_parquet: Write DataFrame to Parquet

  • full_path: Get full path/URL for a relative path

Optional methods (have sensible defaults):
  • validate_connection: Check if connection is active
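The required/optional split above follows the standard abc pattern: abstract methods force an override, while optional methods carry a default body. A minimal sketch (not the full interface; method set trimmed for illustration):

```python
from abc import ABC, abstractmethod


class StorageInterfaceSketch(ABC):
    """Trimmed stand-in for the storage contract."""

    @abstractmethod
    def connect(self, config):
        """Required: subclasses must implement initialization."""

    @abstractmethod
    def write_json(self, path, data):
        """Required: subclasses must implement JSON persistence."""

    def validate_connection(self):
        """Optional: sensible default, override for custom checks."""
        return True


class NullAdapter(StorageInterfaceSketch):
    """Concrete subclass: overrides only the required methods."""

    def connect(self, config):
        return True

    def write_json(self, path, data):
        pass  # discard writes


adapter = NullAdapter()
```

Instantiating a subclass that omits either abstract method raises TypeError at construction time, so missing overrides fail fast rather than at first use.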

abstractmethod connect(config)[source]

Initialize storage with configuration.

Parameters:

config (dict) – Dictionary containing storage configuration (e.g., storage_url, prefix).

Returns:

True if initialization successful, False otherwise.

Return type:

bool

abstractmethod write_json(path, data)[source]

Write JSON data to storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • data (dict) – Dictionary to serialize as JSON.

Return type:

None

abstractmethod write_csv(path, df)[source]

Write DataFrame to CSV in storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • df (pd.DataFrame) – DataFrame to write.

Return type:

None

abstractmethod write_yaml(path, data)[source]

Write YAML data to storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • data (dict) – Dictionary to serialize as YAML.

Return type:

None

abstractmethod write_parquet(path, df)[source]

Write DataFrame to Parquet in storage.

Parameters:
  • path (str) – Relative path within the storage location.

  • df (pd.DataFrame) – DataFrame to write.

Return type:

None

abstractmethod full_path(path)[source]

Get the full path/URL for a relative path.

Parameters:

path (str) – Relative path within the storage location.

Returns:

Full path or URL to the resource.

Return type:

str

validate_connection()[source]

Validate that the storage connection is active and functional.

Default implementation returns True. Override for custom validation.

Returns:

True if connection is valid, False otherwise.

Return type:

bool

get_job()[source]

Get the underlying job object for artifact management.

This is used for creating nested jobs or accessing job metadata. Default implementation returns None. Override for adapters that support job-based artifact management.

Returns:

Job object or None if not applicable.

Return type:

Any

Artifact Store Adapter

ArtifactStore Adapter - wraps artifact_store library to StorageInterface.

class impact_engine_measure.storage.artifact_store_adapter.ArtifactStoreAdapter[source]

Bases: StorageInterface

Wraps the artifact_store library to provide a consistent storage interface.

The artifact_store library handles Local and S3 backends internally based on the storage_url format. This adapter provides a uniform interface while delegating actual storage operations to artifact_store.

__init__()[source]

Initialize the ArtifactStoreAdapter.

connect(config)[source]

Initialize storage with configuration.

Parameters:

config (dict) –

Dictionary containing:

  • storage_url: Path or URL (e.g., “./data”, “s3://bucket/prefix”)

  • prefix: Optional job prefix (default: “job-impact-engine”)

  • job_id: Optional job ID for resuming existing jobs or custom IDs. If not provided, a unique ID will be auto-generated.
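Putting the three keys above together, a connect() configuration might look like the following (all values illustrative):

```python
# Hypothetical configuration for ArtifactStoreAdapter.connect().
config = {
    "storage_url": "s3://bucket/prefix",  # or a local path such as "./data"
    "prefix": "job-impact-engine",        # optional; shown at its default
    "job_id": None,                       # None -> a unique ID is auto-generated
}
```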

Returns:

True if initialization successful, False otherwise.

Return type:

bool

write_json(path, data)[source]

Write JSON data to storage.

Parameters:
  • path (str)

  • data (dict)

Return type:

None

write_csv(path, df)[source]

Write DataFrame to CSV in storage.

Parameters:
  • path (str)

  • df (DataFrame)

Return type:

None

write_yaml(path, data)[source]

Write YAML data to storage.

Parameters:
  • path (str)

  • data (dict)

Return type:

None

write_parquet(path, df)[source]

Write DataFrame to Parquet in storage.

Parameters:
  • path (str)

  • df (DataFrame)

Return type:

None

full_path(path)[source]

Get the full path/URL for a relative path.

Parameters:

path (str)

Return type:

str

validate_connection()[source]

Validate that the storage connection is active.

Return type:

bool

get_job()[source]

Get the underlying job object for artifact management.

Return type:

Any