Plugin Interfaces
This document describes all plugin interface ABCs.
ComputePlugin
```python
from __future__ import annotations

from abc import ABC, abstractmethod


class ComputePlugin(ABC):
    """Interface for compute engines where dbt transforms execute."""

    name: str
    version: str
    is_self_hosted: bool

    @abstractmethod
    def generate_dbt_profile(self, config: ComputeConfig) -> dict:
        """Generate dbt profiles.yml configuration."""
        pass

    @abstractmethod
    def get_required_dbt_packages(self) -> list[str]:
        """Return required dbt packages."""
        pass

    @abstractmethod
    def validate_connection(self, config: ComputeConfig) -> ConnectionResult:
        """Test connection to the compute engine."""
        pass

    @abstractmethod
    def get_resource_requirements(self, workload_size: str) -> ResourceSpec:
        """Return K8s resource requirements for dbt job pods."""
        pass

    def get_catalog_attachment_sql(
        self, catalog_config: CatalogConfig
    ) -> list[str] | None:
        """Return SQL to attach the compute engine to the Iceberg catalog.

        For DuckDB: Returns ATTACH statements for the Iceberg REST catalog
        For Spark/Snowflake: Returns None (configured differently)
        """
        return None
```
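For DuckDB, get_catalog_attachment_sql() returns the statements that load the Iceberg extension and attach the REST catalog. The sketch below assumes a hypothetical CatalogConfig with uri, warehouse, and alias fields, and the ATTACH ... (TYPE ICEBERG, ENDPOINT ...) form used by recent releases of DuckDB's iceberg extension. Authentication (for example a DuckDB secret) is omitted, and the option names should be checked against the installed DuckDB version. The class stands alone rather than subclassing ComputePlugin so the example runs without floe.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CatalogConfig:
    """Hypothetical stand-in for floe's CatalogConfig; field names are assumed."""

    uri: str            # Iceberg REST catalog endpoint
    warehouse: str      # warehouse name the catalog expects
    alias: str = "ice"  # DuckDB database alias that queries go through


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


class DuckDBAttachmentSketch:
    """Illustrative DuckDB-side get_catalog_attachment_sql(), not floe's plugin."""

    def get_catalog_attachment_sql(self, catalog_config: CatalogConfig) -> list[str] | None:
        # ATTACH options follow the DuckDB iceberg extension's REST catalog
        # syntax as an assumption; credentials/secrets are not handled here.
        return [
            "INSTALL iceberg",
            "LOAD iceberg",
            f"ATTACH {sql_literal(catalog_config.warehouse)} AS {catalog_config.alias} "
            f"(TYPE ICEBERG, ENDPOINT {sql_literal(catalog_config.uri)})",
        ]


statements = DuckDBAttachmentSketch().get_catalog_attachment_sql(
    CatalogConfig(uri="http://polaris:8181/api/catalog", warehouse="analytics")
)
for statement in statements:
    print(statement + ";")
```

The default alias "ice" matches the catalog name used in the semantic layer's DuckDB datasource example. The hostname polaris is a placeholder.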
OrchestratorPlugin
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class OrchestratorPlugin(ABC):
    """Interface for orchestration platforms (Dagster, Airflow, etc.)."""

    name: str
    version: str
    floe_api_version: str

    @abstractmethod
    def create_definitions(self, artifacts: CompiledArtifacts) -> Any:
        """Validate compiled artifacts and delegate to runtime definition loading.

        For Dagster: direct plugin calls only validate the artifacts. Usable
        Definitions also need compiled_artifacts.json, target/manifest.json, and
        the dbt profile/project files, all resolved from one product directory,
        so callers must go through the generated definitions.py loader shim or
        load_product_definitions(product_name, project_dir).
        For Airflow: Returns a DAG object when its runtime context is available.
        """
        pass

    @abstractmethod
    def create_assets_from_transforms(self, transforms: list[TransformConfig]) -> list:
        """Create orchestrator assets from dbt transforms.

        For Dagster: Returns a list of @asset-decorated functions
        For Airflow: Returns a list of tasks
        """
        pass

    @abstractmethod
    def get_helm_values(self) -> dict[str, Any]:
        """Return Helm chart values for deploying orchestration services.

        Returns:
            Dictionary matching the Helm chart schema, with resource
            requests/limits and service configuration.
        """
        pass

    @abstractmethod
    def validate_connection(self) -> ValidationResult:
        """Test connectivity to the orchestration service.

        Returns:
            ValidationResult with success status and actionable error messages.
            Must complete within 10 seconds.
        """
        pass

    @abstractmethod
    def get_resource_requirements(self, workload_size: str) -> ResourceSpec:
        """Return K8s ResourceRequirements for orchestration workloads.

        Args:
            workload_size: "small", "medium", or "large"

        Returns:
            ResourceSpec with CPU/memory requests and limits.
        """
        pass

    @abstractmethod
    def emit_lineage_event(
        self,
        event_type: str,
        job: str,
        inputs: list[Dataset],
        outputs: list[Dataset],
    ) -> None:
        """Emit an OpenLineage event for data lineage tracking.

        Args:
            event_type: "START" | "COMPLETE" | "FAIL"
            job: Job name (e.g., "dbt_run")
            inputs: Input datasets
            outputs: Output datasets
        """
        pass

    @abstractmethod
    def schedule_job(self, job_name: str, cron: str, timezone: str) -> None:
        """Schedule a job for recurring execution.

        Args:
            job_name: Name of the job to schedule
            cron: Cron expression (e.g., "0 8 * * *")
            timezone: IANA timezone (e.g., "America/New_York")
        """
        pass
```

Entry points:

```toml
[project.entry-points."floe.orchestrators"]
dagster = "floe_orchestrator_dagster:DagsterPlugin"
airflow = "floe_orchestrator_airflow:AirflowPlugin"
```

Requirements Traceability: REQ-021 to REQ-030 (OrchestratorPlugin Standards)
CatalogPlugin
```python
from __future__ import annotations

from abc import ABC, abstractmethod


class CatalogPlugin(ABC):
    """Interface for Iceberg catalogs."""

    name: str
    version: str

    @abstractmethod
    def connect(self, config: dict) -> Catalog:
        """Connect to the catalog and return a PyIceberg Catalog."""
        pass

    @abstractmethod
    def get_storage_requirements(self) -> PluginRequirements:
        """Return storage requirements for composition validation."""
        pass

    @abstractmethod
    def build_catalog_deployment(
        self,
        storage: StorageDeploymentBinding,
    ) -> CatalogDeploymentBinding:
        """Translate neutral storage state into catalog-owned deployment config."""
        pass

    @abstractmethod
    def create_namespace(self, namespace: str, properties: dict | None = None):
        """Create a namespace."""
        pass

    @abstractmethod
    def vend_credentials(self, table_path: str, operations: list[str]) -> dict:
        """Vend short-lived credentials for table access."""
        pass
```
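For an Iceberg REST catalog, connect() can hand a property map to PyIceberg's load_catalog(name, **properties). The sketch assumes a config dict with uri, warehouse, and an optional credential key (hypothetical names). The X-Iceberg-Access-Delegation header asks catalogs that support it to vend short-lived storage credentials, which fits the vend_credentials() contract. Verify the property names against your PyIceberg version; PyIceberg is imported lazily so the module loads without it.

```python
from __future__ import annotations

from typing import Any


def rest_catalog_properties(
    uri: str, warehouse: str, credential: str | None = None
) -> dict[str, str]:
    """Build PyIceberg REST catalog properties (keys assumed from PyIceberg docs)."""
    props = {
        "type": "rest",
        "uri": uri,
        "warehouse": warehouse,
        # Request vended, short-lived storage credentials from the catalog.
        "header.X-Iceberg-Access-Delegation": "vended-credentials",
    }
    if credential is not None:
        props["credential"] = credential  # OAuth2 "client_id:client_secret"
    return props


class RestCatalogSketch:
    """Hypothetical CatalogPlugin.connect() for an Iceberg REST catalog."""

    name = "rest-sketch"

    def connect(self, config: dict[str, Any]) -> Any:
        from pyiceberg.catalog import load_catalog  # lazy: requires pyiceberg

        props = rest_catalog_properties(
            config["uri"], config["warehouse"], config.get("credential")
        )
        return load_catalog(self.name, **props)


print(rest_catalog_properties("http://polaris:8181/api/catalog", "analytics"))
```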
SemanticLayerPlugin
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path


class SemanticLayerPlugin(ABC):
    """Interface for semantic/consumption layers."""

    name: str
    version: str

    @abstractmethod
    def sync_from_dbt_manifest(self, manifest_path: Path, output_dir: Path) -> list[Path]:
        """Generate semantic models from the dbt manifest."""
        pass

    @abstractmethod
    def get_security_context(self, namespace: str, roles: list[str]) -> dict:
        """Build the security context for data isolation."""
        pass

    @abstractmethod
    def get_datasource_config(self, compute_plugin: ComputePlugin) -> dict:
        """Generate datasource configuration from the compute plugin.

        The semantic layer delegates to the active compute plugin for database
        connectivity, following the platform's plugin architecture (ADR-0032).

        Args:
            compute_plugin: Active ComputePlugin instance

        Returns:
            Datasource configuration dict for the semantic layer

        Example:
            For DuckDB compute:
                {"type": "duckdb", "url": "/data/floe.duckdb", "catalog": "ice"}

            For Snowflake compute:
                {"type": "snowflake", "account": "xxx.us-east-1",
                 "warehouse": "compute_wh", ...}
        """
        pass
```
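A sketch of get_datasource_config() producing the examples above from facts the compute plugin exposes. ComputePluginStub and its connection dict are hypothetical stand-ins for whatever accessor floe's ComputePlugin actually provides; the point illustrated is that the semantic layer maps compute facts into its own datasource format instead of owning connection settings.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ComputePluginStub:
    """Hypothetical stand-in exposing what a compute plugin knows."""

    name: str
    connection: dict[str, Any] = field(default_factory=dict)


def get_datasource_config(compute_plugin: ComputePluginStub) -> dict[str, Any]:
    """Translate the active compute plugin's connection facts for the semantic layer."""
    conn = compute_plugin.connection
    if compute_plugin.name == "duckdb":
        return {"type": "duckdb", "url": conn["path"], "catalog": conn.get("catalog", "ice")}
    if compute_plugin.name == "snowflake":
        return {"type": "snowflake", "account": conn["account"], "warehouse": conn["warehouse"]}
    raise ValueError(f"No datasource mapping for compute plugin {compute_plugin.name!r}")


print(get_datasource_config(ComputePluginStub("duckdb", {"path": "/data/floe.duckdb"})))
```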
IngestionPlugin
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class IngestionPlugin(ABC):
    """Interface for data ingestion/EL plugins."""

    name: str
    version: str
    is_external: bool

    @abstractmethod
    def create_pipeline(self, config: IngestionConfig) -> Any:
        """Create an ingestion pipeline from configuration."""
        pass

    @abstractmethod
    def run(self, pipeline: Any, **kwargs: Any) -> IngestionResult:
        """Execute the ingestion pipeline."""
        pass

    def get_composition_requirements(self) -> Any:
        """Declare storage/catalog requirements for platform composition."""
        return None

    def build_deployment_binding(self, *, storage: Any, catalog: Any) -> Any:
        """Build a secret-free deployment binding from composed platform state."""
        raise NotImplementedError
```

Ingestion destination wiring is provided by CompiledArtifacts.deployment.ingestion. Ingestion plugins consume composed runtime bindings rather than accepting raw catalog/storage dictionaries.
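The binding flow can be sketched with frozen dataclasses. StorageBinding, CatalogBinding, and IngestionBinding are hypothetical shapes, not floe's CompiledArtifacts models. What the sketch illustrates is that the ingestion binding is assembled from already-composed storage and catalog state and carries only a reference to a secret, never credential values.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class StorageBinding:
    """Hypothetical neutral storage state (secret-free)."""

    warehouse_uri: str
    endpoint: str
    credentials_secret_ref: str  # name of a K8s Secret, never its contents


@dataclass(frozen=True)
class CatalogBinding:
    """Hypothetical catalog state produced by the catalog plugin."""

    uri: str
    warehouse: str


@dataclass(frozen=True)
class IngestionBinding:
    """Destination wiring an ingestion job receives at runtime."""

    catalog_uri: str
    warehouse: str
    storage_endpoint: str
    credentials_secret_ref: str


def build_deployment_binding(
    *, storage: StorageBinding, catalog: CatalogBinding
) -> IngestionBinding:
    """Combine composed storage and catalog state into one destination binding."""
    return IngestionBinding(
        catalog_uri=catalog.uri,
        warehouse=catalog.warehouse,
        storage_endpoint=storage.endpoint,
        credentials_secret_ref=storage.credentials_secret_ref,
    )


binding = build_deployment_binding(
    storage=StorageBinding("s3://lake/warehouse", "http://minio:9000", "floe-minio-credentials"),
    catalog=CatalogBinding("http://polaris:8181/api/catalog", "analytics"),
)
print(binding)
```

Keyword-only parameters mirror the interface's signature, so storage and catalog state cannot be swapped by position.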
DBTPlugin
Per ADR-0043, the dbt execution environment (WHERE dbt compiles and runs) is pluggable, while the dbt framework itself (the SQL transformation DSL) is enforced:

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field


class DBTRunResult(BaseModel):
    """Result of a dbt command execution."""

    success: bool
    manifest_path: Path
    run_results_path: Path
    catalog_path: Path | None = None
    execution_time_seconds: float
    models_run: int
    tests_run: int
    failures: int
    metadata: dict[str, Any] = Field(default_factory=dict)


class DBTPlugin(ABC):
    """Interface for dbt compilation environment plugins.

    Responsibilities:
    - Compile dbt projects (Jinja -> SQL)
    - Execute dbt commands (run, test, snapshot)
    - Provide SQL linting (optional, dialect-aware)

    Note: This plugin abstracts WHERE dbt executes (local/cloud/fusion),
    NOT the SQL transformation framework itself (enforced).
    """

    name: str  # e.g., "local", "fusion", "cloud"
    version: str
    floe_api_version: str

    @abstractmethod
    def compile_project(
        self,
        project_dir: Path,
        profiles_dir: Path,
        target: str,
    ) -> Path:
        """Compile the dbt project and return the path to manifest.json.

        Returns:
            Path to the compiled manifest.json (typically target/manifest.json)

        Raises:
            CompilationError: If dbt compilation fails
        """
        pass

    @abstractmethod
    def run_models(
        self,
        project_dir: Path,
        profiles_dir: Path,
        target: str,
        select: str | None = None,
        exclude: str | None = None,
        full_refresh: bool = False,
    ) -> DBTRunResult:
        """Execute dbt models.

        Returns:
            DBTRunResult with success status and executed model count
        """
        pass

    @abstractmethod
    def test_models(
        self,
        project_dir: Path,
        profiles_dir: Path,
        target: str,
        select: str | None = None,
    ) -> DBTRunResult:
        """Execute dbt tests.

        Returns:
            DBTRunResult with pass/fail status and test results
        """
        pass

    @abstractmethod
    def lint_project(
        self,
        project_dir: Path,
        profiles_dir: Path,
        target: str,
        fix: bool = False,
    ) -> LintResult:
        """Lint SQL files with dialect-aware validation.

        Args:
            fix: If True, auto-fix issues (if the linter supports it)

        Returns:
            LintResult with all detected linting issues

        Raises:
            DBTLintError: If the linting process fails (not if SQL has issues)
        """
        pass

    @abstractmethod
    def get_manifest(self, project_dir: Path) -> dict[str, Any]:
        """Retrieve dbt manifest.json (filesystem or API)."""
        pass

    @abstractmethod
    def get_run_results(self, project_dir: Path) -> dict[str, Any]:
        """Retrieve dbt run_results.json."""
        pass

    @abstractmethod
    def supports_parallel_execution(self) -> bool:
        """Indicate whether the runtime supports parallel execution."""
        pass

    @abstractmethod
    def supports_sql_linting(self) -> bool:
        """Indicate whether this compilation environment provides SQL linting."""
        pass

    @abstractmethod
    def get_runtime_metadata(self) -> dict[str, Any]:
        """Return runtime-specific metadata for observability."""
        pass
```

Entry points:

```toml
[project.entry-points."floe.dbt"]
local = "floe_dbt_local:LocalDBTPlugin"
fusion = "floe_dbt_fusion:FusionDBTPlugin"
cloud = "floe_dbt_cloud:CloudDBTPlugin"
```

Implementation Priority:
- LocalDBTPlugin (Epic 3): dbt-core via dbtRunner, SQLFluff linting
- FusionDBTPlugin (Epic 3): dbt Fusion CLI, built-in static analysis
- CloudDBTPlugin (Epic 8+): dbt Cloud API (deferred)
Requirements Traceability: REQ-086 to REQ-095 (DBT Runtime Plugin), REQ-096 to REQ-100 (SQL Linting)
See: interfaces/dbt-plugin.md for canonical interface definition, ADR-0043 for architecture rationale.
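LocalDBTPlugin wraps dbt-core's dbtRunner, whose invoke() method takes the same argument list as the dbt CLI (dbt-core 1.5+). A minimal sketch of how run_models() arguments could map onto that call. build_dbt_args and invoke_dbt are hypothetical helpers, and filling DBTRunResult counts from the returned result object is left out.

```python
from __future__ import annotations

from pathlib import Path


def build_dbt_args(
    command: str,
    project_dir: Path,
    profiles_dir: Path,
    target: str,
    select: str | None = None,
    exclude: str | None = None,
    full_refresh: bool = False,
) -> list[str]:
    """Translate run_models()/test_models() parameters into dbt CLI arguments."""
    args = [
        command,
        "--project-dir", str(project_dir),
        "--profiles-dir", str(profiles_dir),
        "--target", target,
    ]
    if select:
        args += ["--select", select]
    if exclude:
        args += ["--exclude", exclude]
    if full_refresh:
        args.append("--full-refresh")  # only meaningful for run/build
    return args


def invoke_dbt(args: list[str]) -> bool:
    """Run dbt in-process; True when every selected node succeeded."""
    from dbt.cli.main import dbtRunner  # lazy: requires dbt-core >= 1.5

    result = dbtRunner().invoke(args)
    if result.exception is not None:
        raise result.exception  # dbt itself failed, not just a model
    return result.success


args = build_dbt_args(
    "run", Path("/srv/product"), Path("/srv/product/profiles"), "dev",
    select="tag:gold", full_refresh=True,
)
print(args)
```

Separating argument building from invocation keeps the CLI mapping testable without a dbt project or warehouse.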
DataQualityPlugin
Per ADR-0044, data quality frameworks (Great Expectations, Soda, custom) are pluggable through a unified interface. The DataQualityPlugin handles both compile-time validation (config syntax, quality gates) and runtime execution (live data checks, quality scoring).

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any


class DataQualityPlugin(ABC):
    """Unified interface for data quality frameworks.

    Responsibilities:
    - Validate quality check configuration at compile-time (no data access)
    - Enforce quality gate thresholds at compile-time
    - Execute quality checks against live data at runtime
    - Calculate quality scores with configurable weights
    - Provide OpenLineage integration for quality events
    """

    # Plugin metadata
    name: str
    version: str
    floe_api_version: str

    # COMPILE-TIME METHODS (no data access)
    @abstractmethod
    def validate_config(
        self, config_path: Path, dbt_manifest: dict[str, Any]
    ) -> ValidationResult:
        """Validate quality check configuration syntax."""
        pass

    @abstractmethod
    def validate_quality_gates(
        self, manifest: dict[str, Any], required_coverage: dict[str, float]
    ) -> ValidationResult:
        """Enforce quality gate thresholds at compile-time."""
        pass

    # RUNTIME METHODS (require data access)
    @abstractmethod
    def execute_checks(
        self, connection: DatabaseConnection, expectations: list[QualityExpectation]
    ) -> QualityCheckResult:
        """Execute quality checks against live data."""
        pass

    @abstractmethod
    def calculate_quality_score(
        self, check_results: list[QualityCheckResult], weights: dict[str, float]
    ) -> float:
        """Calculate the overall quality score (0-100) with a weighted formula."""
        pass

    # INTEGRATION METHODS
    @abstractmethod
    def get_lineage_emitter(self) -> LineageEmitter:
        """Get the OpenLineage emitter for this DQ tool."""
        pass

    @abstractmethod
    def supports_sql_dialect(self, dialect: str) -> bool:
        """Check whether the DQ tool supports the given SQL dialect."""
        pass
```

Entry points:

```toml
[project.entry-points."floe.data_quality"]
great_expectations = "floe_dq_great_expectations:GreatExpectationsPlugin"
soda = "floe_dq_soda:SodaPlugin"
dbt_expectations = "floe_dq_dbt:DBTExpectationsPlugin"
custom = "floe_dq_custom:CustomPlugin"
```

Platform Configuration:

```yaml
plugins:
  data_quality:
    provider: great_expectations  # Single choice
    config:
      quality_gates:
        bronze: {min_test_coverage: 50%}
        silver: {min_test_coverage: 80%, required_tests: [not_null, unique]}
        gold: {min_test_coverage: 100%, required_tests: [not_null, unique, relationships]}
      weights:
        critical_checks: 3.0
        standard_checks: 1.0
        statistical_checks: 0.5
```

Usage Timeline:
- Compile-time: the compiler calls validate_config() and validate_quality_gates().
- Runtime: ContractMonitor calls execute_checks() every 6 hours.
- Scoring: calculate_quality_score() combines dbt tests and DQ checks with a weighted formula.
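The weighted formula itself is not spelled out here, so the following is one plausible reading, stated as an assumption: a weighted pass rate in which each check category contributes its weight times its passed and total counts. CheckOutcome is a hypothetical stand-in for QualityCheckResult; the weights are the ones from the platform configuration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CheckOutcome:
    """Hypothetical per-category summary standing in for QualityCheckResult."""

    category: str  # a key of the weights mapping, e.g. "critical_checks"
    passed: int
    total: int


def calculate_quality_score(outcomes: list[CheckOutcome], weights: dict[str, float]) -> float:
    """Weighted pass rate on a 0-100 scale (an assumed formula)."""
    weighted_passed = 0.0
    weighted_total = 0.0
    for outcome in outcomes:
        weight = weights.get(outcome.category, 1.0)  # unlisted categories weigh 1.0
        weighted_passed += weight * outcome.passed
        weighted_total += weight * outcome.total
    if weighted_total == 0:
        raise ValueError("no checks to score")
    return 100.0 * weighted_passed / weighted_total


weights = {"critical_checks": 3.0, "standard_checks": 1.0, "statistical_checks": 0.5}
score = calculate_quality_score(
    [
        CheckOutcome("critical_checks", passed=2, total=2),
        CheckOutcome("standard_checks", passed=3, total=4),
        CheckOutcome("statistical_checks", passed=1, total=2),
    ],
    weights,
)
print(round(score, 1))  # 86.4 = 100 * (3*2 + 1*3 + 0.5*1) / (3*2 + 1*4 + 0.5*2)
```

Under this formula a failed critical check lowers the score three times as much as a failed standard check and six times as much as a failed statistical check.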
Implementation Priority:
- GreatExpectationsPlugin (Epic 7): GX Python API wrapper
- SodaPlugin (Epic 8+): Soda Core integration
- DBTExpectationsPlugin (Epic 8+): Wraps dbt native tests for unified scoring
See ADR-0044 for the complete specification, quality gate requirements (REQ-241 to REQ-244), and Great Expectations integration (REQ-207, REQ-248).
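validate_quality_gates() only needs the dbt manifest, which is why it can run at compile time. A sketch under two assumptions: a model's tier comes from a bronze/silver/gold tag, and test coverage means the percentage of a tier's models that at least one dbt test depends on. Both rules and the helper names tier_test_coverage and gate_violations are hypothetical, and the required_tests lists are not checked. The nodes, resource_type, tags, and depends_on fields are standard dbt manifest.json keys.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any


def tier_test_coverage(manifest: dict[str, Any], tiers: list[str]) -> dict[str, float]:
    """Percent of each tier's models (tier taken from tags) with at least one test."""
    models_by_tier: dict[str, set[str]] = defaultdict(set)
    tested: set[str] = set()
    for unique_id, node in manifest["nodes"].items():
        if node["resource_type"] == "model":
            for tier in tiers:
                if tier in node.get("tags", []):
                    models_by_tier[tier].add(unique_id)
        elif node["resource_type"] == "test":
            # A test covers every node it depends on.
            tested.update(node.get("depends_on", {}).get("nodes", []))
    return {
        tier: 100.0 * len(models_by_tier[tier] & tested) / len(models_by_tier[tier])
        for tier in tiers
        if models_by_tier[tier]  # tiers without models are skipped, not scored
    }


def gate_violations(manifest: dict[str, Any], required_coverage: dict[str, float]) -> list[str]:
    """List tiers whose coverage falls below the required minimum percentage."""
    coverage = tier_test_coverage(manifest, list(required_coverage))
    return [
        f"{tier}: {coverage[tier]:.0f}% test coverage < required {minimum:.0f}%"
        for tier, minimum in required_coverage.items()
        if tier in coverage and coverage[tier] < minimum
    ]


manifest = {
    "nodes": {
        "model.shop.orders": {"resource_type": "model", "tags": ["gold"]},
        "model.shop.customers": {"resource_type": "model", "tags": ["gold"]},
        "model.shop.raw_events": {"resource_type": "model", "tags": ["bronze"]},
        "test.shop.not_null_orders_id": {
            "resource_type": "test",
            "depends_on": {"nodes": ["model.shop.orders"]},
        },
    }
}
print(gate_violations(manifest, {"bronze": 50.0, "gold": 100.0}))
# ['bronze: 0% test coverage < required 50%', 'gold: 50% test coverage < required 100%']
```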
TelemetryBackendPlugin
Per ADR-0035, telemetry backends (Jaeger, Datadog, Grafana Cloud) are pluggable for OTLP traces, metrics, and logs. The TelemetryBackendPlugin wraps the three-layer architecture:
- Layer 1 (Enforced): OpenTelemetry SDK emission
- Layer 2 (Enforced): OTLP Collector aggregation
- Layer 3 (Pluggable): Backend storage/visualization
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class TelemetryBackendPlugin(ABC):
    """Interface for OTLP telemetry backend plugins.

    Responsibilities:
    - Configure the OTLP Collector exporter for the backend-specific protocol
    - Provide Helm values for deploying backend services (if self-hosted)
    - Validate connection to the backend
    """

    name: str  # e.g., "jaeger", "datadog", "grafana-cloud"
    version: str
    floe_api_version: str

    @abstractmethod
    def get_otlp_exporter_config(self) -> dict[str, Any]:
        """Generate OTLP Collector exporter configuration.

        Returns:
            Dictionary matching the OTLP Collector config schema.
            Must include an 'exporters' section with backend-specific config.

        Example (Jaeger, which ingests OTLP natively on port 4317):
            {
                "exporters": {
                    "otlp/jaeger": {
                        "endpoint": "jaeger:4317",
                        "tls": {"insecure": True},
                    }
                },
                "service": {
                    "pipelines": {
                        "traces": {
                            "receivers": ["otlp"],
                            "processors": ["batch"],
                            "exporters": ["otlp/jaeger"],
                        }
                    }
                },
            }
        """
        pass

    @abstractmethod
    def get_helm_values(self) -> dict[str, Any]:
        """Generate Helm values for deploying backend services.

        Returns:
            Helm values dictionary for the backend chart.
            Empty dict if the backend is external (SaaS).
        """
        pass

    @abstractmethod
    def validate_connection(self) -> bool:
        """Validate connection to the backend.

        Returns:
            True if the connection succeeds, False otherwise.
        """
        pass
```

Entry points:

```toml
[project.entry-points."floe.telemetry_backends"]
jaeger = "floe_telemetry_jaeger:JaegerPlugin"
datadog = "floe_telemetry_datadog:DatadogPlugin"
grafana-cloud = "floe_telemetry_grafana:GrafanaCloudPlugin"
```

See ADR-0035 for the complete specification and reference implementations.
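get_otlp_exporter_config() has to emit both the exporter entry and the pipelines that reference it. A sketch of a helper a backend plugin might use to do that; build_collector_config is a hypothetical name, and it assumes the enforced collector layer already defines the otlp receiver and batch processor. Jaeger accepts OTLP directly, so the usage routes traces through a standard OTLP exporter named otlp/jaeger on Jaeger's OTLP gRPC port 4317.

```python
from __future__ import annotations

from typing import Any


def build_collector_config(
    exporter_name: str,
    exporter_settings: dict[str, Any],
    signals: tuple[str, ...] = ("traces", "metrics", "logs"),
) -> dict[str, Any]:
    """Attach one backend exporter to an OTLP Collector pipeline per signal.

    Assumes the enforced collector layer defines the "otlp" receiver and the
    "batch" processor referenced below.
    """
    return {
        "exporters": {exporter_name: exporter_settings},
        "service": {
            "pipelines": {
                signal: {
                    "receivers": ["otlp"],
                    "processors": ["batch"],
                    "exporters": [exporter_name],
                }
                for signal in signals
            }
        },
    }


# Jaeger stores traces only, so only a traces pipeline is generated.
jaeger_config = build_collector_config(
    "otlp/jaeger",
    {"endpoint": "jaeger:4317", "tls": {"insecure": True}},
    signals=("traces",),
)
print(jaeger_config)
```

A backend that accepts all three signals would keep the default signals tuple and supply its own exporter name and settings.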
LineageBackendPlugin
Per ADR-0035, lineage backends (Marquez, Atlan, OpenMetadata) are pluggable for OpenLineage events. The LineageBackendPlugin is architecturally independent from TelemetryBackendPlugin (uses direct HTTP transport, not OTLP Collector).
```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class LineageBackendPlugin(ABC):
    """Interface for OpenLineage backend plugins.

    Responsibilities:
    - Configure OpenLineage HTTP transport for the backend-specific endpoint
    - Define the namespace strategy for lineage events
    - Provide Helm values for deploying backend services (if self-hosted)
    - Validate connection to the backend
    """

    name: str  # e.g., "marquez", "atlan", "openmetadata"
    version: str
    floe_api_version: str

    @abstractmethod
    def get_transport_config(self) -> dict[str, Any]:
        """Generate OpenLineage HTTP transport configuration.

        Returns:
            Dictionary with 'type' (must be 'http') and endpoint config.
            'url' is the backend base URL; the transport appends 'endpoint'.

        Example (Marquez):
            {
                "type": "http",
                "url": "http://marquez:5000",
                "endpoint": "api/v1/lineage",
                "timeout": 5.0,
            }
        """
        pass

    @abstractmethod
    def get_namespace_strategy(self) -> dict[str, Any]:
        """Define the namespace strategy for lineage events.

        Returns:
            Dictionary with namespace strategy configuration.

        Example (environment-based):
            {
                "strategy": "environment_based",
                "template": "floe-{environment}",
                "environment_var": "FLOE_ENVIRONMENT",
            }
        """
        pass

    @abstractmethod
    def get_helm_values(self) -> dict[str, Any]:
        """Generate Helm values for deploying backend services.

        Returns:
            Helm values dictionary for the backend chart.
            Empty dict if the backend is external (SaaS).
        """
        pass

    @abstractmethod
    def validate_connection(self) -> bool:
        """Validate connection to the backend.

        Returns:
            True if the connection succeeds, False otherwise.
        """
        pass
```

Entry points:

```toml
[project.entry-points."floe.lineage_backends"]
marquez = "floe_lineage_marquez:MarquezPlugin"
atlan = "floe_lineage_atlan:AtlanPlugin"
openmetadata = "floe_lineage_openmetadata:OpenMetadataPlugin"
```

See ADR-0035 for the complete specification, split architecture rationale, and reference implementations (JaegerPlugin, MarquezPlugin, DatadogPlugin, AtlanPlugin).
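A sketch of how an emitter could use the transport and namespace settings: resolve the environment-based namespace, build a minimal OpenLineage RunEvent, and POST it to the base URL plus endpoint. The helper names, the producer URI, and the pinned schema version are assumptions; production code would more likely use the openlineage-python client's HTTP transport. The OpenLineage spec requires runId to be a UUID, hence uuid4().

```python
from __future__ import annotations

import json
import urllib.request
import uuid
from collections.abc import Mapping
from datetime import datetime, timezone
from typing import Any

# Assumptions: the schema version and producer URI are illustrative values.
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
PRODUCER = "https://example.com/floe-lineage-sketch"


def resolve_namespace(strategy: Mapping[str, str], environ: Mapping[str, str]) -> str:
    """Render an environment_based strategy such as template 'floe-{environment}'."""
    if strategy.get("strategy") != "environment_based":
        raise ValueError(f"unsupported strategy: {strategy.get('strategy')!r}")
    environment = environ[strategy["environment_var"]]  # KeyError if the var is unset
    return strategy["template"].format(environment=environment)


def run_event(event_type: str, namespace: str, job: str, run_id: str) -> dict[str, Any]:
    """Build a minimal OpenLineage RunEvent with no input or output datasets."""
    return {
        "eventType": event_type,  # "START" | "COMPLETE" | "FAIL"
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job},
        "inputs": [],
        "outputs": [],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


def post_event(url: str, endpoint: str, event: dict[str, Any], timeout: float) -> int:
    """POST the event to url + endpoint, mirroring the transport config fields."""
    request = urllib.request.Request(
        f"{url.rstrip('/')}/{endpoint.lstrip('/')}",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


namespace = resolve_namespace(
    {"strategy": "environment_based", "template": "floe-{environment}",
     "environment_var": "FLOE_ENVIRONMENT"},
    {"FLOE_ENVIRONMENT": "staging"},
)
event = run_event("START", namespace, "dbt_run", str(uuid.uuid4()))
# post_event("http://marquez:5000", "api/v1/lineage", event, timeout=5.0)
```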
StoragePlugin
Per ADR-0036, storage backends (S3, GCS, Azure, MinIO) are pluggable through a neutral storage deployment binding backed by the PyIceberg FileIO pattern. This is the target semantic interface; the live migration-era ABC may still require legacy helper methods until plugin uplift removes that compatibility surface.
```python
from __future__ import annotations

from abc import ABC, abstractmethod


class StoragePlugin(ABC):
    """Interface for storage backend plugins.

    Emits secret-free storage desired state:
    - protocol and endpoint roles
    - warehouse and bucket requirements
    - credential references
    - capabilities and provisioning intent
    - runtime FileIO facts
    """

    name: str  # e.g., "minio", "gcs", "azure-blob"
    version: str
    floe_api_version: str

    @abstractmethod
    def get_deployment_binding(self) -> StorageDeploymentBinding:
        """Return neutral storage desired state for composition and rendering."""
        pass

    @abstractmethod
    def get_pyiceberg_fileio(self) -> FileIO:
        """Create a PyIceberg FileIO instance for direct runtime use."""
        pass
```

floe-core validates storage/catalog compatibility and carries the typed deployment bindings in CompiledArtifacts. Catalog, compute, orchestrator, and deployment plugins translate the neutral storage binding into their own config. Storage plugins must not emit catalog-specific bootstrap JSON or Helm values as the semantic contract, and renderers must not reconstruct storage facts from legacy chart values.

Entry points:

```toml
[project.entry-points."floe.storage"]
minio = "floe_storage_minio.plugin:MinIOStoragePlugin"
gcs = "floe_storage_gcs:GCSPlugin"
```

See ADR-0036 for the complete specification and PyIceberg FileIO integration examples.
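The split between a secret-free binding and runtime FileIO creation can be sketched as follows: the binding only names environment variables, and get_pyiceberg_fileio() resolves them when the job runs. StorageDeploymentBindingSketch and its fields are hypothetical; the s3.* keys are PyIceberg's S3 FileIO properties, and load_file_io(properties=..., location=...) is PyIceberg's FileIO factory, imported lazily.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class StorageDeploymentBindingSketch:
    """Hypothetical, secret-free subset of a storage deployment binding."""

    protocol: str        # e.g. "s3"
    endpoint: str        # e.g. "http://minio:9000"
    warehouse_uri: str   # e.g. "s3://floe-warehouse/"
    access_key_env: str  # name of the env var holding the key ID (a reference)
    secret_key_env: str  # name of the env var holding the secret key (a reference)
    region: str = "us-east-1"


def fileio_properties(
    binding: StorageDeploymentBindingSketch, environ: Mapping[str, str]
) -> dict[str, str]:
    """Resolve credential references into PyIceberg S3 FileIO properties."""
    if binding.protocol != "s3":
        raise ValueError(f"this sketch only handles s3, got {binding.protocol!r}")
    return {
        "s3.endpoint": binding.endpoint,
        "s3.region": binding.region,
        "s3.access-key-id": environ[binding.access_key_env],
        "s3.secret-access-key": environ[binding.secret_key_env],
    }


def get_pyiceberg_fileio(binding: StorageDeploymentBindingSketch) -> Any:
    """Create the FileIO at runtime, reading secrets from the process environment."""
    from pyiceberg.io import load_file_io  # lazy: requires pyiceberg

    return load_file_io(
        properties=fileio_properties(binding, os.environ),
        location=binding.warehouse_uri,
    )


binding = StorageDeploymentBindingSketch(
    "s3", "http://minio:9000", "s3://floe-warehouse/", "MINIO_ACCESS_KEY", "MINIO_SECRET_KEY"
)
print(binding)  # only env var names appear, never credential values
```

Because the binding holds references rather than values, it can be serialized into CompiledArtifacts or rendered into Helm values without leaking credentials.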