Source code for online_retail_simulator.manage.jobs

"""
Job management functions for simulation data.
"""

import uuid
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
from artifact_store import ArtifactStore


@dataclass
class JobInfo:
    """Information about a simulation job and its storage location."""

    job_id: str
    storage_path: str

    def __str__(self) -> str:
        return self.job_id

    def get_store(self) -> ArtifactStore:
        """Get an ArtifactStore for this job's directory."""
        return ArtifactStore(f"{self.storage_path}/{self.job_id}")

    def save_df(self, name: str, df: pd.DataFrame) -> None:
        """Save a DataFrame to this job's directory."""
        store = self.get_store()
        store.write_csv(f"{name}.csv", df)

    def load_df(self, name: str) -> Optional[pd.DataFrame]:
        """Load a DataFrame from this job's directory."""
        store = self.get_store()
        file_path = f"{name}.csv"
        if not store.exists(file_path):
            return None
        return store.read_csv(file_path)
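A minimal round-trip through JobInfo could look like the sketch below. The "./sim_output" path and the sample DataFrame are invented for illustration, and this assumes ArtifactStore reads and writes under a plain local directory:

# Illustrative JobInfo round-trip; path and data are hypothetical.
info = JobInfo(job_id="job-20240101-120000-abcd1234", storage_path="./sim_output")
info.save_df("products", pd.DataFrame({"sku": ["A", "B"], "price": [9.99, 19.99]}))

df = info.load_df("products")      # reads <storage_path>/<job_id>/products.csv
missing = info.load_df("unknown")  # returns None instead of raising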
def generate_job_id(prefix: str = "job") -> str:
    """Generate a unique job ID with timestamp and short UUID."""
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    short_uuid = str(uuid.uuid4())[:8]
    return f"{prefix}-{timestamp}-{short_uuid}"
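The resulting IDs concatenate the prefix, a second-resolution timestamp, and the first eight characters of a UUID4, for example:

# Outputs are illustrative; the timestamp and UUID fragment vary per call.
generate_job_id()          # e.g. "job-20240101-120000-1a2b3c4d"
generate_job_id("retail")  # e.g. "retail-20240101-120000-5e6f7a8b"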
def create_job(config: Dict, config_path: str, job_id: Optional[str] = None) -> JobInfo:
    """
    Create a new job directory with config.

    Args:
        config: Configuration dictionary (reads STORAGE.PATH and STORAGE.PREFIX)
        config_path: Path to original config file
        job_id: Optional job ID, auto-generated if not provided

    Returns:
        JobInfo: Information about the created job
    """
    if job_id is None:
        prefix = config.get("STORAGE", {}).get("PREFIX", "job")
        job_id = generate_job_id(prefix)

    storage_path = config.get("STORAGE", {}).get("PATH", ".")
    job_info = JobInfo(job_id=job_id, storage_path=storage_path)

    # Copy original config using ArtifactStore
    if Path(config_path).exists():
        store = job_info.get_store()
        source_store, filename = ArtifactStore.from_file_path(config_path)
        config_content = source_store.read_text(filename)
        store.write_text("config.yaml", config_content)

    return job_info
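Calling create_job directly might look like this sketch; the config dictionary and YAML path are hypothetical, and the config is copied into the job directory only when config_path exists on disk:

# Hypothetical config; only STORAGE.PATH and STORAGE.PREFIX are consulted here.
config = {"STORAGE": {"PATH": "./sim_output", "PREFIX": "retail"}}
job = create_job(config, config_path="configs/retail.yaml")
print(job)  # JobInfo.__str__ returns the ID, e.g. "retail-20240101-120000-1a2b3c4d"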
def save_job_metadata(job_info: JobInfo, config: Dict, config_path: str, **extra) -> None:
    """
    Save/update job metadata.

    Args:
        job_info: JobInfo for the job
        config: Configuration dictionary
        config_path: Path to config file
        **extra: Additional metadata fields (e.g., num_products=10, is_enriched=True)
    """
    store = job_info.get_store()

    # Load existing metadata or start fresh
    if store.exists("metadata.json"):
        metadata = store.read_json("metadata.json")
    else:
        metadata = {
            "job_id": job_info.job_id,
            "storage_path": job_info.storage_path,
            "config_path": config_path,
        }

    # Merge extra fields and update timestamp
    metadata.update(extra)
    metadata["timestamp"] = datetime.now().isoformat()

    store.write_json("metadata.json", metadata)


def save_job_data(
    products_df: pd.DataFrame,
    metrics_df: pd.DataFrame,
    config: Dict,
    config_path: str,
    job_id: Optional[str] = None,
) -> JobInfo:
    """
    Save simulation data with automatic job-based storage.

    Args:
        products_df: Product characteristics DataFrame
        metrics_df: Product metrics DataFrame
        config: Configuration dictionary
        config_path: Path to original config file
        job_id: Optional job ID, auto-generated if not provided

    Returns:
        JobInfo: Information about the saved job
    """
    job_info = create_job(config, config_path, job_id)

    job_info.save_df("products", products_df)
    job_info.save_df("metrics", metrics_df)

    save_job_metadata(
        job_info,
        config,
        config_path,
        num_products=len(products_df),
        num_metrics=len(metrics_df),
    )

    return job_info
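An end-to-end save could then read as follows; the DataFrames and paths are invented. Afterwards the job directory holds products.csv, metrics.csv, config.yaml (when the source config exists), and metadata.json with num_products and num_metrics recorded:

# Illustrative end-to-end save; data and paths are made up.
products = pd.DataFrame({"sku": ["A", "B"], "price": [9.99, 19.99]})
metrics = pd.DataFrame({"sku": ["A", "B"], "views": [120, 45]})
config = {"STORAGE": {"PATH": "./sim_output"}}

job = save_job_data(products, metrics, config, config_path="configs/retail.yaml")

# A later pass can append metadata fields without losing existing ones:
save_job_metadata(job, config, "configs/retail.yaml", is_enriched=True)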
def load_job_results(job_info: JobInfo) -> Dict[str, pd.DataFrame]:
    """
    Load simulation results for a job.

    Args:
        job_info: JobInfo containing job details

    Returns:
        Dict mapping each available DataFrame name ('products',
        'product_details_original', 'product_details_enriched', 'metrics',
        'enriched', 'potential_outcomes') to its DataFrame; missing files
        are omitted.

    Raises:
        FileNotFoundError: If job directory doesn't exist
    """
    store = job_info.get_store()
    if not store.exists(""):
        raise FileNotFoundError(f"Job directory not found: {store.base_path}")

    results = {}
    for name in [
        "products",
        "product_details_original",
        "product_details_enriched",
        "metrics",
        "enriched",
        "potential_outcomes",
    ]:
        df = job_info.load_df(name)
        if df is not None:
            results[name] = df

    return results
def load_job_metadata(job_info: JobInfo) -> Dict:
    """
    Load metadata for a job.

    Args:
        job_info: JobInfo containing job details

    Returns:
        Dict: Job metadata

    Raises:
        FileNotFoundError: If job directory or metadata file doesn't exist
    """
    store = job_info.get_store()
    if not store.exists("metadata.json"):
        raise FileNotFoundError(f"Metadata file not found: {store.full_path('metadata.json')}")
    return store.read_json("metadata.json")
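Reading a job back pairs the two loaders; this sketch assumes a job previously saved under a hypothetical ./sim_output directory:

# Illustrative load; the job ID and path are hypothetical.
job = JobInfo(job_id="job-20240101-120000-1a2b3c4d", storage_path="./sim_output")

results = load_job_results(job)  # only the CSVs that exist are returned
products = results.get("products")

meta = load_job_metadata(job)
print(meta["job_id"], meta["timestamp"])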
def list_jobs(storage_path: str = ".") -> List[str]:
    """
    List all available job IDs in a storage path.

    Args:
        storage_path: Base path where job directories are stored

    Returns:
        List of job IDs sorted by creation time (newest first)
    """
    output_dir = Path(storage_path)
    if not output_dir.exists():
        return []

    jobs = []
    for job_dir in output_dir.iterdir():
        # Note: only directories with the literal "job-" prefix are matched,
        # so IDs generated with a custom STORAGE.PREFIX are not listed.
        if job_dir.is_dir() and job_dir.name.startswith("job-"):
            jobs.append(job_dir.name)

    # IDs embed a YYYYMMDD-HHMMSS timestamp, so reverse lexicographic order
    # is newest-first.
    return sorted(jobs, reverse=True)
def cleanup_old_jobs(storage_path: str = ".", keep_count: int = 10) -> List[str]:
    """
    Clean up old job directories, keeping only the most recent ones.

    Args:
        storage_path: Base path where job directories are stored
        keep_count: Number of recent jobs to keep

    Returns:
        List of removed job IDs
    """
    jobs = list_jobs(storage_path)
    if len(jobs) <= keep_count:
        return []

    jobs_to_remove = jobs[keep_count:]
    removed_jobs = []
    for job_id in jobs_to_remove:
        job_info = JobInfo(job_id=job_id, storage_path=storage_path)
        store = job_info.get_store()
        if store.exists(""):
            store.delete()
            removed_jobs.append(job_id)

    return removed_jobs
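Because generate_job_id embeds a sortable timestamp, housekeeping stays simple: list newest-first, then prune everything past keep_count. An illustrative pass over a hypothetical ./sim_output directory:

# Illustrative housekeeping; "./sim_output" is a hypothetical storage path.
for job_id in list_jobs("./sim_output"):
    print(job_id)  # newest first

removed = cleanup_old_jobs("./sim_output", keep_count=5)
print(f"Removed {len(removed)} old job(s)")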