# Contract Monitoring Guide
This guide explains how to set up and use contract monitoring in floe.
## Overview

Contract monitoring validates that data products meet their declared SLAs and schema agreements. The ContractMonitor service runs continuously, checking contracts at configurable intervals and emitting violations as OpenLineage events.
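Conceptually, one monitoring cycle can be sketched in a few lines. This is a minimal illustration, not the actual ContractMonitor implementation; `check` and `emit_violation` stand in for the real check and OpenLineage-emitting functions:

```python
def run_cycle(contracts, check, emit_violation):
    """One monitoring cycle: run the checks for every contract and
    emit each violation found. The real service repeats this on a
    schedule (see Check Intervals)."""
    for contract in contracts:
        for violation in check(contract):
            emit_violation(contract, violation)

# Usage with stubbed-out check and emitter:
emitted = []
run_cycle(["my-customers"],
          check=lambda c: ["freshness_violation"],
          emit_violation=lambda c, v: emitted.append((c, v)))
```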
## Prerequisites

- floe with data contracts enabled
- OpenLineage-compatible backend (Marquez, Atlan, etc.)
- Optional: Prometheus for metrics
## Quick Start

### 1. Define a Contract

Create `datacontract.yaml` alongside your data product:
```yaml
apiVersion: v3.0.2
kind: DataContract
name: my-customers
version: 1.0.0
owner: data-team@example.com
models:
  customers:
    elements:
      customer_id:
        type: string
        primaryKey: true
      email:
        type: string
        format: email
slaProperties:
  freshness:
    value: "PT6H"
    element: updated_at
  availability:
    value: "99.9%"
```
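As a quick sanity check before wiring monitoring up, the parsed contract can be validated for its basic shape. A minimal sketch; the key set and error messages are illustrative, not floe's actual validator:

```python
REQUIRED_KEYS = {"apiVersion", "kind", "name", "version", "models"}

def validate_contract(contract: dict) -> list:
    """Return a list of problems; an empty list means the parsed
    contract has the basic top-level shape shown above."""
    problems = ["missing key: " + k for k in sorted(REQUIRED_KEYS - contract.keys())]
    if contract.get("kind") != "DataContract":
        problems.append("kind must be DataContract")
    return problems

# A dict as it would look after parsing the YAML above:
parsed = {"apiVersion": "v3.0.2", "kind": "DataContract",
          "name": "my-customers", "version": "1.0.0", "models": {}}
problems = validate_contract(parsed)
```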
### 2. Enable Monitoring in Platform Manifest

```yaml
data_contracts:
  enforcement: alert_only
  monitoring:
    enabled: true
    mode: scheduled
    freshness:
      check_interval: 15m
    schema_drift:
      check_interval: 1h
```
### 3. Compile and Run

This is a planned data-team lifecycle example. In the current alpha, use the Customer 360 repo-local validation path for artifact evidence.
```sh
floe compile  # planned target-state command
floe run      # planned target-state command
```

The ContractMonitor will automatically start and begin checking contracts.
## Configuration

### Monitoring Modes
Section titled “Monitoring Modes”| Mode | Description | Use Case |
|---|---|---|
scheduled | Fixed intervals | Production (default) |
continuous | Event-driven | Real-time requirements |
on_demand | Manual trigger only | Development/testing |
### Check Intervals

```yaml
monitoring:
  freshness:
    check_interval: 15m  # Check freshness every 15 minutes
  schema_drift:
    check_interval: 1h   # Check schema every hour
  quality:
    check_interval: 6h   # Run quality checks every 6 hours
  availability:
    check_interval: 5m   # Check availability every 5 minutes
```
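Interval strings like these can be turned into durations with a few lines of Python; a sketch assuming the simple `<number><unit>` format shown (s/m/h only):

```python
import re
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}

def parse_interval(spec: str) -> timedelta:
    """Parse '15m', '1h', '30s' style intervals into a timedelta."""
    match = re.fullmatch(r"(\d+)([smh])", spec)
    if match is None:
        raise ValueError("unsupported interval: " + spec)
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})
```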
### Enforcement Levels

| Level | Behavior |
|---|---|
| `off` | No monitoring |
| `warn` | Log warnings only |
| `alert_only` | Emit OpenLineage FAIL events (default) |
| `block` | Block processing on violation |
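The table maps onto a simple dispatch. A sketch with hypothetical callback names; in particular, whether `block` also emits an event is an assumption here, not documented behavior:

```python
def handle_violation(level, violation, log_warning, emit_fail_event, block_run):
    """Dispatch a violation according to the enforcement level."""
    if level == "off":
        return                      # no monitoring
    if level == "warn":
        log_warning(violation)      # log only
    elif level == "alert_only":
        emit_fail_event(violation)  # OpenLineage FAIL event (default)
    elif level == "block":
        emit_fail_event(violation)  # assumption: block also emits the event
        block_run(violation)        # stop downstream processing

# Usage with stubbed callbacks that record what happened:
actions = []
handle_violation("block", "schema_drift",
                 log_warning=actions.append,
                 emit_fail_event=lambda v: actions.append("emitted:" + v),
                 block_run=lambda v: actions.append("blocked:" + v))
```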
## Monitoring Checks

### Freshness Check

Verifies data is updated within the SLA window.
```yaml
slaProperties:
  freshness:
    value: "PT6H"       # Max 6 hours since last update
    element: updated_at # Column to check
```

How it works:
- Query `MAX(updated_at)` from the data source
- Calculate the time since the last update
- Compare against the SLA threshold
- Emit a violation if exceeded
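These steps amount to a timestamp comparison. A sketch, assuming the SLA has already been parsed from its ISO-8601 form (e.g. PT6H as 6 hours); the violation dict mirrors the event facet shown later:

```python
from datetime import datetime, timedelta, timezone

def check_freshness(max_updated_at, sla, now):
    """Age of the newest row vs. the SLA window.
    max_updated_at is the result of the MAX(updated_at) query."""
    age = now - max_updated_at
    if age <= sla:
        return None  # within SLA, no violation
    return {"violationType": "freshness_violation",
            "message": (f"Data is {age.total_seconds() / 3600:.0f} hours old, "
                        f"SLA is {sla.total_seconds() / 3600:.0f} hours")}

# Example: data last updated 8 hours ago against a 6-hour SLA
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
violation = check_freshness(now - timedelta(hours=8), timedelta(hours=6), now)
```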
### Schema Drift Check

Detects when the actual schema differs from the contract.
Detected changes:
- Removed columns (breaking)
- Type changes (breaking)
- New required columns (breaking)
- New optional columns (info)
- Nullability changes
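The classification can be sketched as a dict comparison. Simplified on purpose: columns are modeled as `{name: type}`, so required/nullability metadata is ignored and new columns are all tagged `info`; this is not floe's actual detector:

```python
def classify_drift(contract_cols, actual_cols):
    """Compare contract columns ({name: type}) to the actual schema
    and tag each difference with a severity."""
    changes = []
    for name, declared_type in contract_cols.items():
        if name not in actual_cols:
            changes.append(("breaking", f"Removed column: {name}"))
        elif actual_cols[name] != declared_type:
            changes.append(("breaking",
                            f"Type change: {name} ({declared_type} → {actual_cols[name]})"))
    for name in sorted(actual_cols.keys() - contract_cols.keys()):
        changes.append(("info", f"New column: {name}"))
    return changes

# The example violation below corresponds to drift like this:
changes = classify_drift({"email": "string", "id": "int"},
                         {"id": "string", "phone": "string"})
```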
Example violation:

```json
{
  "violationType": "schema_drift",
  "message": "Breaking changes: [Removed column: email, Type change: id (int → string)]"
}
```
### Availability Check

Verifies the data source is accessible.

```yaml
slaProperties:
  availability:
    value: "99.9%"
```
### Quality Check

Runs quality rules defined in the contract.

```yaml
slaProperties:
  quality:
    completeness: "99%"  # Non-null required fields
    uniqueness: "100%"   # Primary key uniqueness
```
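Both rules translate directly into SQL ratios. A hedged sketch of queries a checker could run against the source; these are illustrative, not floe's actual generated SQL:

```python
def completeness_sql(table, column):
    """Percent of non-null values in a column (compare to e.g. 99%)."""
    return f"SELECT COUNT({column}) * 100.0 / COUNT(*) AS pct FROM {table}"

def uniqueness_sql(table, key):
    """Percent of distinct primary-key values (100% = no duplicates)."""
    return f"SELECT COUNT(DISTINCT {key}) * 100.0 / COUNT(*) AS pct FROM {table}"
```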
## Viewing Violations

### OpenLineage Events

Violations are emitted as OpenLineage FAIL events:
```json
{
  "eventType": "FAIL",
  "job": {
    "namespace": "floe",
    "name": "contract_check.my-customers"
  },
  "run": {
    "facets": {
      "contractViolation": {
        "contractName": "my-customers",
        "contractVersion": "1.0.0",
        "violationType": "freshness_violation",
        "severity": "warning",
        "message": "Data is 8 hours old, SLA is 6 hours"
      }
    }
  }
}
```
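For reference, an event with that shape can be assembled as a plain dict. This is a sketch: a real emitter would also set `eventTime`, a run id, and the `producer` field per the OpenLineage spec:

```python
def build_violation_event(contract, version, violation_type, severity, message):
    """Assemble a FAIL event shaped like the example above."""
    return {
        "eventType": "FAIL",
        "job": {"namespace": "floe", "name": f"contract_check.{contract}"},
        "run": {"facets": {"contractViolation": {
            "contractName": contract,
            "contractVersion": version,
            "violationType": violation_type,
            "severity": severity,
            "message": message,
        }}},
    }

event = build_violation_event("my-customers", "1.0.0", "freshness_violation",
                              "warning", "Data is 8 hours old, SLA is 6 hours")
```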
### Prometheus Metrics

```promql
# Total violations by type
sum(floe_contract_violations_total) by (contract, type)

# Current freshness in hours
floe_contract_freshness_hours{contract="my-customers"}

# Availability status
floe_contract_availability_up{contract="my-customers"}

# Schema drift detection
floe_contract_schema_drift_detected{contract="my-customers"}
```
### Sample Grafana Dashboard

```yaml
panels:
  - title: Contract Violations
    type: stat
    targets:
      - expr: sum(increase(floe_contract_violations_total[24h]))
  - title: Freshness by Contract
    type: gauge
    targets:
      - expr: floe_contract_freshness_hours
    thresholds:
      - value: 6
        color: green
      - value: 12
        color: yellow
      - value: 24
        color: red
  - title: Availability Status
    type: stat
    targets:
      - expr: floe_contract_availability_up
```
## Post-Run Validation

The orchestrator automatically runs contract checks after each pipeline run:
```python
# In DagsterOrchestratorPlugin
@asset(post_hooks=[contract_check_hook])
def my_asset(context):
    # ... dbt run ...
    pass

async def contract_check_hook(context):
    violations = await contract_monitor.check_contract_post_run("my-customers")
    if violations:
        context.log.warning(f"{len(violations)} contract violations detected")
```
Section titled “Manual Checks”# Check a specific contractfloe contract check my-customers
# Check all contractsfloe contract check --all
# Validate contract filefloe contract validate datacontract.yamlPython API
## Python API

```python
from floe_runtime.monitoring import ContractMonitor

monitor = ContractMonitor(config, plugin, emitter)

# Check single contract
violations = await monitor.check_contract_post_run("my-customers")

# Check all contracts
all_violations = await monitor.check_all_contracts()
```
## Alerting

### Configure Alerting

```yaml
data_contracts:
  alerting:
    openlineage_events: true
    prometheus_metrics: true
    slack:
      webhook_url: ${SLACK_WEBHOOK_URL}
      channel: "#data-alerts"
    pagerduty:
      service_key: ${PAGERDUTY_KEY}
      severity_threshold: error  # Only page on error/critical
```
### Alert Rules (Prometheus Alertmanager)

```yaml
groups:
  - name: contract-violations
    rules:
      - alert: ContractFreshnessViolation
        expr: floe_contract_freshness_hours > 12
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Contract {{ $labels.contract }} freshness SLA violated"
      - alert: ContractUnavailable
        expr: floe_contract_availability_up == 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Contract {{ $labels.contract }} data source unavailable"
      - alert: ContractSchemaDrift
        expr: floe_contract_schema_drift_detected == 1
        for: 1m
        labels:
          severity: error
        annotations:
          summary: "Schema drift detected for {{ $labels.contract }}"
```
## Troubleshooting

### Common Issues

| Issue | Cause | Solution |
|---|---|---|
| No violations emitted | Monitoring disabled | Check monitoring.enabled: true |
| Missing metrics | Prometheus not configured | Enable prometheus_metrics: true |
| False positives | Too strict SLA | Adjust SLA thresholds |
| Schema drift false positive | Dynamic columns | Exclude in contract config |
### Debug Mode

```sh
# Run with verbose logging
FLOE_LOG_LEVEL=debug floe run  # planned target-state command

# Check contract directly
floe contract test datacontract.yaml --connection prod --verbose
```
### Viewing Logs

```sh
# Kubernetes
kubectl logs -l app=floe -c contract-monitor

# Docker
docker logs floe 2>&1 | grep "contract"
```
## Best Practices

- Start with `alert_only`: Don’t block processing until SLAs are tuned
- Tune thresholds gradually: Start lenient, tighten over time
- Use appropriate intervals: Frequent checks for critical data, infrequent for batch
- Document SLA rationale: Explain why thresholds are set
- Set up dashboards early: Visibility helps catch issues