"""Configuration processing with defaults and validation."""
import copy
from pathlib import Path
from typing import Any, Dict
from artifact_store import ArtifactStore
def _extract_param_schemas_from_defaults() -> Dict[str, Any]:
"""Extract parameter schemas from config defaults."""
defaults = load_defaults()
schemas = {}
for backend in ["RULE", "SYNTHESIZER"]:
if backend in defaults:
schemas[backend] = {}
backend_config = defaults[backend]
for section in ["CHARACTERISTICS", "METRICS"]:
if section in backend_config:
schemas[backend][section] = {}
section_config = backend_config[section]
function_name = section_config.get("FUNCTION")
params = section_config.get("PARAMS", {})
if function_name and params:
schemas[backend][section][function_name] = set(params.keys())
return schemas
def _get_param_schemas() -> Dict[str, Any]:
"""Get parameter schemas, cached for performance."""
if not hasattr(_get_param_schemas, "_cache"):
_get_param_schemas._cache = _extract_param_schemas_from_defaults()
return _get_param_schemas._cache
[docs]
def load_defaults() -> Dict[str, Any]:
"""Load default configuration from package."""
defaults_path = str(Path(__file__).parent / "config_defaults.yaml")
store, filename = ArtifactStore.from_file_path(defaults_path)
return store.read_yaml(filename)
[docs]
def get_impact_defaults(function_name: str) -> Dict[str, Any]:
"""
Get default parameters for an IMPACT enrichment function.
Args:
function_name: Name of the enrichment function (e.g., "product_detail_boost")
Returns:
Dictionary of default parameters for the function, or empty dict if not found
"""
defaults = load_defaults()
impact_defaults = defaults.get("IMPACT", {})
return copy.deepcopy(impact_defaults.get(function_name, {}))
[docs]
def deep_merge(base: Dict, override: Dict) -> Dict:
"""
Deep merge two dictionaries, with override values taking precedence.
Args:
base: Base dictionary (defaults)
override: Override dictionary (user config)
Returns:
Merged dictionary
"""
result = copy.deepcopy(base)
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = deep_merge(result[key], value)
else:
result[key] = copy.deepcopy(value)
return result
def _require(config: Dict[str, Any], path: str, message: str) -> None:
parts = path.split(".")
cur: Any = config
for part in parts:
if not isinstance(cur, dict) or part not in cur or cur[part] in (None, ""):
raise ValueError(message)
cur = cur[part]
def _validate_params(backend: str, section: str, function_name: str, params: Dict[str, Any]) -> None:
"""Validate parameters against expected schema.
For built-in functions defined in config_defaults.yaml, this performs strict
parameter validation to catch configuration errors early (typos, missing params, etc.).
For custom functions registered via register_characteristics_function() or
register_metrics_function(), validation is skipped since their parameter
schemas are not known at config processing time.
"""
schemas = _get_param_schemas()
if backend not in schemas:
raise ValueError(f"Unknown backend: {backend}")
if section not in schemas[backend]:
raise ValueError(f"Unknown section: {section} for backend {backend}")
if function_name not in schemas[backend][section]:
# Custom function - skip validation, trust the registry and runtime
return
expected_params = schemas[backend][section][function_name]
provided_params = set(params.keys())
# Check for unexpected parameters
extra_params = provided_params - expected_params
if extra_params:
raise ValueError(
f"Unexpected parameters for {backend}.{section}.{function_name}: "
f"{sorted(extra_params)}. Expected: {sorted(expected_params)}"
)
# Check for missing required parameters (after defaults merge)
missing_params = expected_params - provided_params
if missing_params:
raise ValueError(
f"Missing required parameters for {backend}.{section}.{function_name}: {sorted(missing_params)}"
)
# Check for null values that must be user-provided (only for specific params)
required_non_null_params = {"training_data_path"} # Only these cannot be null
for param, value in params.items():
if value is None and param in required_non_null_params:
raise ValueError(
f"Parameter '{param}' for {backend}.{section}.{function_name} must be provided by user (cannot be null)"
)
[docs]
def validate_config(config: Dict[str, Any]) -> None:
"""Validate configuration has required fields and valid parameters."""
# STORAGE is optional - if present, PATH is required
if "STORAGE" in config:
_require(
config,
"STORAGE.PATH",
"Configuration with STORAGE must include STORAGE.PATH",
)
# Validate that exactly one of RULE or SYNTHESIZER is present
has_rule = "RULE" in config
has_synthesizer = "SYNTHESIZER" in config
if has_rule and has_synthesizer:
raise ValueError("Config must contain exactly one of RULE or SYNTHESIZER block, not both")
elif not has_rule and not has_synthesizer:
raise ValueError("Config must contain either RULE or SYNTHESIZER block")
# Validate RULE backend
if has_rule:
rule_config = config["RULE"]
# Validate CHARACTERISTICS
if "CHARACTERISTICS" in rule_config:
char_config = rule_config["CHARACTERISTICS"]
function_name = char_config.get("FUNCTION", "default")
params = char_config.get("PARAMS", {})
_validate_params("RULE", "CHARACTERISTICS", function_name, params)
# Validate METRICS
if "METRICS" in rule_config:
metrics_config = rule_config["METRICS"]
function_name = metrics_config.get("FUNCTION", "default")
params = metrics_config.get("PARAMS", {})
_validate_params("RULE", "METRICS", function_name, params)
# Validate SYNTHESIZER backend
if has_synthesizer:
syn_config = config["SYNTHESIZER"]
# Validate CHARACTERISTICS
if "CHARACTERISTICS" in syn_config:
char_config = syn_config["CHARACTERISTICS"]
function_name = char_config.get("FUNCTION")
if not function_name:
raise ValueError("SYNTHESIZER.CHARACTERISTICS.FUNCTION is required")
params = char_config.get("PARAMS", {})
_validate_params("SYNTHESIZER", "CHARACTERISTICS", function_name, params)
# Validate METRICS
if "METRICS" in syn_config:
metrics_config = syn_config["METRICS"]
function_name = metrics_config.get("FUNCTION")
if not function_name:
raise ValueError("SYNTHESIZER.METRICS.FUNCTION is required")
params = metrics_config.get("PARAMS", {})
_validate_params("SYNTHESIZER", "METRICS", function_name, params)
[docs]
def process_config(config_path: str) -> Dict[str, Any]:
"""
Load, merge with defaults, and validate configuration.
Args:
config_path: Path to user configuration file (local or S3)
Returns:
Complete validated configuration
Raises:
FileNotFoundError: If config file doesn't exist
ValueError: If configuration is invalid
"""
store, filename = ArtifactStore.from_file_path(config_path)
if not store.exists(filename):
raise FileNotFoundError(f"Configuration file not found: {config_path}")
# Load user config - support both YAML and JSON for backward compatibility
if filename.lower().endswith((".yaml", ".yml")):
user_config = store.read_yaml(filename)
else:
user_config = store.read_json(filename)
# Load defaults
defaults = load_defaults()
# Remove conflicting blocks from defaults based on user config
# User config determines which backend to use
if "SYNTHESIZER" in user_config:
# User wants SYNTHESIZER, remove RULE from defaults
if "RULE" in defaults:
defaults = copy.deepcopy(defaults)
del defaults["RULE"]
elif "RULE" in user_config:
# User wants RULE, remove SYNTHESIZER from defaults
if "SYNTHESIZER" in defaults:
defaults = copy.deepcopy(defaults)
del defaults["SYNTHESIZER"]
else:
# User didn't specify backend, default to RULE
if "SYNTHESIZER" in defaults:
defaults = copy.deepcopy(defaults)
del defaults["SYNTHESIZER"]
# Merge user config over defaults
config = deep_merge(defaults, user_config)
validate_config(config)
return config