
ADR-0036: Storage Plugin Interface

Accepted

floe uses Apache Iceberg as the enforced table format (ADR-0005). Iceberg tables require object storage (S3, GCS, Azure Blob, etc.) for data files and metadata.

The Problem: Hardcoded Storage Configuration


Current implementations assume specific storage backends:

# Hardcoded S3 assumption
from pyiceberg.catalog import load_catalog
catalog = load_catalog(
    "polaris",
    warehouse="s3://my-bucket/warehouse",  # S3-specific
)

Issues with this approach:

  1. Cloud provider lock-in: S3 syntax doesn’t work for GCS (gs://) or Azure (abfss://)
  2. Credential management: Different auth for AWS (IAM), GCS (service account), Azure (SAS)
  3. Testing complexity: Cannot easily exercise local evaluation and future provider-native object-storage backends through the same contract
  4. Enterprise requirements: Some organizations require specific storage (on-prem, multi-cloud)

Organizations Have Different Storage Needs

| Organization type | Storage backend | Rationale |
|---|---|---|
| AWS-first | S3 | Native AWS integration, lowest latency |
| GCP-first | GCS | Native GCP integration, existing buckets |
| Azure-first | Azure Blob | Native Azure integration, compliance |
| Local development | MinIO | S3-compatible, runs in Kind cluster |
| Multi-cloud | Multiple (S3 + GCS) | Disaster recovery, vendor diversification |
| On-premises | S3-compatible (NetApp, Dell) | Data sovereignty, compliance |

Requirement: Storage backends must be pluggable, not hardcoded paths.

PyIceberg (the Python library for Iceberg) uses the FileIO pattern for storage abstraction:

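# Simplified excerpt of pyiceberg.io.FileIO
from abc import ABC, abstractmethod
from typing import Union
from pyiceberg.io import InputFile, OutputFile
class FileIO(ABC):
    """Abstract base class for file I/O operations."""
    @abstractmethod
    def new_input(self, location: str) -> InputFile:
        """Create input file for reading."""
    @abstractmethod
    def new_output(self, location: str) -> OutputFile:
        """Create output file for writing."""
    @abstractmethod
    def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
        """Delete the file at the given location."""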

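Implementations (PyIceberg's FileIO classes are general-purpose and select a filesystem from the location's URI scheme):

  • PyArrowFileIO (pyiceberg.io.pyarrow) - PyArrow filesystems, e.g., s3://, gs://, file://
  • FsspecFileIO (pyiceberg.io.fsspec) - fsspec filesystems such as s3fs, gcsfs, and adlfs, e.g., s3://, gs://, abfss://
  • Per-provider classes such as S3FileIO, GCSFileIO, and ADLSFileIO belong to the Java Iceberg library, not PyIceberg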

This is the pattern we should follow.
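The sketch below shows the same pattern using only the standard library. InputFile, OutputFile, FileIO, MemoryFileIO, register(), and file_io_for() are defined here for illustration only. They are not PyIceberg's classes: the real FileIO also defines delete() and returns stream-based file objects. The property that matters for this ADR is that callers pick a backend from the location's URI scheme and never branch on a provider name.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from urllib.parse import urlparse


class InputFile(ABC):
    """Readable handle for one location (toy stand-in, not pyiceberg.io.InputFile)."""

    @abstractmethod
    def read(self) -> bytes: ...


class OutputFile(ABC):
    """Writable handle for one location (toy stand-in, not pyiceberg.io.OutputFile)."""

    @abstractmethod
    def write(self, data: bytes) -> None: ...


class FileIO(ABC):
    """Two-method factory shape mirroring PyIceberg's FileIO."""

    @abstractmethod
    def new_input(self, location: str) -> InputFile: ...

    @abstractmethod
    def new_output(self, location: str) -> OutputFile: ...


class _MemoryInput(InputFile):
    def __init__(self, files: dict[str, bytes], location: str) -> None:
        self._files, self._location = files, location

    def read(self) -> bytes:
        return self._files[self._location]


class _MemoryOutput(OutputFile):
    def __init__(self, files: dict[str, bytes], location: str) -> None:
        self._files, self._location = files, location

    def write(self, data: bytes) -> None:
        self._files[self._location] = data


class MemoryFileIO(FileIO):
    """Toy backend that stores file contents in a dict keyed by full location."""

    def __init__(self) -> None:
        self._files: dict[str, bytes] = {}

    def new_input(self, location: str) -> InputFile:
        return _MemoryInput(self._files, location)

    def new_output(self, location: str) -> OutputFile:
        return _MemoryOutput(self._files, location)


# Backends register per URI scheme; callers never branch on provider names.
_BACKENDS: dict[str, FileIO] = {}


def register(scheme: str, io: FileIO) -> None:
    _BACKENDS[scheme] = io


def file_io_for(location: str) -> FileIO:
    """Return the FileIO registered for the location's URI scheme."""
    scheme = urlparse(location).scheme
    if scheme not in _BACKENDS:
        raise ValueError(f"no FileIO registered for scheme {scheme!r}")
    return _BACKENDS[scheme]
```

Registering another backend, for example under `s3`, requires no change in code that calls file_io_for().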

Create a StoragePlugin interface that emits a neutral, secret-free storage deployment binding for pluggable object storage backends and still supports direct PyIceberg FileIO use where runtime code needs it.

The target interface is now a neutral storage deployment binding plus a composition resolver, not storage-owned projections for every consumer.

The original interface correctly identified the storage plugin boundary. However, methods such as get_dbt_profile_config(), get_dagster_io_manager_config(), and get_helm_values_override() push storage plugins to know too much about compute, orchestration, catalog, and deployment renderers. That creates an N × M coupling problem: every storage plugin needs a projection for every consumer, so each new catalog or storage backend multiplies the helper methods that must be written and tested.

The revised rule is:

StoragePlugin emits neutral StorageDeploymentBinding.
CatalogPlugin declares storage requirements and translates storage into CatalogDeploymentBinding.
Compute and orchestrator plugins consume runtime storage facts.
floe-core validates compatibility through the composition resolver.
Helm renders resolved deployment bindings.

Legacy helper methods can remain during migration, but they are not the semantic contract.
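The sketch below shows the kind of compatibility check the composition resolver performs. It assumes storage facts and catalog requirements can be reduced to a protocol, credential modes, and STS support. StorageFacts, CatalogRequirements, Resolution, and resolve() are hypothetical names, not floe-core's API. The MinIO values used in testing mirror the reference excerpt in this ADR.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class StorageFacts:
    """Hypothetical, trimmed-down view of a neutral storage binding."""
    provider: str
    protocol: str                     # e.g. "s3-compatible"
    credential_modes: frozenset[str]  # e.g. {"kubernetes-secret"}
    sts_supported: bool = False


@dataclass(frozen=True)
class CatalogRequirements:
    """Hypothetical storage requirements a catalog plugin declares once."""
    catalog: str
    protocols: frozenset[str]
    credential_modes: frozenset[str]
    requires_sts: bool = False


@dataclass
class Resolution:
    ok: bool
    problems: list[str] = field(default_factory=list)


def resolve(storage: StorageFacts, catalog: CatalogRequirements) -> Resolution:
    """Check one storage binding against one catalog's declared requirements."""
    problems: list[str] = []
    if storage.protocol not in catalog.protocols:
        problems.append(f"{catalog.catalog} does not support protocol {storage.protocol!r}")
    if not storage.credential_modes & catalog.credential_modes:
        problems.append(f"no shared credential mode between {storage.provider} and {catalog.catalog}")
    if catalog.requires_sts and not storage.sts_supported:
        problems.append(f"{catalog.catalog} requires STS, which {storage.provider} does not support")
    return Resolution(ok=not problems, problems=problems)
```

Each storage plugin emits its facts once, and each catalog declares its requirements once. Adding a backend or a catalog therefore costs one new declaration (N + M) instead of one projection per pair (N × M).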

from abc import ABC, abstractmethod
from pyiceberg.io import FileIO
from floe_core.schemas.compiled_artifacts import StorageDeploymentBinding
class StoragePlugin(ABC):
    """Plugin interface for object storage backends.
    Storage plugins own provider facts only:
    - protocol and endpoint roles
    - warehouse and bucket requirements
    - credential references
    - capabilities and provisioning intent
    - neutral runtime fragments for consumers to translate
    """
    name: str  # e.g., "minio", "gcs", "azure"
    version: str  # Plugin version (semver)
    floe_api_version: str  # Supported floe-core API version
    @abstractmethod
    def get_deployment_binding(self) -> StorageDeploymentBinding:
        """Return secret-free storage desired state for compilation."""
        pass
    @abstractmethod
    def get_pyiceberg_fileio(self) -> FileIO:
        """Create PyIceberg FileIO instance for direct runtime use."""
        pass

This is the target semantic interface. During migration, the live ABC may still include legacy abstract helpers for dbt, Dagster, warehouse URI, and Helm fragments so existing plugins instantiate cleanly. Those helpers are compatibility surface only; new integration logic should use get_deployment_binding() and consumer-owned translators.

Provider-specific consumer projections are owned by the consumer. Polaris, for example, translates MinIO’s neutral binding into storageConfigInfo, endpoint fields, path-style access, STS semantics, allowed locations, and Kubernetes Secret references. Helm renders that resolved binding; it does not reconstruct semantic storage facts from raw chart credentials or legacy plugin config.
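The sketch below shows what a consumer-owned translator might look like. It assumes the binding is available as a plain dict shaped like the MinIO excerpt in this ADR; the real binding is a typed model. polaris_storage_config() is hypothetical. Its output keys are assumptions modeled on Polaris's S3 storage configuration, so verify them against the Polaris management API version in use. Credentials are deliberately absent: the Secret reference stays in the binding for the Helm renderer.

```python
from __future__ import annotations

from typing import Any


def polaris_storage_config(binding: dict[str, Any]) -> dict[str, Any]:
    """Translate a neutral S3-compatible storage binding into Polaris-style config.

    Hypothetical catalog-owned translator; the output keys are assumptions.
    """
    if binding["protocol"] != "s3-compatible":
        raise ValueError(f"polaris translator: unsupported protocol {binding['protocol']!r}")
    endpoint = binding["endpoint"]
    return {
        "storageType": "S3",
        "allowedLocations": list(binding["allowed_locations"]),
        "region": endpoint["region"],
        # Clients get the external URL; the catalog server uses the in-cluster one.
        "endpoint": endpoint["external_url"],
        "endpointInternal": endpoint["internal_url"],
        "pathStyleAccess": endpoint["path_style_access"],
        # Storage that declares no STS support maps to stsUnavailable=True.
        "stsUnavailable": not binding["capabilities"]["sts_supported"],
    }
```

Supporting another catalog means writing one translator like this against the neutral binding. The MinIO plugin itself does not change.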

# pyproject.toml for the implemented floe-storage-minio plugin
[project.entry-points."floe.storage"]
minio = "floe_storage_minio.plugin:MinIOStoragePlugin"
# manifest.yaml
plugins:
  storage: minio  # Plugin name (discovered via entry points)
# Compiler discovers plugin and invokes:
# 1. get_deployment_binding() -> neutral storage desired state
# 2. composition resolver -> storage/catalog compatibility validation
# 3. catalog translator -> catalog-owned deployment binding
# 4. Helm/runtime renderers -> generated config from typed bindings
Positive consequences:

  • Composability - Storage backends are plugins, not hardcoded paths (ADR-0037)
  • PyIceberg alignment - Follows industry-standard FileIO pattern
  • Portability path - The alpha lane validates MinIO through the S3-compatible protocol while leaving room for provider-native plugins
  • Testing efficiency - MinIO exercises S3-compatible Iceberg behavior in local and remote Kubernetes validation lanes
  • Credential security - Centralized credential management per backend
  • Multi-cloud support - Future: Multiple storage plugins per platform

Negative consequences:

  • Abstraction overhead - More files/classes than hardcoded URIs
  • Plugin development - Requires implementing ABC for each backend
  • FileIO complexity - PyIceberg FileIO API has learning curve
  • Initial setup - Must install plugin package (e.g., pip install floe-storage-minio)

Neutral consequences:

  • Implemented storage plugin - floe-storage-minio supports MinIO using S3-compatible protocol settings
  • Provider-native plugins - GCS or Azure plugins are future/provider-specific extensions, not alpha-shipped packages
  • Migration path - Existing hardcoded URIs can coexist during transition

Reference Implementation Excerpt: MinIOStoragePlugin


The production implementation in plugins/floe-storage-minio is the source of truth. The conceptual excerpt below shows the public plugin shape: MinIO emits S3-compatible provider facts and credential references, not Polaris bootstrap JSON or raw credential values. See the live plugin for exact imports and method body details.

# Conceptual excerpt; see plugins/floe-storage-minio/src/floe_storage_minio/plugin.py
from typing import Any
from pydantic import BaseModel
from pyiceberg.io import FileIO
from floe_core.plugin_errors import PluginConfigurationError
from floe_core.plugins import StoragePlugin
from floe_core.schemas.compiled_artifacts import StorageDeploymentBinding
from floe_storage_minio.config import MinIOStorageConfig
# Imports for the binding sub-models used below (StorageServiceEndpoint,
# StorageWarehouse, StorageBucketRequirement, StorageCredentialBinding,
# KubernetesSecretRef, StorageCapabilities, StorageProvisioningIntent,
# StorageRuntimeBinding, DbtStorageBinding, DagsterStorageBinding) are elided;
# see the live plugin for exact module paths.
class MinIOStoragePlugin(StoragePlugin):
    """Storage plugin for MinIO."""
    def __init__(self, config: MinIOStorageConfig | None = None) -> None:
        """Initialize MinIO storage plugin with validated config."""
        super().__init__()
        self._config = config
    @property
    def name(self) -> str:
        return "minio"
    @property
    def version(self) -> str:
        return "0.1.0"
    @property
    def floe_api_version(self) -> str:
        return "1.0"
    @property
    def description(self) -> str:
        return "MinIO object storage plugin for Iceberg data"
    @property
    def tracer_name(self) -> str:
        return "floe.storage.minio"
    def get_config_schema(self) -> type[BaseModel]:
        return MinIOStorageConfig
    def _get_pyiceberg_s3_properties(self) -> dict[str, str]:
        # Endpoint and addressing only; no credential values are included.
        config = self._require_config()
        return {
            "s3.endpoint": config.endpoint,
            "s3.region": config.region,
            "s3.path-style-access": str(config.path_style_access).lower(),
        }
    def _require_config(self) -> MinIOStorageConfig:
        if self._config is None:
            raise PluginConfigurationError(
                "minio",
                [{"field": "_config", "message": "Plugin 'minio' not configured"}],
            )
        return self._config
    def get_pyiceberg_fileio(self) -> FileIO:
        """Create PyIceberg FileIO for MinIO using S3-compatible keys."""
        from pyiceberg.io.fsspec import FsspecFileIO
        return FsspecFileIO(properties=self._get_pyiceberg_s3_properties())
    def get_pyiceberg_catalog_config(self) -> dict[str, Any]:
        """Return PyIceberg catalog config for S3-backed table loading."""
        return self._get_pyiceberg_s3_properties()
    def get_deployment_binding(self) -> StorageDeploymentBinding:
        """Return secret-free MinIO deployment binding for compiled artifacts."""
        config = self._require_config()
        return StorageDeploymentBinding(
            provider="minio",
            protocol="s3-compatible",
            endpoint=StorageServiceEndpoint(
                internal_url=config.endpoint,
                external_url=config.external_endpoint or config.endpoint,
                region=config.region,
                warehouse_path=f"s3://{config.bucket}",
                path_style_access=config.path_style_access,
            ),
            warehouse=StorageWarehouse(uri=f"s3://{config.bucket}", bucket=config.bucket),
            allowed_locations=[f"s3://{config.bucket}"],
            buckets=[
                StorageBucketRequirement(
                    name=config.bucket,
                    uri=f"s3://{config.bucket}",
                    purpose="warehouse",
                    create_policy="create-if-missing",
                )
            ],
            # Names the Kubernetes Secret and its keys, never the credential values.
            credentials=StorageCredentialBinding(
                mode="kubernetes-secret",
                secret_ref=KubernetesSecretRef(
                    name=config.credential_secret_name,
                    namespace=config.credential_secret_namespace,
                    keys={
                        "accessKeyId": config.access_key_secret_key,
                        "secretAccessKey": config.secret_key_secret_key,
                    },
                ),
            ),
            capabilities=StorageCapabilities(
                protocols=["s3-compatible"],
                credential_modes=["kubernetes-secret"],
                sts_supported=False,
                path_style_access=config.path_style_access,
            ),
            provisioning=StorageProvisioningIntent(
                enabled=True,
                mode="helm-job",
                default_create_policy="create-if-missing",
            ),
            runtime=StorageRuntimeBinding(
                # Same keys as _get_pyiceberg_s3_properties().
                pyiceberg_properties={
                    "s3.endpoint": config.endpoint,
                    "s3.region": config.region,
                    "s3.path-style-access": str(config.path_style_access).lower(),
                },
                # Environment variable names only, not values.
                env_refs={
                    "accessKeyId": "AWS_ACCESS_KEY_ID",
                    "secretAccessKey": "AWS_SECRET_ACCESS_KEY",  # pragma: allowlist secret
                },
            ),
            dbt=DbtStorageBinding(
                profile_name="floe",
                target_name="dev",
                schema_name="analytics",
            ),
            dagster=DagsterStorageBinding(
                resource_key="minio_storage",
                asset_io_manager_key="iceberg_io_manager",
            ),
        )

The dbt and dagster fields are currently required schema fields for secret-free runtime identity hints. They are not permission for storage plugins to own dbt profile generation or Dagster resource construction; those consumers translate the compiled binding.

MinIO is configured through the implemented MinIOStoragePlugin, with a MinIO endpoint and path-style access enabled. The plugin passes S3-compatible protocol settings to PyIceberg and runtime integrations.

plugins:
  storage:
    type: minio
    config:
      endpoint: http://floe-platform-minio:9000
      bucket: floe-data
      region: us-east-1
      path_style_access: true
A provider-native GCS plugin would follow the same shape. The following is illustrative only, because GCS plugins are future extensions rather than alpha-shipped packages.

# floe-storage-gcs/src/floe_storage_gcs/plugin.py (illustrative)
from __future__ import annotations
import os
from pyiceberg.io.pyarrow import PyArrowFileIO
from floe_core.plugins import StoragePlugin
from floe_core.schemas.compiled_artifacts import StorageDeploymentBinding
class GCSPlugin(StoragePlugin):
    """Storage plugin for Google Cloud Storage."""
    name = "gcs"
    version = "0.1.0"
    floe_api_version = "2.0.0"
    def __init__(self, bucket: str = "floe-warehouse", project: str | None = None) -> None:
        """Initialize GCS plugin.
        Args:
            bucket: GCS bucket name
            project: GCP project ID (optional; falls back to the GCP_PROJECT env var)
        """
        super().__init__()
        self.bucket = bucket
        self.project = project or os.getenv("GCP_PROJECT", "")
    def get_pyiceberg_fileio(self) -> PyArrowFileIO:
        """Create PyIceberg FileIO for GCS.
        No key file is passed: PyArrow's GCS filesystem resolves Application
        Default Credentials (Workload Identity or GOOGLE_APPLICATION_CREDENTIALS).
        """
        properties = {"gcs.project-id": self.project} if self.project else {}
        return PyArrowFileIO(properties)
    def get_deployment_binding(self) -> StorageDeploymentBinding:
        """Return secret-free GCS desired state.
        A real implementation would declare GCS protocol/capability facts and
        workload identity or Secret references. Catalog plugins would translate
        those facts into catalog-specific deployment config.
        """
        ...

Decision Criteria: When to Create Plugin vs Configuration


Per ADR-0037 (Composability Principle):

| Scenario | Decision | Rationale |
|---|---|---|
| Multiple storage backends exist | Plugin | S3, GCS, Azure, MinIO all valid |
| Organization may swap storage | Plugin | Start with MinIO for local evaluation, then add and validate provider-native plugins as needed |
| Storage requires different credentials | Plugin | AWS IAM ≠ GCS service account ≠ Azure SAS |
| Storage-specific features | Plugin | S3 Transfer Acceleration, GCS lifecycle policies |

Not configuration because:

  • Storage URIs differ (s3:// vs gs:// vs abfss://)
  • Credential mechanisms differ (IAM vs service account vs SAS)
  • FileIO configuration differs (s3.* vs gcs.* properties in PyIceberg; S3FileIO vs GCSFileIO in Java Iceberg)
# dbt profiles.yml (dbt/compute integration derives filesystem config
# from the compiled storage binding)
floe:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: /tmp/floe.duckdb
      plugins:
        - module: dbt_duckdb_polaris
          config:
            catalog:
              uri: http://polaris:8181
            filesystems:  # Derived from the compiled storage binding
              s3:
                # dbt reads the env vars named in the binding's env_refs
                key_id: "{{ env_var('AWS_ACCESS_KEY_ID') }}"
                secret: "{{ env_var('AWS_SECRET_ACCESS_KEY') }}"
                region: us-east-1
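# Dagster definitions consume the compiled storage binding.
# `assets` and `artifacts` (the loaded compiled artifacts) are defined elsewhere.
# IcebergIOManager and its arguments are illustrative; match them to the
# dagster-iceberg release in use.
from dagster import Definitions
from dagster_iceberg import IcebergIOManager
defs = Definitions(
    assets=assets,
    resources={
        "io_manager": IcebergIOManager(
            catalog_uri="http://polaris:8181",
            warehouse="s3://floe-warehouse/bronze",
            storage_options=artifacts.deployment.storage.runtime.pyiceberg_properties,
        )
    },
)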
# Catalog loading consumes catalog-owned deployment/runtime config.
# `artifacts` is the loaded compiled artifacts object.
from pyiceberg.catalog import load_catalog
catalog = load_catalog(
    "polaris",
    type="rest",
    uri="http://polaris:8181",
    warehouse=artifacts.deployment.catalog.polaris.default_base_location,
    **artifacts.deployment.storage.runtime.pyiceberg_properties,
)
# tests/unit/test_storage_plugin.py
from unittest.mock import Mock
from floe_core.plugins import StoragePlugin
from floe_core.schemas.compiled_artifacts import StorageDeploymentBinding
# Compiler and spec stand in for the floe-core compiler entry point and a parsed
# manifest; their imports are elided here.
def test_compiler_with_mock_storage():
    """Test compiler with mocked storage plugin."""
    mock_plugin = Mock(spec=StoragePlugin)
    mock_plugin.get_deployment_binding.return_value = StorageDeploymentBinding(...)
    compiler = Compiler(storage_plugin=mock_plugin)
    artifacts = compiler.compile(spec)
    assert artifacts.deployment.storage is not None
    mock_plugin.get_deployment_binding.assert_called_once()
# tests/integration/test_minio_plugin.py
from floe_storage_minio import MinIOStoragePlugin
from floe_storage_minio.config import MinIOStorageConfig
def test_minio_plugin_generates_valid_fileio():
    """Test MinIOStoragePlugin generates valid PyIceberg FileIO."""
    plugin = MinIOStoragePlugin(
        config=MinIOStorageConfig(endpoint="http://minio:9000", bucket="warehouse")
    )
    fileio = plugin.get_pyiceberg_fileio()
    assert fileio is not None
    assert fileio.properties["s3.endpoint"] == "http://minio:9000"
    assert fileio.properties["s3.path-style-access"] == "true"
DON'T: Hardcode storage URIs in core

# ❌ ANTI-PATTERN: Hardcoded S3, won't work for GCS
warehouse = "s3://my-bucket/warehouse"
# ❌ ANTI-PATTERN: Coupled to core
def get_warehouse_uri(storage_type: str) -> str:
    if storage_type == "minio":
        return "s3://bucket/warehouse"
    elif storage_type == "gcs":
        return "gs://bucket/warehouse"
    # Every new backend requires core changes
    raise ValueError(f"Unknown storage type: {storage_type}")

DO: Use plugin interface with PyIceberg FileIO

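# ✅ CORRECT: Composable, extensible
# registry, catalog_plugin, and DeploymentConfig are assumed to be provided by
# floe-core; they are shown schematically.
storage_plugin = registry.discover("floe.storage")["minio"]
storage = storage_plugin.get_deployment_binding()
catalog = catalog_plugin.build_catalog_deployment(storage)
deployment = DeploymentConfig(storage=storage, catalog=catalog)
# Runtime code that needs direct data access still uses PyIceberg FileIO.
fileio = storage_plugin.get_pyiceberg_fileio()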
Credentials:

  • AWS: Use IAM roles (preferred) or access keys (K8s Secrets)
  • GCP: Use Workload Identity (preferred) or service account JSON (K8s Secrets)
  • Azure: Use Managed Identity (preferred) or SAS tokens (K8s Secrets)
  • MinIO: Use K8s Secret references in compiled artifacts and chart values (NOT raw credential values)

Access control and encryption:

  • Bucket policies: Restrict access to specific prefixes (e.g., /warehouse/bronze/*)
  • Network policies: K8s NetworkPolicy restricts storage access to authorized pods
  • Encryption: Use server-side encryption (SSE-S3, SSE-KMS, etc.)

Audit logging:

  • S3: Enable CloudTrail for access logging
  • GCS: Enable Cloud Audit Logs
  • Azure: Enable Storage Analytics logging
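For the bucket-policy guidance, the sketch below builds an S3-style bucket policy using AWS IAM policy grammar. The policy grants one principal list access and object read/write/delete access under a single key prefix. The function name, bucket, prefix, and role ARN (including the placeholder account ID) are illustrative. An Allow-only policy narrows access only if the principal has no broader grants elsewhere. S3 keys normally have no leading slash, so the prefix is normalized.

```python
from __future__ import annotations

import json
from typing import Any


def prefix_scoped_bucket_policy(bucket: str, prefix: str, principal_arn: str) -> dict[str, Any]:
    """Allow `principal_arn` to list and read/write/delete objects under one key prefix."""
    # Normalize "/warehouse/bronze/*"-style input to "warehouse/bronze/".
    key_prefix = prefix.strip("/*") + "/"
    principal = {"AWS": [principal_arn]}
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListOnlyUnderPrefix",
                "Effect": "Allow",
                "Principal": principal,
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
                "Condition": {"StringLike": {"s3:prefix": [f"{key_prefix}*"]}},
            },
            {
                "Sid": "ObjectAccessUnderPrefix",
                "Effect": "Allow",
                "Principal": principal,
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/{key_prefix}*"],
            },
        ],
    }


# Placeholder role ARN for illustration only.
policy = prefix_scoped_bucket_policy(
    "floe-data", "/warehouse/bronze/*", "arn:aws:iam::123456789012:role/floe-dbt"
)
print(json.dumps(policy, indent=2))
```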

Relationship with Table Operations (IcebergTableManager)


Important clarification: StoragePlugin handles neutral storage desired state and direct FileIO access, NOT Iceberg table operations or catalog-specific deployment JSON.

Table operations (create_table, evolve_schema, write_data, manage_snapshots) are handled by IcebergTableManager, an internal utility class in floe-iceberg package (Epic 4D).

┌─────────────────────┐     ┌─────────────────────┐
│    StoragePlugin    │     │    CatalogPlugin    │
│  (neutral binding)  │     │  (catalog binding)  │
└──────────┬──────────┘     └──────────┬──────────┘
           │                           │
           └─────────────┬─────────────┘
                         ▼
              ┌──────────────────────┐
              │ IcebergTableManager  │
              │ (internal utility)   │
              │ - create_table()     │
              │ - evolve_schema()    │
              │ - write_data()       │
              │ - manage_snapshots() │
              └──────────────────────┘

Why IcebergTableManager is NOT a plugin:

  • Iceberg is enforced (ADR-0005), not pluggable
  • Table operations are Iceberg-specific, no need for abstraction
  • CatalogPlugin already returns PyIceberg Catalog for table registration
  • StoragePlugin already provides storage bindings and FileIO for data access

See the historical Epic 4D storage-plugin specification for original background; the live contract in packages/floe-core/src/floe_core/plugins/storage.py and this ADR are the current truth.

Q: Can we use multiple storage backends per platform (multi-cloud)?


A: Not in initial implementation. One plugin per platform. Future: Support multiple plugins with namespace-to-storage mapping.
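Namespace-to-storage mapping is future work. The hypothetical sketch below shows one way it could resolve: a longest dotted-prefix match on the Iceberg namespace. The function and the plugin names in the mapping are illustrative only.

```python
from __future__ import annotations

from collections.abc import Mapping


def storage_for_namespace(namespace: str, mapping: Mapping[str, str], default: str) -> str:
    """Return the storage plugin name for the most specific matching namespace prefix."""
    parts = namespace.split(".")
    # Try "a.b.c", then "a.b", then "a"; the first hit is the longest match.
    for end in range(len(parts), 0, -1):
        candidate = ".".join(parts[:end])
        if candidate in mapping:
            return mapping[candidate]
    return default


routes = {"finance": "gcs", "finance.raw": "minio"}
print(storage_for_namespace("finance.raw.events", routes, default="minio"))  # minio
print(storage_for_namespace("finance.marts", routes, default="minio"))  # gcs
```

Longest-prefix matching lets a broad namespace default to one backend while a more specific child namespace overrides it.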

Q: How do we handle storage-specific features (S3 Transfer Acceleration)?


A: Backend-specific features belong in capabilities or provider-owned binding fields when they affect composition. Extra provider methods can exist for direct advanced use, but they are not the cross-plugin contract.

Q: What about custom S3-compatible storage (NetApp, Dell EMC)?


A: Implement StoragePlugin interface with S3-compatible FileIO. Register via entry points. No core changes needed.