
Data Contracts Architecture

This document describes the data contract architecture in floe, which enables computational governance through formal agreements between data producers and consumers.

Data contracts provide a mechanism for:

  • Schema agreements: Formal declaration of column names, types, and constraints
  • SLA definitions: Freshness, availability, and quality guarantees
  • Runtime enforcement: Monitoring and alerting when contracts are violated
  • Lifecycle management: Independent versioning and deprecation workflows

                 COMPILE TIME                                RUNTIME
                 ────────────                                ───────
                               │                               │
floe.yaml ────────────────────►│                               │
    +                          │                               │
datacontract.yaml (optional) ─►│ PolicyEnforcer                │
                               │ (compile-time)                │
                               │        │                      │
                               │        ▼                      │
                               │ CompiledArtifacts ────────────┼──► ContractMonitor
                               │ (includes contracts)          │    (runtime service)
                               │                               │         │
                               │                               │    DataContractPlugin
                               │                               │    (wraps datacontract-cli)
                               │                               │         │
                               │                               │    OrchestratorPlugin
                               │                               │    (emits OpenLineage)

Floe uses a hybrid contract model:

| Model | Description |
| --- | --- |
| Auto-generated | Contracts derived from floe.yaml output ports |
| Explicit | Contracts defined in datacontract.yaml (ODCS format) |
| Merged | Explicit overrides auto-generated (default) |

data-product/
├── floe.yaml              # Required - defines ports, transforms
├── datacontract.yaml      # Optional - explicit ODCS contract
├── models/
│   └── *.sql
└── tests/

1. Parse floe.yaml
   └── Extract output_ports, input_ports, metadata
2. Generate base contract
   └── DataContractPlugin.generate_contract_from_ports()
3. Check for explicit datacontract.yaml
   ├── If exists: Parse and merge (explicit overrides generated)
   └── If not: Use generated contract as-is
4. Validate merged contract
   └── PolicyEnforcer.validate_data_contracts()
5. Store in CompiledArtifacts
   └── CompiledArtifacts.data_contracts[]
6. Register contract in catalog
   └── CatalogPlugin.register_contract()
7. At runtime: Register with ContractMonitor
   └── ContractMonitor.register_contract()
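
Step 3 states that explicit values override generated ones. A minimal sketch of that merge rule follows, assuming contracts are held as nested dictionaries. The function name `merge_contract_dicts` is hypothetical and is not floe's actual `merge_contracts` implementation.

```python
from copy import deepcopy
from typing import Any


def merge_contract_dicts(generated: dict[str, Any], explicit: dict[str, Any]) -> dict[str, Any]:
    """Recursively merge two contract dicts; explicit values win on conflict.

    Hypothetical helper for illustration only.
    """
    merged = deepcopy(generated)
    for key, value in explicit.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are mappings: merge them key by key.
            merged[key] = merge_contract_dicts(merged[key], value)
        else:
            # Scalars, lists, and new keys: the explicit value replaces the generated one.
            merged[key] = deepcopy(value)
    return merged


# Contract derived from floe.yaml output ports (illustrative data).
generated = {
    "name": "customers",
    "models": {"customers": {"elements": {"customer_id": {"type": "string"}}}},
}
# Contract fragment from datacontract.yaml (illustrative data).
explicit = {
    "models": {"customers": {"elements": {"customer_id": {"required": True}}}},
    "slaProperties": {"freshness": {"value": "PT6H"}},
}
merged = merge_contract_dicts(generated, explicit)
```

The generated column type survives, while the explicit `required` flag and SLA are layered on top. Neither input is mutated.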

Contracts are uniquely identified by their fully-qualified ID:

Contract ID: {domain}.{product}/{contract}:{version}
             └──────────────────┬─────────────────────┘
                 "sales.customer_360/customers:1.0.0"

| Component | Description | Example |
| --- | --- | --- |
| domain | Parent domain | sales |
| product | Data product name | customer_360 |
| contract | Contract name | customers |
| version | Semantic version | 1.0.0 |

The contract inherits the product’s namespace: sales.customer_360
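
To make the ID format concrete, here is a small parsing sketch. The `ContractId` dataclass, the `parse_contract_id` function, and the allowed character set (lowercase letters, digits, underscores) are assumptions for illustration and are not taken from floe.

```python
import re
from dataclasses import dataclass

# Assumed grammar: {domain}.{product}/{contract}:{MAJOR.MINOR.PATCH}
_CONTRACT_ID = re.compile(
    r"^(?P<domain>[a-z0-9_]+)\.(?P<product>[a-z0-9_]+)"
    r"/(?P<contract>[a-z0-9_]+):(?P<version>\d+\.\d+\.\d+)$"
)


@dataclass(frozen=True)
class ContractId:
    """Hypothetical value object for a fully-qualified contract ID."""

    domain: str
    product: str
    contract: str
    version: str

    @property
    def namespace(self) -> str:
        # The contract inherits the product's namespace.
        return f"{self.domain}.{self.product}"


def parse_contract_id(raw: str) -> ContractId:
    """Split a contract ID into its components, or raise ValueError."""
    match = _CONTRACT_ID.match(raw)
    if match is None:
        raise ValueError(f"not a valid contract ID: {raw!r}")
    return ContractId(**match.groupdict())


cid = parse_contract_id("sales.customer_360/customers:1.0.0")
```

Here `cid.namespace` yields the product namespace that the contract inherits.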

Contracts are registered in the Iceberg catalog alongside their parent product:

Catalog: floe-data
├── sales.customer_360 (product namespace)
│   ├── Properties:
│   │   ├── floe.product.name = "customer_360"
│   │   ├── floe.product.repo = "github.com/acme/sales-customer-360"
│   │   └── floe.contracts = '["customers:1.0.0", "orders:2.1.0"]'
│   │
│   └── Tables:
│       └── gold.customers

Contracts are registered via CatalogPlugin.register_contract():

# During the planned floe compile lifecycle
from datetime import datetime, timezone

catalog.register_contract(
    namespace="sales.customer_360",
    contract_name="customers",
    contract_version="1.0.0",
    schema_hash="sha256:abc123...",
    metadata=ContractMetadata(
        owner="sales-analytics@acme.com",
        description="Customer master data contract",
        # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
        registered_at=datetime.now(timezone.utc),
    ),
)

Once a contract version is registered, it is immutable:

  • Schema hash prevents silent changes
  • Version bump required for any modification
  • Breaking changes require MAJOR version bump
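
One way a schema hash can guard immutability is sketched below. It assumes the hash is a SHA-256 digest over a canonical JSON serialization of the schema; the document does not specify how floe computes `schema_hash`, and both function names are hypothetical.

```python
import hashlib
import json


def schema_hash(schema: dict) -> str:
    """Hash a schema deterministically (assumed scheme: SHA-256 of canonical JSON)."""
    # sort_keys and fixed separators make the output independent of key order and spacing.
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def assert_unchanged(registered_hash: str, schema: dict) -> None:
    """Raise if a schema no longer matches the hash stored at registration."""
    if schema_hash(schema) != registered_hash:
        raise ValueError("schema differs from the registered version; bump the contract version")


registered = schema_hash({"customer_id": {"type": "string", "required": True}})

# Same content in a different key order hashes identically.
assert_unchanged(registered, {"customer_id": {"required": True, "type": "string"}})
```

Any change to the schema content, such as a different column type, produces a different hash and is rejected until the version is bumped.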

See ADR-0030: Namespace-Based Identity for the full identity model.

Floe adopts the Open Data Contract Standard (ODCS) v3.x:

  • Linux Foundation project (via Bitol)
  • Comprehensive: schema, SLAs, quality, governance
  • Tooling: datacontract-cli for validation, drift detection
  • Extensible: custom properties via additionalProperties

See ADR-0027: ODCS Standard Adoption for rationale.

apiVersion: v3.0.2
kind: DataContract
name: sales-customer-360-customers
version: 2.1.0
owner: sales-analytics@acme.com
domain: sales
description: |
  Consolidated customer view combining CRM, transactions, and support data.
models:
  customers:
    description: Customer master data
    elements:
      customer_id:
        type: string
        required: true
        primaryKey: true
        description: Unique customer identifier
      email:
        type: string
        required: true
        format: email
        classification: pii
        unique: true
      lifetime_value:
        type: decimal
        required: false
slaProperties:
  freshness:
    value: "PT6H"  # ISO 8601 duration - 6 hours
    element: updated_at
  availability:
    value: "99.9%"
terms:
  usage: "Internal analytics only"
  retention: "7 years per compliance"
tags:
  - customer-data
  - gold-layer

At compile time, the PolicyEnforcer validates:

| Check | Description |
| --- | --- |
| Schema validity | Contract schema is valid ODCS |
| Inheritance | Child contracts don’t weaken parent |
| Version bump | Semantic versioning rules followed |
| Classification | PII fields properly marked |

At runtime, the ContractMonitor performs:

| Check | Interval | Description |
| --- | --- | --- |
| Freshness | 15 min | Data updated within SLA window |
| Schema drift | 1 hour | Actual schema matches contract |
| Quality | 6 hours | Data quality above threshold |
| Availability | 5 min | Data source accessible |
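
The freshness check compares the age of the data against the ISO 8601 duration in the contract (for example `PT6H`). The sketch below shows that comparison under simplifying assumptions: it handles only the time-based `PT…H…M…S` subset of ISO 8601, and `parse_duration` and `is_fresh` are hypothetical helpers, not ContractMonitor APIs.

```python
import re
from datetime import datetime, timedelta, timezone

# Only hours, minutes, and seconds are supported in this sketch.
_DURATION = re.compile(
    r"^PT(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?$"
)


def parse_duration(value: str) -> timedelta:
    """Convert a duration such as 'PT6H' or 'PT1H30M' into a timedelta."""
    match = _DURATION.match(value)
    if match is None or not any(match.groupdict().values()):
        raise ValueError(f"unsupported duration: {value!r}")
    parts = {name: int(num) for name, num in match.groupdict().items() if num is not None}
    return timedelta(**parts)


def is_fresh(last_updated: datetime, sla: str, now: datetime) -> bool:
    """Return True when the data age is within the SLA window."""
    return now - last_updated <= parse_duration(sla)


now = datetime(2026, 1, 3, 12, 0, tzinfo=timezone.utc)
stale = is_fresh(now - timedelta(hours=8), "PT6H", now)   # data is 8 hours old
fresh = is_fresh(now - timedelta(hours=5), "PT6H", now)   # data is 5 hours old
```

Passing `now` explicitly keeps the check deterministic, which makes it straightforward to test against fixed timestamps.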

When enforcement is set to alert_only, violations raise alerts but do not block processing:

# manifest.yaml
data_contracts:
  enforcement: alert_only  # off | warn | alert_only | block

Violations are emitted via:

  • OpenLineage FAIL events with contractViolation facet
  • Prometheus metrics (floe_contract_violations_total)
  • Logs for debugging

from __future__ import annotations  # return types below are floe types, not imported here

from abc import ABC
from pathlib import Path


class DataContractPlugin(ABC):
    """Interface for data contract validation and monitoring."""

    # Contract Parsing
    def parse_contract(self, contract_path: Path) -> DataContract: ...
    def generate_contract_from_ports(self, output_ports, input_ports, metadata) -> DataContract: ...
    def merge_contracts(self, generated, explicit) -> DataContract: ...

    # Schema Validation
    def validate_contract(self, contract, actual_schema=None) -> ContractValidationResult: ...
    def lint_contract_file(self, contract_path) -> ContractValidationResult: ...

    # Schema Drift Detection
    def detect_schema_drift(self, contract, connection) -> SchemaComparisonResult: ...
    def compare_schemas(self, old_schema, new_schema) -> SchemaComparisonResult: ...

    # SLA Monitoring
    def check_freshness(self, contract, connection, timestamp_column) -> SLACheckResult: ...
    def check_availability(self, contract, connection) -> SLACheckResult: ...
    def check_quality(self, contract, connection) -> list[SLACheckResult]: ...

    # Contract Lifecycle
    def validate_version_bump(self, old_contract, new_contract) -> tuple[bool, str]: ...
    def check_breaking_changes(self, contract, connection) -> list[ContractViolation]: ...

The default implementation wraps datacontract-cli:

class ODCSDataContractPlugin(DataContractPlugin):
    """ODCS implementation using datacontract-cli."""

    name = "odcs"
    version = "1.0.0"

    def __init__(self):
        # Deferred import: datacontract-cli is loaded only when the plugin is instantiated
        from datacontract.datacontract import DataContract as DCCli

        self._cli = DCCli

Contracts use independent semantic versioning:

| Change Type | Version Bump |
| --- | --- |
| Remove column | MAJOR |
| Change column type | MAJOR |
| Add required column | MAJOR |
| Relax SLA (degradation) | MAJOR |
| Add optional column | MINOR |
| Stricter SLA (improvement) | MINOR |
| Documentation changes | PATCH |
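
The column rules in the table can be expressed as a small classifier. This is a sketch under stated assumptions: schemas are plain dictionaries mapping column names to `type` and `required`, SLA changes are not covered, and `required_bump` is a hypothetical name rather than floe's `validate_version_bump`.

```python
def required_bump(old: dict[str, dict], new: dict[str, dict]) -> str:
    """Return the minimum version bump for a schema change.

    Each schema maps column name -> {"type": str, "required": bool}.
    """
    removed = old.keys() - new.keys()
    added = new.keys() - old.keys()
    retyped = [
        col for col in old.keys() & new.keys()
        if old[col].get("type") != new[col].get("type")
    ]
    added_required = [col for col in added if new[col].get("required", False)]

    # Removing a column, changing a type, or adding a required column is breaking.
    if removed or retyped or added_required:
        return "MAJOR"
    # Only optional columns were added.
    if added:
        return "MINOR"
    # No structural change: documentation-level edits at most.
    return "PATCH"


old = {"customer_id": {"type": "string", "required": True}}
with_optional = {**old, "lifetime_value": {"type": "decimal", "required": False}}
with_required = {**old, "email": {"type": "string", "required": True}}

bump_optional = required_bump(old, with_optional)
bump_required = required_bump(old, with_required)
```

A compile-time check could compare this result against the difference between the old and new version numbers and reject the change when the bump is too small.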

ACTIVE ────► DEPRECATED ────► SUNSET ────────► RETIRED
             (30 days)        (7 days)
Normal       Warnings         Errors for       Contract
usage        emitted          new consumers    removed

# In datacontract.yaml
status: deprecated
deprecation:
  announced: "2026-01-03"
  sunset_date: "2026-02-03"
  replacement: sales-customers-v4
  migration_guide: https://wiki.acme.com/migrate-v4
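
The lifecycle stages can be derived from the deprecation dates. The sketch below assumes the 30-day and 7-day figures are the lengths of the DEPRECATED and SUNSET stages, and that an explicit `sunset_date` takes precedence over the 30-day default; `lifecycle_status` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import Optional

DEPRECATION_WINDOW = timedelta(days=30)  # assumed length of the DEPRECATED stage
SUNSET_WINDOW = timedelta(days=7)        # assumed length of the SUNSET stage


def lifecycle_status(
    today: date,
    announced: Optional[date],
    sunset_date: Optional[date] = None,
) -> str:
    """Map a date onto ACTIVE, DEPRECATED, SUNSET, or RETIRED."""
    if announced is None or today < announced:
        return "ACTIVE"
    # Fall back to the default window when no explicit sunset date is given.
    sunset = sunset_date or announced + DEPRECATION_WINDOW
    if today < sunset:
        return "DEPRECATED"  # warnings emitted
    if today < sunset + SUNSET_WINDOW:
        return "SUNSET"      # errors for new consumers
    return "RETIRED"         # contract removed


announced = date(2026, 1, 3)
sunset = date(2026, 2, 3)
status_mid_january = lifecycle_status(date(2026, 1, 15), announced, sunset)
status_after_sunset = lifecycle_status(date(2026, 2, 5), announced, sunset)
```

With the dates from the example above, mid-January falls in the DEPRECATED stage and the days just after the sunset date fall in the SUNSET stage.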

Contracts follow the three-tier enforcement model:

Enterprise Contract (base policies)
├── All data must have owner
├── PII requires classification
├── Minimum freshness: 24h
│
└── Domain Contract (domain-specific)
    ├── Sales domain freshness: 6h
    ├── Required fields for domain
    │
    └── Data Product Contract (implementation)
        ├── Specific schema
        └── Cannot relax parent requirements

Inheritance Rules:

  • Child contracts inherit parent requirements
  • Child contracts can STRENGTHEN but not WEAKEN
  • Violations detected at compile time
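
The strengthen-only rule can be checked by comparing each child requirement against its parent. This is a minimal sketch assuming freshness is expressed in hours and classifications have a simple two-level ordering; the dictionary layout, the `CLASSIFICATION_RANK` table, and `find_weakenings` are hypothetical. The error codes are the FLOE-E510 and FLOE-E511 codes from the error reference in this document.

```python
# Assumed ordering: a higher rank is a stricter classification.
CLASSIFICATION_RANK = {"public": 0, "pii": 1}


def find_weakenings(parent: dict, child: dict) -> list[str]:
    """Return one error string per requirement the child weakens."""
    errors: list[str] = []

    # Freshness: a larger window is weaker. A missing child value inherits the parent's.
    parent_fresh = parent.get("freshness_hours")
    child_fresh = child.get("freshness_hours", parent_fresh)
    if parent_fresh is not None and child_fresh > parent_fresh:
        errors.append(
            f"FLOE-E510: parent requires {parent_fresh}h freshness, child specifies {child_fresh}h"
        )

    # Classification: a lower rank than the parent's is a downgrade.
    child_classes = child.get("classifications", {})
    for field, parent_class in parent.get("classifications", {}).items():
        child_class = child_classes.get(field, parent_class)
        if CLASSIFICATION_RANK[child_class] < CLASSIFICATION_RANK[parent_class]:
            errors.append(
                f"FLOE-E511: '{field}' requires '{parent_class}', child specifies '{child_class}'"
            )
    return errors


domain = {"freshness_hours": 6, "classifications": {"customers.email": "pii"}}
weaker = {"freshness_hours": 12, "classifications": {"customers.email": "public"}}
stricter = {"freshness_hours": 3}

violations = find_weakenings(domain, weaker)
no_violations = find_weakenings(domain, stricter)
```

The weaker child produces two errors, while the stricter child passes because it tightens freshness and inherits the parent's classification.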

# manifest.yaml
data_contracts:
  enforcement: alert_only  # off | warn | alert_only | block
  standard: odcs_v3
  plugin:
    type: odcs  # datacontract-cli wrapper
  auto_generation:
    enabled: true
    from_ports: true         # Generate from input/output ports
    from_dbt_manifest: true  # Enrich from dbt manifest
  monitoring:
    enabled: true
    mode: scheduled  # scheduled | continuous | on_demand
    freshness:
      check_interval: 15m
    schema_drift:
      check_interval: 1h
    quality:
      check_interval: 6h
  alerting:
    openlineage_events: true
    prometheus_metrics: true

Contract validation produces specific error codes (FLOE-E5xx series) for actionable diagnostics:

| Code | Description | Cause |
| --- | --- | --- |
| FLOE-E500 | Contract not found | No datacontract.yaml AND no output_ports in floe.yaml |
| FLOE-E501 | Invalid ODCS syntax | YAML syntax error or missing required ODCS fields |
| FLOE-E502 | Unsupported ODCS version | Contract uses unsupported apiVersion |
| FLOE-E509 | Parse error | Failed to parse contract YAML |

| Code | Description | Cause |
| --- | --- | --- |
| FLOE-E510 | SLA weakening | Child contract specifies a weaker SLA than its parent (e.g., freshness relaxed from PT6H to PT12H) |
| FLOE-E511 | Classification weakening | Child contract downgrades field classification (e.g., PII to public) |
| FLOE-E512 | Circular dependency | Contracts form a circular inheritance chain |

Version Validation Errors (FLOE-E520-E529)

| Code | Description | Cause |
| --- | --- | --- |
| FLOE-E520 | Breaking change without MAJOR bump | Column removed, type changed, or required column added without MAJOR version increment |
| FLOE-E521 | Invalid version format | Version doesn’t follow semantic versioning (X.Y.Z) |

| Code | Description | Cause |
| --- | --- | --- |
| FLOE-E530 | Type mismatch | Contract specifies different type than actual table schema |
| FLOE-E531 | Missing column | Contract defines column not present in table |
| FLOE-E532 | Extra column | Table has column not defined in contract (informational) |

| Code | Severity | Description |
| --- | --- | --- |
| FLOE-E540 | warning | Catalog unreachable |

$ floe compile  # planned target-state command
[3/5] Validating data contracts
  FLOE-E510: Child contract weakens 'freshness' SLA
    Parent requires PT6H, child specifies PT12H
    Suggestion: Strengthen 'freshness' to at least match parent: PT6H
  FLOE-E511: Classification weakening for field 'customers.email'
    Parent requires 'pii', child specifies 'public'
    Suggestion: Use classification 'pii' or stronger for 'customers.email'
  FLOE-E530: Type mismatch for column 'user_id'
    Contract: string, Table: integer
    Suggestion: Update contract schema or table definition to match

Contract violations are emitted as OpenLineage FAIL events:

{
  "eventType": "FAIL",
  "job": { "name": "contract_check.sales-customers" },
  "run": {
    "facets": {
      "contractViolation": {
        "contractName": "sales-customers",
        "contractVersion": "2.1.0",
        "violationType": "freshness_violation",
        "severity": "warning",
        "message": "Data is 8 hours old, SLA is 6 hours"
      }
    }
  }
}
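
A payload of this shape can be assembled with plain dictionaries. The sketch below mirrors only the fields shown in the example; `build_violation_event` is a hypothetical helper, and a complete OpenLineage event carries further fields (for example `eventTime` and a run ID) that are omitted here.

```python
import json


def build_violation_event(
    contract_name: str,
    contract_version: str,
    violation_type: str,
    message: str,
    severity: str = "warning",
) -> dict:
    """Build the FAIL event body with a contractViolation facet (illustrative subset)."""
    return {
        "eventType": "FAIL",
        "job": {"name": f"contract_check.{contract_name}"},
        "run": {
            "facets": {
                "contractViolation": {
                    "contractName": contract_name,
                    "contractVersion": contract_version,
                    "violationType": violation_type,
                    "severity": severity,
                    "message": message,
                }
            }
        },
    }


event = build_violation_event(
    contract_name="sales-customers",
    contract_version="2.1.0",
    violation_type="freshness_violation",
    message="Data is 8 hours old, SLA is 6 hours",
)
payload = json.dumps(event, indent=2)  # serialized form, ready to hand to an emitter
```

Keeping the builder free of I/O separates payload construction from transport, so the same dictionary can feed logs, tests, or an OpenLineage client.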

| Metric | Type | Description |
| --- | --- | --- |
| floe_contract_violations_total | Counter | Total violations |
| floe_contract_freshness_hours | Gauge | Hours since last update |
| floe_contract_availability_up | Gauge | 1 if available, 0 if not |
| floe_contract_quality_score | Gauge | Quality score 0-100 |