"""Base interfaces and common classes for the storage layer."""
from abc import ABC, abstractmethod
from typing import Any, Dict
import pandas as pd
class StorageInterface(ABC):
    """Abstract base class defining the contract for all storage implementations.

    Required methods (must override):

    - connect: Initialize adapter with configuration
    - write_json: Write JSON data to storage
    - write_csv: Write DataFrame to CSV
    - write_yaml: Write YAML data to storage
    - write_parquet: Write DataFrame to Parquet
    - full_path: Get full path/URL for a relative path

    Optional methods (have sensible defaults):

    - validate_connection: Check if connection is active (default: True)
    - get_job: Get the underlying job object (default: None)
    """

    @abstractmethod
    def connect(self, config: Dict[str, Any]) -> bool:
        """Initialize storage with configuration.

        Parameters
        ----------
        config : dict
            Dictionary containing storage configuration
            (e.g., storage_url, prefix).

        Returns
        -------
        bool
            True if initialization successful, False otherwise.
        """

    @abstractmethod
    def write_json(self, path: str, data: Dict[str, Any]) -> None:
        """Write JSON data to storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        data : dict
            Dictionary to serialize as JSON.
        """

    @abstractmethod
    def write_csv(self, path: str, df: pd.DataFrame) -> None:
        """Write DataFrame to CSV in storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        df : pd.DataFrame
            DataFrame to write.
        """

    @abstractmethod
    def write_yaml(self, path: str, data: Dict[str, Any]) -> None:
        """Write YAML data to storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        data : dict
            Dictionary to serialize as YAML.
        """

    @abstractmethod
    def write_parquet(self, path: str, df: pd.DataFrame) -> None:
        """Write DataFrame to Parquet in storage.

        Parameters
        ----------
        path : str
            Relative path within the storage location.
        df : pd.DataFrame
            DataFrame to write.
        """

    @abstractmethod
    def full_path(self, path: str) -> str:
        """Get the full path/URL for a relative path.

        Parameters
        ----------
        path : str
            Relative path within the storage location.

        Returns
        -------
        str
            Full path or URL to the resource.
        """

    def validate_connection(self) -> bool:
        """Validate that the storage connection is active and functional.

        Default implementation returns True. Override for custom validation.

        Returns
        -------
        bool
            True if connection is valid, False otherwise.
        """
        # Base class performs no check of its own; adapters override this.
        return True

    def get_job(self) -> Any:
        """Get the underlying job object for artifact management.

        This is used for creating nested jobs or accessing job metadata.
        Default implementation returns None. Override for adapters that
        support job-based artifact management.

        Returns
        -------
        Any
            Job object or None if not applicable.
        """
        # No job concept at the interface level; adapters may supply one.
        return None