# Integration Testing
Integration tests run in Kubernetes pods to ensure infrastructure parity with production.
Note: Integration tests run in Kubernetes pods, not locally with testcontainers. See K8s Testing Infrastructure for details.
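Because the suite assumes in-cluster service discovery, it is convenient to skip it automatically when run outside a pod. A minimal sketch of an in-cluster heuristic (the `in_cluster` helper is an assumption for illustration, not part of floe):

```python
import os
from pathlib import Path

# Token file that Kubernetes mounts into pods running with a service account.
_SA_TOKEN = Path("/var/run/secrets/kubernetes.io/serviceaccount/token")


def in_cluster() -> bool:
    """Heuristic: are we running inside a Kubernetes pod?"""
    return "KUBERNETES_SERVICE_HOST" in os.environ or _SA_TOKEN.exists()
```

A `conftest.py` could then guard the suite with `pytest.mark.skipif(not in_cluster(), reason="runs only in K8s")`.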
## K8s Test Environment
Integration tests execute inside a Kind cluster with a minimal floe-platform deployment:
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: integration-tests
spec:
  template:
    spec:
      restartPolicy: Never
      serviceAccountName: test-runner
      containers:
        - name: pytest
          image: ghcr.io/floe/test-runner:latest
          command: ["pytest", "tests/integration", "-v", "--junitxml=/results/junit.xml"]
          env:
            # Services discovered via K8s DNS (same as production)
            - name: DAGSTER_HOST
              value: "dagster-webserver.floe-test.svc.cluster.local"
            - name: DAGSTER_PORT
              value: "3000"
            - name: POSTGRES_HOST
              value: "dagster-postgresql.floe-test.svc.cluster.local"
          volumeMounts:
            - name: results
              mountPath: /results
      volumes:
        - name: results
          emptyDir: {}
```
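The `DAGSTER_HOST` and `POSTGRES_HOST` values above follow the standard Kubernetes Service DNS pattern `<service>.<namespace>.svc.<cluster-domain>`. A tiny sketch of composing such names (the `service_fqdn` helper is hypothetical, shown only to make the pattern explicit):

```python
def service_fqdn(service: str, namespace: str, cluster_domain: str = "cluster.local") -> str:
    """Build the in-cluster DNS name for a Service, matching the env values above."""
    return f"{service}.{namespace}.svc.{cluster_domain}"
```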
```python
import os
import time

import httpx
import pytest


@pytest.fixture(scope="session")
def dagster_client():
    """Connect to Dagster via K8s service discovery."""
    host = os.environ.get("DAGSTER_HOST", "dagster-webserver")
    port = os.environ.get("DAGSTER_PORT", "3000")
    base_url = f"http://{host}:{port}"

    # Wait for Dagster to be ready before handing the client to tests.
    client = httpx.Client(base_url=base_url, timeout=30)
    for _ in range(30):
        try:
            if client.get("/health").status_code == 200:
                break
        except httpx.ConnectError:
            pass
        time.sleep(1)
    else:
        pytest.fail("Dagster webserver did not become ready in 30s")

    yield client
    client.close()


@pytest.fixture(scope="session")
def postgres_connection():
    """Connect to PostgreSQL via K8s service discovery."""
    import psycopg2

    host = os.environ.get("POSTGRES_HOST", "dagster-postgresql")
    conn = psycopg2.connect(
        host=host,
        database="dagster",
        user="dagster",
        password=os.environ["POSTGRES_PASSWORD"],
    )
    yield conn
    conn.close()
```
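The readiness loop in the `dagster_client` fixture can be factored into a reusable helper that other fixtures (e.g. PostgreSQL) can share. A minimal sketch using only the standard library (`wait_for_tcp` is illustrative, not part of floe):

```python
import socket
import time


def wait_for_tcp(host: str, port: int, timeout: float = 30.0, interval: float = 1.0) -> bool:
    """Poll until a TCP connection to host:port succeeds; False if the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Service not up yet (refused / unreachable); back off and retry.
            time.sleep(interval)
    return False
```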
## Integration Test Examples

### Dagster Asset Tests
```python
import pytest
from dagster import materialize

from floe_orchestrator_dagster.loader import load_product_definitions


class TestDagsterAssets:
    """Integration tests for Dagster runtime loading."""

    def test_dbt_assets_created(self, temp_dbt_project):
        """dbt assets should load from one compiled product directory."""
        defs = load_product_definitions(
            product_name="test-product",
            project_dir=temp_dbt_project,
        )

        # Should have assets from the dbt manifest.
        asset_keys = [a.key for a in defs.get_all_asset_specs()]
        assert len(asset_keys) > 0

    def test_assets_execute_successfully(self, temp_dbt_project, duckdb_path):
        """Assets should execute without errors."""
        defs = load_product_definitions(
            product_name="test-product",
            project_dir=temp_dbt_project,
        )

        result = materialize(
            defs.get_all_asset_specs(),
            resources=defs.resources,
        )

        assert result.success
```

`temp_dbt_project` must be a product directory containing runtime files from the same compile context: `compiled_artifacts.json`, `target/manifest.json`, and the dbt project/profile files that Dagster resolves from that directory, such as `dbt_project.yml` and `profiles.yml`. Generated `definitions.py` files should remain thin shims that call `load_product_definitions(product_name, project_dir)`.
### Full Pipeline Tests
```python
import pytest
from pathlib import Path

from floe_cli.commands.run import execute_pipeline


class TestFullPipeline:
    """Integration tests for full pipeline execution."""

    @pytest.fixture
    def sample_project(self, tmp_path) -> Path:
        """Create a sample project for testing."""
        # Create floe.yaml
        floe_yaml = tmp_path / "floe.yaml"
        floe_yaml.write_text("""
apiVersion: floe.dev/v1
kind: DataProduct
metadata:
  name: test-pipeline
  version: "1.0"

platform:
  ref: file://./manifest.yaml  # Local platform config for testing

transforms:
  - type: dbt
    path: models/
""")

        # Create a minimal dbt model
        (tmp_path / "models").mkdir()
        (tmp_path / "models" / "test_model.sql").write_text(
            "SELECT 1 as id, 'test' as name"
        )

        # Create dbt_project.yml
        (tmp_path / "dbt_project.yml").write_text("""
name: test_project
version: "1.0"
profile: floe
""")

        return tmp_path

    def test_pipeline_runs_successfully(self, sample_project):
        """Full pipeline should execute without errors."""
        result = execute_pipeline(
            config_path=sample_project / "floe.yaml",
            environment="dev",
        )

        assert result.success
        assert (sample_project / "test.duckdb").exists()
```
## Running Integration Tests

```sh
# Create Kind cluster
kind create cluster --name floe-test

# Deploy minimal stack
helm install floe-test ./charts/floe-platform \
  --namespace floe-test --create-namespace \
  --set dagster.enabled=true \
  --set polaris.enabled=false \
  --wait --timeout=5m

# Run integration tests as a K8s Job
kubectl apply -f tests/integration/test-job.yaml -n floe-test
kubectl wait --for=condition=complete job/integration-tests -n floe-test --timeout=5m

# View results
kubectl logs job/integration-tests -n floe-test
```