API Reference
This section provides auto-generated API documentation from the Impact Engine source code.
Engine
Impact analysis engine for the impact_engine_measure package.
- impact_engine_measure.engine.measure_impact(config_path, storage_url='./data', job_id=None)[source]
Measure causal impact using the configured model and metrics.
Evaluates impact using business metrics retrieved through the metrics layer and models layer for statistical analysis.
- Parameters:
config_path (str) – Path to configuration file containing metrics and model settings. The config must include DATA.SOURCE.CONFIG.PATH pointing to a products CSV file.
storage_url (str) – Storage URL or path (e.g., “./data”, “s3://bucket/prefix”).
job_id (str, optional) – Job ID for resuming existing jobs or using custom IDs. If not provided, a unique ID will be auto-generated.
- Returns:
Job object for the completed run. Use load_results(job_info) to load all artifacts into a typed MeasureJobResult.
- Return type:
JobInfo
Results
Load and access job results produced by evaluate_impact().
- class impact_engine_measure.results.MeasureJobResult(job_id, model_type, created_at, config, impact_results, products, business_metrics, transformed_metrics, model_artifacts)[source]
Bases: object
Typed container for all artifacts produced by a single pipeline run.
- job_id
Unique identifier for the job.
- model_type
Model identifier (e.g. "interrupted_time_series").
- created_at
ISO-8601 timestamp of job creation.
- config
The YAML configuration used for this run.
- impact_results
The impact_results.json envelope (model_type, data, metadata).
- products
Product catalog DataFrame.
- business_metrics
Raw business metrics DataFrame.
- transformed_metrics
Transformed metrics DataFrame.
- model_artifacts
Model-specific supplementary DataFrames, keyed by artifact name with the {model_type}__ prefix stripped.
- __init__(job_id, model_type, created_at, config, impact_results, products, business_metrics, transformed_metrics, model_artifacts)
- impact_engine_measure.results.load_results(job_info)[source]
Load all artifacts from a completed pipeline run.
Reads manifest.json to discover files, then loads each one using the format-appropriate reader. Model-specific artifacts (those not in the fixed pipeline set) are collected into model_artifacts with the {model_type}__ prefix stripped from their keys.
- Parameters:
job_info (JobInfo) – JobInfo returned by evaluate_impact().
- Returns:
Typed container with every artifact.
- Return type:
MeasureJobResult
- Raises:
FileNotFoundError – If the job directory or manifest is missing.
ValueError – If the manifest’s major schema version is incompatible.
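The key handling described for load_results can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the FIXED_ARTIFACTS set, the file names, and the split_artifacts helper are all hypothetical.

```python
from pathlib import PurePosixPath

# Hypothetical fixed pipeline set; the real set is defined by the package.
FIXED_ARTIFACTS = {"config", "impact_results", "products",
                   "business_metrics", "transformed_metrics"}


def split_artifacts(file_names, model_type):
    """Separate fixed pipeline files from model-specific ones.

    Model-specific keys have the ``{model_type}__`` prefix stripped.
    """
    prefix = f"{model_type}__"
    fixed, model_artifacts = {}, {}
    for name in file_names:
        key = PurePosixPath(name).stem  # drop the file extension
        if key in FIXED_ARTIFACTS:
            fixed[key] = name
        else:
            # removeprefix leaves the key unchanged if the prefix is absent
            model_artifacts[key.removeprefix(prefix)] = name
    return fixed, model_artifacts


fixed, extra = split_artifacts(
    ["products.parquet", "impact_results.json",
     "nearest_neighbour_matching__covariate_balance.parquet"],
    model_type="nearest_neighbour_matching",
)
```

Here extra is keyed by "covariate_balance" rather than by the full prefixed name, which mirrors how model_artifacts is described above.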
Metrics Layer
Manager
Metrics Manager for coordinating metrics operations.
Design: Uses dependency injection to receive metrics adapter from factory. This decouples coordination logic from adapter selection, enabling:
- Easy unit testing with mock adapters
- Adapter selection controlled by configuration, not hardcoded
- class impact_engine_measure.metrics.manager.MetricsManager(source_config, metrics_source, source_type, parent_job=None)[source]
Bases: object
Central coordinator for metrics management.
Uses dependency injection - the metrics source is passed in via constructor, making the manager easy to test with mock implementations.
Note: source_config is expected to be pre-validated via process_config().
- Parameters:
metrics_source (MetricsInterface)
source_type (str)
parent_job (JobInfo | None)
- __init__(source_config, metrics_source, source_type, parent_job=None)[source]
Initialize the MetricsManager with injected metrics source.
- Parameters:
source_config (dict) – SOURCE.CONFIG configuration block (pre-validated, with defaults merged).
metrics_source (MetricsInterface) – The metrics implementation to use for data retrieval.
source_type (str) – The type of metrics source (e.g., “simulator”, “file”).
parent_job (JobInfo, optional) – Optional parent job for artifact management.
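The constructor injection described above can be illustrated with stand-in classes. This is a sketch under stated assumptions: MetricsSource, RecordingSource, Manager, and the retrieve method are hypothetical simplifications, not the package's MetricsInterface or MetricsManager.

```python
from abc import ABC, abstractmethod


class MetricsSource(ABC):
    """Stand-in for MetricsInterface, reduced to the two required methods."""

    @abstractmethod
    def connect(self, config): ...

    @abstractmethod
    def retrieve_business_metrics(self, products, start_date, end_date): ...


class RecordingSource(MetricsSource):
    """Mock source that records calls and returns canned rows."""

    def __init__(self):
        self.calls = []

    def connect(self, config):
        return True

    def retrieve_business_metrics(self, products, start_date, end_date):
        self.calls.append((tuple(products), start_date, end_date))
        return [{"product_id": p, "revenue": 0.0} for p in products]


class Manager:
    """Stand-in coordinator: it receives its source instead of creating one."""

    def __init__(self, source_config, metrics_source, source_type):
        self.source_config = source_config
        self.metrics_source = metrics_source
        self.source_type = source_type

    def retrieve(self, products, start_date, end_date):
        # Hypothetical method name; delegates to whatever source was injected.
        return self.metrics_source.retrieve_business_metrics(
            products, start_date, end_date
        )


mock = RecordingSource()
manager = Manager({"path": "unused.csv"}, mock, source_type="file")
rows = manager.retrieve(["p1", "p2"], "2024-01-01", "2024-01-31")
```

Because the manager never constructs its own source, a test can assert on mock.calls without touching files or a simulator.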
Base Interface
Base interfaces and common classes for the metrics layer.
- class impact_engine_measure.metrics.base.MetricsInterface[source]
Bases: ABC
Abstract base class defining the contract for all metrics implementations.
- Required methods (must override):
connect: Initialize adapter with configuration
retrieve_business_metrics: Fetch metrics data
- Optional methods (have sensible defaults):
validate_connection: Check if connection is active
transform_outbound: Transform data to external format
transform_inbound: Transform data from external format
- abstractmethod retrieve_business_metrics(products, start_date, end_date)[source]
Retrieve business metrics for specified products and time range.
- validate_connection()[source]
Validate that the metrics source connection is active and functional.
Default implementation returns True. Override for custom validation.
- Returns:
True if connection is valid, False otherwise.
- Return type:
bool
- transform_outbound(products, start_date, end_date)[source]
Transform impact engine format to external system format.
Default implementation is pass-through. Override for adapters that need data transformation.
- transform_inbound(external_data)[source]
Transform external system response to impact engine format.
Default implementation returns data as-is if DataFrame, otherwise raises. Override for adapters that need result transformation.
- Parameters:
external_data (Any) – Raw data from the external system.
- Returns:
DataFrame with standardized business metrics format.
- Return type:
pd.DataFrame
Catalog Simulator Adapter
Catalog Simulator Adapter - adapts online_retail_simulator package to MetricsInterface.
Integration is governed by contracts (schemas) and config bridge (config translation).
- class impact_engine_measure.metrics.catalog_simulator.adapter.CatalogSimulatorAdapter[source]
Bases: MetricsInterface
Adapter for catalog simulator that implements MetricsInterface.
- connect(config)[source]
Establish connection to the catalog simulator.
Config is pre-validated with defaults merged via process_config().
- retrieve_business_metrics(products, start_date, end_date)[source]
Retrieve business metrics using catalog simulator’s job-aware API.
- validate_connection()[source]
Validate that the catalog simulator connection is active and functional.
- Return type:
bool
File Adapter
File Adapter - reads metrics data from CSV or Parquet files.
This adapter enables file-based workflows where upstream processes produce data files that impact-engine consumes.
- class impact_engine_measure.metrics.file.adapter.FileAdapter[source]
Bases: MetricsInterface
Adapter for file-based data sources that implements MetricsInterface.
Supports CSV and Parquet file formats, including partitioned Parquet directories. The file is expected to contain pre-processed data ready for impact analysis.
Configuration:
    DATA:
      SOURCE:
        type: file
        CONFIG:
          path: path/to/data.csv            # Single CSV file
          path: path/to/data.parquet        # Single Parquet file
          path: path/to/partitioned_data/   # Partitioned Parquet directory
          # Optional parameters:
          date_column: date                 # Column name for date filtering
          product_id_column: product_id     # Column name for product IDs
- connect(config)[source]
Initialize adapter with configuration parameters.
- Parameters:
config (dict) – Dictionary containing (lowercase keys, merged via process_config):
- path: Path to the data file (required)
- date_column: Column name for dates (optional)
- product_id_column: Column name for product IDs (optional, default: product_id)
- Returns:
True if initialization successful.
- Return type:
bool
- Raises:
ValueError – If required configuration is missing.
FileNotFoundError – If the specified file doesn’t exist.
- retrieve_business_metrics(products, start_date, end_date)[source]
Retrieve business metrics from the loaded file.
For file-based sources, the data is already loaded. This method optionally filters by date range and product IDs if configured.
- Parameters:
- Returns:
DataFrame with business metrics.
- Return type:
pd.DataFrame
- Raises:
ConnectionError – If adapter not connected.
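The optional date-range and product-ID filtering can be sketched without pandas. The example below is an illustration only: it uses csv.DictReader rows in place of a DataFrame, the filter_rows helper is hypothetical, and treating both date bounds as inclusive is an assumption.

```python
import csv
import io
from datetime import date

CSV_TEXT = """date,product_id,revenue
2024-01-01,p1,10.0
2024-01-02,p1,12.5
2024-01-02,p2,7.0
2024-02-01,p1,9.0
"""


def filter_rows(rows, product_ids, start_date, end_date,
                date_column="date", product_id_column="product_id"):
    """Keep rows for the given products within [start_date, end_date]."""
    start, end = date.fromisoformat(start_date), date.fromisoformat(end_date)
    wanted = set(product_ids)
    return [
        row for row in rows
        if row[product_id_column] in wanted
        and start <= date.fromisoformat(row[date_column]) <= end
    ]


rows = list(csv.DictReader(io.StringIO(CSV_TEXT)))
january_p1 = filter_rows(rows, ["p1"], "2024-01-01", "2024-01-31")
```

The date_column and product_id_column arguments play the role of the optional configuration keys of the same names.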
- validate_connection()[source]
Validate that the file source is accessible.
- Returns:
True if file exists and data is loaded.
- Return type:
bool
- transform_outbound(products, start_date, end_date)[source]
Transform impact engine format to file adapter format.
For file-based sources, this is a pass-through since the file already contains the data in the expected format.
- transform_inbound(external_data)[source]
Transform file data to impact engine format.
For file-based sources, this adds metadata fields and ensures proper column naming.
- Parameters:
external_data (Any) – DataFrame read from file.
- Returns:
DataFrame with standardized format.
- Return type:
pd.DataFrame
Models Layer
Manager
Models manager for coordinating model operations.
- class impact_engine_measure.models.manager.FitOutput(results_path, artifact_paths=<factory>, model_type='')[source]
Bases: object
Structured output from fit_model().
Provides programmatic access to the results path and all artifact paths, so callers do not need to reconstruct file paths from model internals.
- results_path
Full path/URL to impact_results.json.
- artifact_paths
Mapping of artifact name to full path/URL.
- model_type
The model type that produced this output.
- class impact_engine_measure.models.manager.ModelsManager(measurement_config, model)[source]
Bases: object
Central coordinator for model management.
Uses dependency injection - the model is passed in via constructor, making the manager easy to test with mock implementations.
Note: measurement_config is expected to be pre-validated via process_config().
- Parameters:
model (ModelInterface)
- __init__(measurement_config, model)[source]
Initialize the ModelsManager with injected model.
- Parameters:
measurement_config (dict) – MEASUREMENT configuration block (pre-validated, with defaults merged).
model (ModelInterface) – The model implementation to use for fitting.
- fit_model(data, storage=None, **overrides)[source]
Fit model using configuration parameters.
All PARAMS from config are forwarded as kwargs to validate_params() and fit(). Callers can override any config param via **overrides.
- Parameters:
data (pd.DataFrame) – DataFrame containing data for model fitting.
storage (StorageManager) – Storage backend for artifacts.
**overrides – Override any MEASUREMENT.PARAMS value (e.g., intervention_date, dependent_variable).
- Returns:
FitOutput with paths to all persisted files.
- Return type:
FitOutput
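The override behaviour of fit_model can be pictured as a dictionary merge in which call-time values win. This is a minimal sketch assuming that precedence; resolve_fit_params is a hypothetical helper, not a function of the package.

```python
def resolve_fit_params(config_params, **overrides):
    """Merge config PARAMS with call-time overrides; overrides win."""
    return {**config_params, **overrides}


config_params = {
    "intervention_date": "2024-03-01",
    "dependent_variable": "revenue",
}
params = resolve_fit_params(config_params, intervention_date="2024-04-15")
```

The merge builds a new dict, so the configuration block itself is left unchanged between calls.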
Base Interface
Base interface for impact models.
- class impact_engine_measure.models.base.ModelResult(model_type, data, metadata=<factory>, artifacts=<factory>)[source]
Bases: object
Standardized model result container.
All models return this structure, allowing the manager to handle storage uniformly while models remain storage-agnostic.
- The data dict must use three standardized keys:
model_params: Input parameters used (formula, intervention_date, etc.)
impact_estimates: The treatment effect measurements
model_summary: Fit diagnostics, sample sizes, configuration echo
- model_type
Identifier for the model that produced this result.
- data
Primary result data with keys: model_params, impact_estimates, model_summary.
- metadata
Metadata about the model run (populated by the manager).
- artifacts
Supplementary DataFrames to persist (e.g., per-product details). Keys are format-agnostic names; the manager prefixes with model_type and appends the file extension.
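The shape of this container and the artifact naming rule can be sketched with a dataclass. Everything here is illustrative: ResultSketch is a simplified stand-in for ModelResult, the validation in __post_init__ is an assumption (the source only states which keys data must use), and the "parquet" extension in artifact_file_name is likewise assumed.

```python
from dataclasses import dataclass, field

REQUIRED_KEYS = {"model_params", "impact_estimates", "model_summary"}


@dataclass
class ResultSketch:
    """Simplified stand-in for ModelResult (artifacts hold plain objects here)."""

    model_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    artifacts: dict = field(default_factory=dict)

    def __post_init__(self):
        # Assumed check: reject data that lacks any of the three keys.
        missing = REQUIRED_KEYS - set(self.data)
        if missing:
            raise ValueError(f"data is missing keys: {sorted(missing)}")


def artifact_file_name(model_type, name, extension="parquet"):
    """Prefix the artifact name with the model type and add an extension."""
    return f"{model_type}__{name}.{extension}"


result = ResultSketch(
    model_type="experiment",
    data={"model_params": {"formula": "y ~ treatment"},
          "impact_estimates": {}, "model_summary": {}},
    artifacts={"per_product": []},
)
file_names = [artifact_file_name(result.model_type, n) for n in result.artifacts]
```

Keeping the artifact keys free of prefixes and extensions is what lets a model stay storage-agnostic while the manager decides the on-disk names.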
- class impact_engine_measure.models.base.ModelInterface[source]
Bases: ABC
Abstract base class for impact models.
Defines the unified interface that all impact models must implement. This ensures consistent behavior across different modeling approaches (interrupted time series, causal inference, metrics approximation, etc.).
- Required methods (must override):
connect: Initialize model with configuration
fit: Fit model to data
validate_params: Validate model-specific parameters before fitting
- Optional methods (have sensible defaults):
validate_connection: Check if model is ready
validate_data: Check if input data is valid
get_required_columns: Return list of required columns
transform_outbound: Transform data to external format
transform_inbound: Transform results from external format
- abstractmethod fit(data, **kwargs)[source]
Fit the model to the provided data.
- Parameters:
data (pd.DataFrame) – DataFrame containing data for model fitting.
**kwargs – Model-specific parameters (e.g., intervention_date, dependent_variable).
- Returns:
Model-specific results (Dict, str path, etc.)
- Return type:
Any
- Raises:
ValueError – If data validation fails or required columns are missing.
RuntimeError – If model fitting fails.
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
Default implementation returns True. Override for custom validation.
- Returns:
True if model is ready, False otherwise.
- Return type:
bool
- validate_data(data)[source]
Validate that the input data meets model requirements.
Default implementation checks if data is non-empty. Override for custom validation.
- Parameters:
data (pd.DataFrame) – DataFrame to validate.
- Returns:
True if data is valid, False otherwise.
- Return type:
bool
- get_required_columns()[source]
Get the list of required columns for this model.
Default implementation returns empty list. Override if model requires specific columns.
- abstractmethod validate_params(params)[source]
Validate model-specific parameters before fitting.
This method is called by ModelsManager before fit() to perform early validation of required parameters. All model implementations MUST override this method to validate their specific parameters.
Centralized config validation (process_config) handles known models, but this method ensures custom/user-defined models also validate.
- Parameters:
params (dict) – Dictionary containing parameters that will be passed to fit(). Typical keys: intervention_date, dependent_variable.
- Raises:
ValueError – If required parameters are missing or invalid.
- Return type:
None
- get_fit_params(params)[source]
Filter parameters to only those accepted by this adapter’s fit().
Called by ModelsManager before fit() to prevent cross-model param pollution. Default returns all params (backward compatible). Built-in adapters override.
- transform_outbound(data, **kwargs)[source]
Transform impact engine format to model library format.
Default implementation is pass-through. Override for models that need data transformation.
- Parameters:
data (pd.DataFrame) – DataFrame with impact engine standardized format.
**kwargs – Additional model-specific parameters.
- Returns:
Dictionary with parameters formatted for the model library.
- Return type:
dict
- transform_inbound(model_results)[source]
Transform model library results to impact engine format.
Default implementation returns results as-is (or wrapped in dict). Override for models that need result transformation.
- Parameters:
model_results (Any) – Raw results from the model library.
- Returns:
Dictionary with standardized impact analysis results.
- Return type:
dict
Interrupted Time Series Adapter
Interrupted Time Series Model Adapter - adapts SARIMAX to ModelInterface.
- class impact_engine_measure.models.interrupted_time_series.adapter.TransformedInput(y, exog, data, dependent_variable, intervention_date, order, seasonal_order)[source]
Bases: object
Container for transformed model input data.
This dataclass eliminates hidden state by explicitly passing all necessary data between transformation and result formatting.
- Parameters:
- y: ndarray
- exog: DataFrame
- data: DataFrame
- class impact_engine_measure.models.interrupted_time_series.adapter.InterruptedTimeSeriesAdapter[source]
Bases: ModelInterface
Estimates causal impact of an intervention using time series analysis.
Constraints:
- Data must be ordered chronologically with a ‘date’ column
- intervention_date parameter required in MEASUREMENT.PARAMS
- Requires sufficient pre and post-intervention observations (minimum 3 total)
- connect(config)[source]
Initialize model with configuration parameters.
Config is pre-validated with defaults merged via process_config().
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
- Return type:
bool
- validate_params(params)[source]
Validate ITS-specific parameters.
- Parameters:
params (dict) – Parameters dict with intervention_date, dependent_variable, etc.
- Raises:
ValueError – If intervention_date is missing.
- Return type:
None
- get_fit_params(params)[source]
ITS accepts intervention_date, dependent_variable, order, seasonal_order.
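Filtering parameters down to an accepted set can be sketched as a dictionary comprehension. This is an assumption-laden illustration: the standalone get_fit_params function and the ITS_FIT_KEYS tuple are hypothetical, with the key names taken from the sentence above.

```python
ITS_FIT_KEYS = ("intervention_date", "dependent_variable", "order", "seasonal_order")


def get_fit_params(params, accepted=ITS_FIT_KEYS):
    """Return only the entries that the adapter's fit() accepts."""
    return {key: params[key] for key in accepted if key in params}


merged_params = {
    "intervention_date": "2024-03-01",
    "dependent_variable": "revenue",
    "order": (1, 0, 0),
    "formula": "y ~ treatment",        # belongs to the experiment model
    "covariate_columns": ["price"],    # belongs to the matching models
}
fit_params = get_fit_params(merged_params)
```

Keys meant for other models are dropped before fit() is called, which is the cross-model pollution the base class docstring refers to.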
- fit(data, **kwargs)[source]
Fit the interrupted time series model and return results.
- Parameters:
data (pd.DataFrame) – DataFrame containing time series data with ‘date’ column and dependent variable column.
**kwargs – Model parameters:
- intervention_date (str): Date (YYYY-MM-DD) when intervention occurred. Required.
- dependent_variable (str): Column to model (default: “revenue”).
- order (tuple): SARIMAX order (p, d, q).
- seasonal_order (tuple): SARIMAX seasonal order (P, D, Q, s).
- Returns:
Standardized result container (storage handled by manager).
- Return type:
ModelResult
- Raises:
ValueError – If data validation fails or required columns are missing.
RuntimeError – If model fitting fails.
- validate_data(data)[source]
Validate that the input data meets model requirements.
- Parameters:
data (pd.DataFrame) – DataFrame to validate.
- Returns:
True if data is valid, False otherwise.
- Return type:
bool
- transform_outbound(data, intervention_date, **kwargs)[source]
Transform impact engine format to SARIMAX model format.
Note: This method is kept for interface compliance but internally uses _prepare_model_input for the actual transformation.
Experiment Adapter
Experiment Model Adapter - thin wrapper around statsmodels OLS with R-style formulas.
- class impact_engine_measure.models.experiment.adapter.ExperimentAdapter[source]
Bases: ModelInterface
Estimates treatment effects via OLS regression with R-style formulas.
Constraints:
- formula parameter required in MEASUREMENT.PARAMS
- DataFrame must contain all variables referenced in the formula
- connect(config)[source]
Initialize model with configuration parameters.
Config is pre-validated with defaults merged via process_config().
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
- Return type:
bool
- validate_params(params)[source]
Validate experiment-specific parameters.
- Parameters:
params (dict) – Parameters dict with formula, etc.
- Raises:
ValueError – If formula is missing.
- Return type:
None
- get_fit_params(params)[source]
Exclude known config keys, pass library kwargs through to statsmodels.
- fit(data, **kwargs)[source]
Fit OLS model using statsmodels formula API and return results.
- Parameters:
data (pd.DataFrame) – DataFrame containing all variables referenced in the formula.
**kwargs – Passed through to statsmodels OLS .fit() (e.g., cov_type=’HC3’ for robust standard errors).
- Returns:
Standardized result container.
- Return type:
ModelResult
- Raises:
ConnectionError – If model not connected.
RuntimeError – If model fitting fails.
Metrics Approximation Adapter
Metrics Approximation Adapter - approximates impact from metric changes.
This model approximates treatment impact by correlating metric changes (e.g., quality score improvements) with expected outcome changes via configurable response functions.
- class impact_engine_measure.models.metrics_approximation.adapter.MetricsApproximationAdapter[source]
Bases: ModelInterface
Adapter for metrics-based impact approximation that implements ModelInterface.
This model takes enriched products with before/after metric values and baseline outcomes, then applies a response function to approximate the treatment impact.
Input DataFrame must contain:
metric_before_column: Pre-intervention metric value
metric_after_column: Post-intervention metric value
baseline_column: Baseline sales/revenue
Configuration:
    MEASUREMENT:
      MODEL: "metrics_approximation"
      METRIC_BEFORE_COLUMN: "quality_before"
      METRIC_AFTER_COLUMN: "quality_after"
      BASELINE_COLUMN: "baseline_sales"
      RESPONSE:
        FUNCTION: "linear"
        PARAMS:
          coefficient: 0.5
- connect(config)[source]
Initialize model with configuration parameters.
Config is pre-validated with defaults merged via process_config().
- Parameters:
config (dict) – Dictionary containing model configuration:
- metric_before_column: Column name for pre-intervention metric
- metric_after_column: Column name for post-intervention metric
- baseline_column: Column name for baseline outcome
- response: Dict with FUNCTION name and optional PARAMS
- Returns:
True if initialization successful.
- Return type:
bool
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
- Return type:
bool
- validate_params(params)[source]
Validate metrics approximation parameters.
Metrics approximation has no required fit-time parameters beyond what’s configured in connect(). This implementation satisfies the abstract method requirement while allowing all params.
- Parameters:
params (dict) – Parameters dict (typically empty for this model).
- Return type:
None
- get_fit_params(params)[source]
Metrics approximation has no fit-time params from config.
All configuration (column names, response function, response params) is stored in self.config during connect().
- fit(data, **kwargs)[source]
Fit the metrics approximation model and return results.
- For each product, computes:
delta_metric = metric_after - metric_before
approximated_impact = response_function(delta_metric, baseline, row_attributes)
- Parameters:
data (pd.DataFrame) – DataFrame with enriched products (only treated products). Must contain metric_before, metric_after, and baseline columns. Additional columns are passed as row_attributes to response function.
**kwargs – Additional parameters passed to response function.
- Returns:
Standardized result container (storage handled by manager).
- Return type:
ModelResult
- Raises:
ConnectionError – If model not connected.
ValueError – If data validation fails.
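The per-product computation can be sketched in plain Python. Note the assumptions: the source does not define the "linear" response function, so linear_response below (coefficient times metric change times baseline) is a hypothetical formula, and approximate_impact is an illustrative helper using the column names from the configuration example.

```python
def linear_response(delta_metric, baseline, row_attributes, coefficient=0.5):
    """Hypothetical linear response: impact scales with the metric change
    and the baseline outcome. The package's own formula may differ."""
    return coefficient * delta_metric * baseline


def approximate_impact(rows, response=linear_response, **response_params):
    """Apply the two per-product steps to each row (a dict per product)."""
    results = []
    for row in rows:
        delta_metric = row["quality_after"] - row["quality_before"]
        # Columns beyond the three required ones become row_attributes.
        row_attributes = {
            k: v for k, v in row.items()
            if k not in ("quality_before", "quality_after", "baseline_sales")
        }
        impact = response(delta_metric, row["baseline_sales"],
                          row_attributes, **response_params)
        results.append({"product_id": row["product_id"],
                        "delta_metric": delta_metric,
                        "approximated_impact": impact})
    return results


products = [
    {"product_id": "p1", "quality_before": 2.0, "quality_after": 4.0,
     "baseline_sales": 100.0},
    {"product_id": "p2", "quality_before": 3.0, "quality_after": 3.5,
     "baseline_sales": 200.0},
]
impacts = approximate_impact(products, coefficient=0.5)
total_impact = sum(r["approximated_impact"] for r in impacts)
```

Passing the response function as an argument mirrors the configurable RESPONSE.FUNCTION setting: swapping in a different callable changes the approximation without touching the loop.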
Nearest Neighbour Matching Adapter
Nearest neighbour matching estimator for treatment effects.
Thin wrapper around causalml’s NearestNeighborMatch. Matches treated and control units on observed covariates, then computes ATT, ATC, and ATE from mean outcome differences in the matched sample. Covariate balance (SMD before/after) is stored as an artifact.
- class impact_engine_measure.models.nearest_neighbour_matching.adapter.NearestNeighbourMatchingAdapter[source]
Bases: ModelInterface
Estimates treatment effects via nearest neighbour matching on covariates.
Constraints:
- Data must contain a binary treatment column
- One or more covariate columns must be specified
- treatment_column and covariate_columns are required in MEASUREMENT.PARAMS
- When replace=False, only single-column matching is supported (causalml constraint)
- connect(config)[source]
Initialize model with configuration parameters.
Config is pre-validated with defaults merged via process_config().
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
- Return type:
bool
- validate_params(params)[source]
Validate nearest-neighbour-matching-specific parameters.
- Parameters:
params (dict) – Parameters dict forwarded from config.
- Raises:
ValueError – If required parameters are missing.
- Return type:
None
- get_fit_params(params)[source]
Nearest neighbour matching only uses dependent_variable from fit kwargs.
- fit(data, **kwargs)[source]
Fit the nearest neighbour matching model and return results.
Performs two matching passes (ATT and ATC) and computes ATE as the weighted combination.
- Parameters:
data (pd.DataFrame) – DataFrame with treatment indicator, covariates, and outcome.
**kwargs – Filtered MEASUREMENT.PARAMS forwarded by the manager.
- Returns:
Standardized result container.
- Return type:
ModelResult
- Raises:
ConnectionError – If model not connected.
ValueError – If data validation fails.
RuntimeError – If model fitting fails.
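The two matching passes and the weighted combination can be illustrated with a toy example. This sketch does not use causalml: matched_effect is a hypothetical helper that matches on a single covariate with replacement, and weighting ATT and ATC by group sizes is an assumption about how the combination is formed.

```python
from statistics import mean


def matched_effect(focal, donors):
    """Mean outcome difference between focal units and their nearest donor.

    Each unit is a (covariate, outcome) pair; matching is on one covariate,
    with replacement, so a donor may be reused.
    """
    diffs = []
    for x, y in focal:
        _, donor_y = min(donors, key=lambda unit: abs(unit[0] - x))
        diffs.append(y - donor_y)
    return mean(diffs)


treated = [(1.0, 12.0), (2.0, 15.0)]
control = [(1.1, 10.0), (2.2, 11.0), (3.0, 9.0)]

att = matched_effect(treated, control)        # treated matched to controls
atc = -matched_effect(control, treated)       # controls matched to treated
n_t, n_c = len(treated), len(control)
ate = (n_t * att + n_c * atc) / (n_t + n_c)   # size-weighted combination
```

The sign flip on the second pass keeps both estimates expressed as treated minus control, so they can be averaged directly.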
Subclassification Adapter
Subclassification (stratification) estimator for treatment effects.
This model stratifies observations on covariates using propensity-score quantiles, computes within-stratum treated/control mean differences, and aggregates via weighted average to estimate ATT or ATE.
The underlying “library” is pandas groupby + numpy arithmetic — the algorithm is simple enough that wrapping an external causal-inference package would add dependency weight with no statistical benefit.
- class impact_engine_measure.models.subclassification.adapter.SubclassificationAdapter[source]
Bases: ModelInterface
Estimates treatment effects via subclassification on covariates.
Constraints:
- Data must contain a binary treatment column
- One or more covariate columns must be specified
- treatment_column and covariate_columns are required in MEASUREMENT.PARAMS
- connect(config)[source]
Initialize model with configuration parameters.
Config is pre-validated with defaults merged via process_config().
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
- Return type:
bool
- validate_params(params)[source]
Validate subclassification-specific parameters.
- Parameters:
params (dict) – Parameters dict forwarded from config.
- Raises:
ValueError – If required parameters are missing.
- Return type:
None
- fit(data, **kwargs)[source]
Fit the subclassification model and return results.
- Parameters:
data (pd.DataFrame) – DataFrame with treatment indicator, covariates, and outcome.
**kwargs – All MEASUREMENT.PARAMS forwarded by the manager.
- Returns:
Standardized result container.
- Return type:
ModelResult
- Raises:
ConnectionError – If model not connected.
ValueError – If data validation fails.
RuntimeError – If model fitting fails.
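The within-stratum differences and their weighted average can be sketched with the standard library. This is an illustration under assumptions, not the adapter's code: strata are given directly instead of being derived from quantiles, strata missing either group are skipped, and the weights (treated counts for ATT, all units for ATE) are the conventional choice rather than something the source specifies.

```python
from collections import defaultdict
from statistics import mean


def subclassification_estimate(rows, estimand="ate"):
    """Weighted average of within-stratum treated-minus-control differences.

    rows: iterable of (stratum, treated_flag, outcome).
    Weights: treated counts for ATT, all units for ATE.
    Strata lacking either group are skipped.
    """
    strata = defaultdict(lambda: {0: [], 1: []})
    for stratum, treated, outcome in rows:
        strata[stratum][treated].append(outcome)

    weighted_sum = total_weight = 0.0
    for groups in strata.values():
        if not groups[0] or not groups[1]:
            continue  # no common support in this stratum
        diff = mean(groups[1]) - mean(groups[0])
        if estimand == "att":
            weight = len(groups[1])
        else:
            weight = len(groups[0]) + len(groups[1])
        weighted_sum += weight * diff
        total_weight += weight
    return weighted_sum / total_weight


rows = [
    # (stratum, treated, outcome)
    (0, 1, 12.0), (0, 0, 10.0), (0, 0, 10.0), (0, 0, 10.0),
    (1, 1, 20.0), (1, 1, 22.0), (1, 1, 24.0), (1, 0, 16.0),
]
ate = subclassification_estimate(rows, "ate")
att = subclassification_estimate(rows, "att")
```

In this data the treated units are concentrated in the stratum with the larger difference, so the ATT comes out higher than the ATE.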
Synthetic Control Adapter
Synthetic Control Model Adapter - thin wrapper around pysyncon’s Synth.
- class impact_engine_measure.models.synthetic_control.adapter.SyntheticControlAdapter[source]
Bases: ModelInterface
Estimates causal impact using the synthetic control method via pysyncon.
Constraints:
- Data must be in panel (long) format with unit, time, outcome, and treatment columns
- treatment_time, treated_unit, and outcome_column required in MEASUREMENT.PARAMS
- Requires at least one treated unit and one control unit
- connect(config)[source]
Initialize model with structural configuration parameters.
Config is pre-validated with defaults merged via process_config().
- validate_connection()[source]
Validate that the model is properly initialized and ready to use.
- Return type:
bool
- validate_params(params)[source]
Validate synthetic control-specific parameters.
Only validates the three truly required params (null in config_defaults.yaml).
- Parameters:
params (dict) – Parameters dict with treatment_time, treated_unit, etc.
- Raises:
ValueError – If required parameters are missing.
- Return type:
None
- get_fit_params(params)[source]
SC accepts treatment_time, treated_unit, columns, and optimizer params.
- fit(data, **kwargs)[source]
Fit the synthetic control model and return results.
- Parameters:
data (pd.DataFrame) – Panel DataFrame with unit, time, outcome, and treatment columns.
**kwargs – Model parameters:
- treatment_time: When the intervention occurred (index value). Required.
- treated_unit (str): Name of the treated unit. Required.
- outcome_column (str): Column with the outcome variable. Required.
- unit_column (str): Column identifying units (default from config).
- time_column (str): Column identifying time periods (default from config).
- optim_method (str): Optimization method (default: “Nelder-Mead”).
- optim_initial (str): Initial weight strategy (default: “equal”).
- Returns:
Standardized result container.
- Return type:
ModelResult
- Raises:
ConnectionError – If model not connected.
ValueError – If data validation fails.
RuntimeError – If model fitting fails.
Storage Layer
Manager
Storage Manager for coordinating storage operations.
Design: Uses dependency injection to receive storage adapter from factory. This decouples coordination logic from adapter selection, enabling:
- Easy unit testing with mock adapters
- Adapter selection controlled by configuration, not hardcoded
- class impact_engine_measure.storage.manager.StorageManager(storage_config, adapter)[source]
Bases: object
Central coordinator for storage management.
Uses dependency injection - the storage adapter is passed in via constructor, making the manager easy to test with mock implementations.
- Parameters:
adapter (StorageInterface)
- __init__(storage_config, adapter)[source]
Initialize the StorageManager with injected storage adapter.
- Parameters:
storage_config (dict) – Storage configuration (storage_url, prefix, etc.).
adapter (StorageInterface) – The storage implementation to use for persistence.
- write_csv(path, df)[source]
Write DataFrame to CSV in storage.
- Parameters:
path (str) – Relative path within the storage location.
df (pd.DataFrame) – DataFrame to write.
- Return type:
None
- write_parquet(path, df)[source]
Write DataFrame to Parquet in storage.
- Parameters:
path (str) – Relative path within the storage location.
df (pd.DataFrame) – DataFrame to write.
- Return type:
None
Base Interface
Base interfaces and common classes for the storage layer.
- class impact_engine_measure.storage.base.StorageInterface[source]
Bases: ABC
Abstract base class defining the contract for all storage implementations.
- Required methods (must override):
connect: Initialize adapter with configuration
write_json: Write JSON data to storage
write_csv: Write DataFrame to CSV
write_yaml: Write YAML data to storage
write_parquet: Write DataFrame to Parquet
full_path: Get full path/URL for a relative path
- Optional methods (have sensible defaults):
validate_connection: Check if connection is active
- abstractmethod write_csv(path, df)[source]
Write DataFrame to CSV in storage.
- Parameters:
path (str) – Relative path within the storage location.
df (pd.DataFrame) – DataFrame to write.
- Return type:
None
- abstractmethod write_parquet(path, df)[source]
Write DataFrame to Parquet in storage.
- Parameters:
path (str) – Relative path within the storage location.
df (pd.DataFrame) – DataFrame to write.
- Return type:
None
- validate_connection()[source]
Validate that the storage connection is active and functional.
Default implementation returns True. Override for custom validation.
- Returns:
True if connection is valid, False otherwise.
- Return type:
bool
- get_job()[source]
Get the underlying job object for artifact management.
This is used for creating nested jobs or accessing job metadata. Default implementation returns None. Override for adapters that support job-based artifact management.
- Returns:
Job object or None if not applicable.
- Return type:
Any
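The split between required and optional methods can be illustrated with an in-memory test double. This is a sketch only: StorageSketch is a reduced stand-in for StorageInterface covering three of its required methods, InMemoryStorage is hypothetical, and the write_json(path, data) signature is an assumption since it is not shown in this reference.

```python
import json
from abc import ABC, abstractmethod


class StorageSketch(ABC):
    """Stand-in for StorageInterface, reduced to three required methods."""

    @abstractmethod
    def connect(self, config): ...

    @abstractmethod
    def write_json(self, path, data): ...  # assumed signature

    @abstractmethod
    def full_path(self, path): ...

    def validate_connection(self):
        # Optional method: default mirrors the documented "returns True".
        return True

    def get_job(self):
        # Optional method: default mirrors the documented "returns None".
        return None


class InMemoryStorage(StorageSketch):
    """Test double that keeps serialized files in a dict."""

    def connect(self, config):
        self.root = config["storage_url"].rstrip("/")
        self.files = {}
        return True

    def write_json(self, path, data):
        self.files[path] = json.dumps(data, sort_keys=True)

    def full_path(self, path):
        return f"{self.root}/{path}"


storage = InMemoryStorage()
storage.connect({"storage_url": "./data/"})
storage.write_json("impact_results.json", {"model_type": "experiment"})
```

A subclass that omits any abstract method cannot be instantiated, while the optional methods fall back to their defaults, so a test double only has to implement what the test exercises.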
Artifact Store Adapter
ArtifactStore Adapter - wraps artifact_store library to StorageInterface.
- class impact_engine_measure.storage.artifact_store_adapter.ArtifactStoreAdapter[source]
Bases: StorageInterface
Wraps the artifact_store library to provide a consistent storage interface.
The artifact_store library handles Local and S3 backends internally based on the storage_url format. This adapter provides a uniform interface while delegating actual storage operations to artifact_store.
- connect(config)[source]
Initialize storage with configuration.
- Parameters:
config (dict) –
Dictionary containing:
storage_url: Path or URL (e.g., “./data”, “s3://bucket/prefix”)
prefix: Optional job prefix (default: “job-impact-engine”)
job_id: Optional job ID for resuming existing jobs or custom IDs. If not provided, a unique ID will be auto-generated.
- Returns:
True if initialization successful, False otherwise.
- Return type:
bool
- write_csv(path, df)[source]
Write DataFrame to CSV in storage.
- Parameters:
path (str)
df (DataFrame)
- Return type:
None
- write_parquet(path, df)[source]
Write DataFrame to Parquet in storage.
- Parameters:
path (str)
df (DataFrame)
- Return type:
None