
Contract Monitoring Guide

This guide explains how to set up and use contract monitoring in floe.

Contract monitoring validates that data products meet their declared SLAs and schema agreements. The ContractMonitor service runs continuously, checking contracts at configurable intervals and emitting violations as OpenLineage events.

Prerequisites:

  • floe with data contracts enabled
  • OpenLineage-compatible backend (Marquez, Atlan, etc.)
  • Optional: Prometheus for metrics

Create datacontract.yaml alongside your data product:

```yaml
apiVersion: v3.0.2
kind: DataContract
name: my-customers
version: 1.0.0
owner: data-team@example.com
models:
  customers:
    elements:
      customer_id:
        type: string
        primaryKey: true
      email:
        type: string
        format: email
slaProperties:
  freshness:
    value: "PT6H"
    element: updated_at
  availability:
    value: "99.9%"
```
Then enable monitoring in manifest.yaml:

```yaml
data_contracts:
  enforcement: alert_only
  monitoring:
    enabled: true
    mode: scheduled
    freshness:
      check_interval: 15m
    schema_drift:
      check_interval: 1h
```

This is a planned data-team lifecycle example. In the current alpha, use the Customer 360 repo-local validation path for artifact evidence.

```shell
floe compile # planned target-state command
floe run     # planned target-state command
```

The ContractMonitor will automatically start and begin checking contracts.

| Mode | Description | Use Case |
| --- | --- | --- |
| scheduled | Fixed intervals | Production (default) |
| continuous | Event-driven | Real-time requirements |
| on_demand | Manual trigger only | Development/testing |
```yaml
monitoring:
  freshness:
    check_interval: 15m # Check freshness every 15 minutes
  schema_drift:
    check_interval: 1h # Check schema every hour
  quality:
    check_interval: 6h # Run quality checks every 6 hours
  availability:
    check_interval: 5m # Check availability every 5 minutes
```
| Level | Behavior |
| --- | --- |
| off | No monitoring |
| warn | Log warnings only |
| alert_only | Emit OpenLineage FAIL events (default) |
| block | Block processing on violation |

Freshness checks verify that data is updated within the SLA window.

```yaml
slaProperties:
  freshness:
    value: "PT6H" # Max 6 hours since last update
    element: updated_at # Column to check
```

How it works:

  1. Query MAX(updated_at) from the data source
  2. Calculate time since last update
  3. Compare against SLA threshold
  4. Emit violation if exceeded
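The steps above can be sketched as follows. This is a minimal illustration, not the ContractMonitor's actual implementation: `query_max` is a hypothetical callable standing in for the `MAX(updated_at)` query against the data source.

```python
from datetime import datetime, timedelta, timezone

def check_freshness(query_max, sla_window: timedelta, now=None):
    """Return a violation dict if the last update is older than the SLA window."""
    now = now or datetime.now(timezone.utc)
    last_update = query_max()   # stands in for: SELECT MAX(updated_at) FROM <table>
    age = now - last_update     # time since last update
    if age > sla_window:        # compare against the SLA threshold
        return {
            "violationType": "freshness_violation",
            "message": f"Data is {age.total_seconds() / 3600:.1f} hours old, "
                       f"SLA is {sla_window.total_seconds() / 3600:.0f} hours",
        }
    return None
```

With a 6-hour SLA (`PT6H`), data last updated 8 hours ago produces a violation, matching the example event shown later in this guide.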

Schema drift checks detect when the actual schema differs from the contract.

Detected changes:

  • Removed columns (breaking)
  • Type changes (breaking)
  • New required columns (breaking)
  • New optional columns (info)
  • Nullability changes

Example violation:

```json
{
  "violationType": "schema_drift",
  "message": "Breaking changes: [Removed column: email, Type change: id (int → string)]"
}
```
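One way to classify drift between the contract's declared columns and the observed schema can be sketched like this. This is a hypothetical simplification of the comparison, not the monitor's actual code; schemas are modeled as plain `{column: type}` dicts.

```python
def classify_schema_drift(contract: dict, actual: dict) -> dict:
    """Compare {column: type} mappings and bucket differences by severity."""
    breaking, info = [], []
    for col, typ in contract.items():
        if col not in actual:
            breaking.append(f"Removed column: {col}")          # breaking
        elif actual[col] != typ:
            breaking.append(f"Type change: {col} ({typ} → {actual[col]})")  # breaking
    for col in actual:
        if col not in contract:
            info.append(f"New column: {col}")                  # informational
    return {"breaking": breaking, "info": info}
```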

Availability checks verify that the data source is accessible.

```yaml
slaProperties:
  availability:
    value: "99.9%"
```
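A minimal availability probe can issue a trivial query and treat any failure as downtime. This sketch assumes a `run_query` callable provided by the connection plugin; the monitor's real probe and its uptime accounting may differ.

```python
def check_availability(run_query) -> bool:
    """Probe the source with a trivial query; any exception counts as downtime."""
    try:
        run_query("SELECT 1")  # cheapest possible round trip to the source
        return True
    except Exception:
        return False
```

Sampled at the configured `check_interval`, the up/down results accumulate into the uptime ratio that is compared against the "99.9%" SLA.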

Quality checks run the quality rules defined in the contract.

```yaml
slaProperties:
  quality:
    completeness: "99%" # Non-null required fields
    uniqueness: "100%" # Primary key uniqueness
```
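The two rules above reduce to simple ratios, sketched here over in-memory rows for illustration. In practice these would be computed as SQL aggregates against the source, and the measured ratio is compared to the contract threshold.

```python
def completeness(rows, field) -> float:
    """Share of rows whose required field is non-null."""
    return sum(1 for r in rows if r.get(field) is not None) / len(rows)

def uniqueness(rows, key) -> float:
    """Share of distinct values among all primary-key values."""
    values = [r[key] for r in rows]
    return len(set(values)) / len(values)
```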

Violations are emitted as OpenLineage FAIL events:

```json
{
  "eventType": "FAIL",
  "job": {
    "namespace": "floe",
    "name": "contract_check.my-customers"
  },
  "run": {
    "facets": {
      "contractViolation": {
        "contractName": "my-customers",
        "contractVersion": "1.0.0",
        "violationType": "freshness_violation",
        "severity": "warning",
        "message": "Data is 8 hours old, SLA is 6 hours"
      }
    }
  }
}
```
With Prometheus metrics enabled, the following queries are useful:

```promql
# Total violations by type
sum(floe_contract_violations_total) by (contract, type)

# Current freshness in hours
floe_contract_freshness_hours{contract="my-customers"}

# Availability status
floe_contract_availability_up{contract="my-customers"}

# Schema drift detection
floe_contract_schema_drift_detected{contract="my-customers"}
```
Example Grafana dashboard panels:

```yaml
panels:
  - title: Contract Violations
    type: stat
    targets:
      - expr: sum(increase(floe_contract_violations_total[24h]))
  - title: Freshness by Contract
    type: gauge
    targets:
      - expr: floe_contract_freshness_hours
    thresholds:
      - value: 6
        color: green
      - value: 12
        color: yellow
      - value: 24
        color: red
  - title: Availability Status
    type: stat
    targets:
      - expr: floe_contract_availability_up
```

The orchestrator automatically runs contract checks after each pipeline run:

```python
# In DagsterOrchestratorPlugin
async def contract_check_hook(context):
    violations = await contract_monitor.check_contract_post_run("my-customers")
    if violations:
        context.log.warning(f"{len(violations)} contract violations detected")

@asset(post_hooks=[contract_check_hook])
def my_asset(context):
    # ... dbt run ...
    pass
```
```shell
# Check a specific contract
floe contract check my-customers

# Check all contracts
floe contract check --all

# Validate contract file
floe contract validate datacontract.yaml
```
```python
from floe_runtime.monitoring import ContractMonitor

monitor = ContractMonitor(config, plugin, emitter)

# Check a single contract (from within an async context)
violations = await monitor.check_contract_post_run("my-customers")

# Check all contracts
all_violations = await monitor.check_all_contracts()
```
Configure alert destinations in manifest.yaml:

```yaml
data_contracts:
  alerting:
    openlineage_events: true
    prometheus_metrics: true
    slack:
      webhook_url: ${SLACK_WEBHOOK_URL}
      channel: "#data-alerts"
    pagerduty:
      service_key: ${PAGERDUTY_KEY}
      severity_threshold: error # Only page on error/critical
```
Example Prometheus alerting rules:

```yaml
groups:
  - name: contract-violations
    rules:
      - alert: ContractFreshnessViolation
        expr: floe_contract_freshness_hours > 12
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Contract {{ $labels.contract }} freshness SLA violated"
      - alert: ContractUnavailable
        expr: floe_contract_availability_up == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Contract {{ $labels.contract }} data source unavailable"
      - alert: ContractSchemaDrift
        expr: floe_contract_schema_drift_detected == 1
        for: 1m
        labels:
          severity: error
        annotations:
          summary: "Schema drift detected for {{ $labels.contract }}"
```
| Issue | Cause | Solution |
| --- | --- | --- |
| No violations emitted | Monitoring disabled | Check `monitoring.enabled: true` |
| Missing metrics | Prometheus not configured | Enable `prometheus_metrics: true` |
| False positives | Too strict SLA | Adjust SLA thresholds |
| Schema drift false positives | Dynamic columns | Exclude in contract config |
```shell
# Run with verbose logging
FLOE_LOG_LEVEL=debug floe run # planned target-state command

# Check contract directly
floe contract test datacontract.yaml --connection prod --verbose
```
```shell
# Kubernetes
kubectl logs -l app=floe -c contract-monitor

# Docker
docker logs floe 2>&1 | grep "contract"
```
  1. Start with alert_only: Don’t block processing until SLAs are tuned
  2. Tune thresholds gradually: Start lenient, tighten over time
  3. Use appropriate intervals: Frequent checks for critical data, infrequent for batch
  4. Document SLA rationale: Explain why thresholds are set
  5. Set up dashboards early: Visibility helps catch issues