"""ArtifactStore Adapter - wraps artifact_store library to StorageInterface."""
from typing import Any, Dict
import pandas as pd
from artifact_store import create_job
from .base import StorageInterface
from .factory import STORAGE_REGISTRY
@STORAGE_REGISTRY.register_decorator("artifact_store")
class ArtifactStoreAdapter(StorageInterface):
    """Wraps the artifact_store library to provide a consistent storage interface.

    The artifact_store library handles Local and S3 backends internally based
    on the storage_url format. This adapter provides a uniform interface while
    delegating actual storage operations to artifact_store.
    """

    def __init__(self) -> None:
        """Initialize the ArtifactStoreAdapter in a disconnected state."""
        self.job = None            # artifact_store job handle, set by connect()
        self.store = None          # store object from job.get_store()
        self.is_connected = False  # guards every write/path operation

    def _ensure_connected(self) -> None:
        """Raise ConnectionError if connect() has not completed successfully.

        Shared guard used by all storage operations so the "not connected"
        check and its message live in exactly one place.
        """
        if not self.is_connected:
            raise ConnectionError("Storage not connected. Call connect() first.")

    def connect(self, config: Dict[str, Any]) -> bool:
        """Initialize storage with configuration.

        Parameters
        ----------
        config : dict
            Dictionary containing:
            - storage_url: Path or URL (e.g., "./data", "s3://bucket/prefix")
            - prefix: Optional job prefix (default: "job-impact-engine")
            - job_id: Optional job ID for resuming existing jobs or custom IDs.
              If not provided, a unique ID will be auto-generated.

        Returns
        -------
        bool
            True on successful initialization. Note that failures inside
            ``create_job`` propagate as exceptions; this method never
            returns False.
        """
        storage_url = config.get("storage_url", "./data")
        prefix = config.get("prefix", "job-impact-engine")
        job_id = config.get("job_id", None)
        # create_job selects the Local or S3 backend from the URL format.
        self.job = create_job(storage_url, prefix=prefix, job_id=job_id)
        self.store = self.job.get_store()
        self.is_connected = True
        return True

    def write_json(self, path: str, data: Dict[str, Any]) -> None:
        """Write JSON data to storage.

        Raises
        ------
        ConnectionError
            If connect() has not been called.
        """
        self._ensure_connected()
        self.store.write_json(path, data)

    def write_csv(self, path: str, df: pd.DataFrame) -> None:
        """Write DataFrame to CSV in storage.

        Raises
        ------
        ConnectionError
            If connect() has not been called.
        """
        self._ensure_connected()
        self.store.write_csv(path, df)

    def write_yaml(self, path: str, data: Dict[str, Any]) -> None:
        """Write YAML data to storage.

        Raises
        ------
        ConnectionError
            If connect() has not been called.
        """
        self._ensure_connected()
        self.store.write_yaml(path, data)

    def write_parquet(self, path: str, df: pd.DataFrame) -> None:
        """Write DataFrame to Parquet in storage.

        Raises
        ------
        ConnectionError
            If connect() has not been called.
        """
        self._ensure_connected()
        self.store.write_parquet(path, df)

    def full_path(self, path: str) -> str:
        """Get the full path/URL for a relative path.

        Raises
        ------
        ConnectionError
            If connect() has not been called.
        """
        self._ensure_connected()
        return self.store.full_path(path)

    def validate_connection(self) -> bool:
        """Validate that the storage connection is active.

        Returns True only when connect() succeeded and a store object exists.
        """
        return self.is_connected and self.store is not None

    def get_job(self) -> Any:
        """Get the underlying job object for artifact management.

        May be None if connect() has not been called.
        """
        return self.job