Table Lineage and Pipeline Orchestration

Overview

Once you have the lineage graph, you can use it to:

  1. Understand dependencies - Which tables depend on which
  2. Execute pipelines - Run SQL in the correct order
  3. Generate DAGs - Deploy to Airflow or other orchestrators
  4. Split pipelines - Break large graphs into smaller subpipelines

The graph becomes your execution engine.


Table Lineage

What Is Table Lineage?

Table lineage tracks which tables depend on which other tables. This is the foundation for:

  • Determining execution order
  • Impact analysis (what breaks if I change this table?)
  • Dependency visualization
  • Pipeline orchestration

Build a pipeline from your SQL files and access its table-level graph:
from clgraph import Pipeline

pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")

# Access table-level graph
table_graph = pipeline.table_graph

Querying Table Dependencies

Get All Tables

# List all tables in the pipeline
tables = table_graph.tables
print(tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']

Get Dependencies

# What does this table depend on?
deps = table_graph.get_dependencies('analytics.customer_metrics')
print(deps)
# Output: ['staging.orders']

# What depends on this table?
downstream = table_graph.get_downstream('raw.orders')
print(downstream)
# Output: ['staging.orders', 'analytics.customer_metrics']

Get Execution Order

The graph provides topological ordering - the correct execution order respecting all dependencies:

execution_order = table_graph.get_execution_order()
print(execution_order)
# Output: [
#   ['raw.orders'],                    # Level 0: No dependencies
#   ['staging.orders'],                # Level 1: Depends on raw.orders
#   ['analytics.customer_metrics']     # Level 2: Depends on staging.orders
# ]

Each level can be executed in parallel - queries within the same level have no dependencies on each other.
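
To make that concrete, here is a minimal sketch of level-by-level execution with a thread pool. It assumes you keep your own mapping from table name to the SQL that builds it (sql_by_table and run_sql are hypothetical placeholders); pipeline.run(), shown later, does all of this for you.

from concurrent.futures import ThreadPoolExecutor

def run_level_by_level(table_graph, sql_by_table, run_sql, max_workers=4):
    """Execute one level at a time; queries inside a level run concurrently."""
    for level in table_graph.get_execution_order():
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Tables in the same level have no dependencies on each other,
            # so their queries can be submitted at the same time.
            futures = [pool.submit(run_sql, sql_by_table[table]) for table in level]
            for future in futures:
                future.result()  # re-raise any failure before starting the next level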


Example: Pipeline with Multiple Branches

-- queries/01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- queries/02_raw_customers.sql
CREATE TABLE raw.customers AS SELECT * FROM external.customers;

-- queries/03_enriched_orders.sql
CREATE TABLE enriched.orders AS
SELECT o.*, c.name
FROM raw.orders o
JOIN raw.customers c USING(customer_id);

-- queries/04_customer_summary.sql
CREATE TABLE analytics.customer_summary AS
SELECT customer_id, name, SUM(amount) as total_spent
FROM enriched.orders
GROUP BY customer_id, name;

Parse the Pipeline:

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# View pipeline structure with query units
print(pipeline)
# Pipeline(
#   01_raw_orders: CREATE TABLE raw.orders AS SELECT * FROM external.o...
#     main
#   02_raw_customers: CREATE TABLE raw.customers AS SELECT * FROM externa...
#     main
#   03_enriched_orders: CREATE TABLE enriched.orders AS SELECT o.*, c.name FRO...
#     main
#   04_customer_summary: CREATE TABLE analytics.customer_summary AS SELECT cus...
#     main
# )

Table Graph:

graph TD
    A[raw.orders] --> C[enriched.orders]
    B[raw.customers] --> C
    C --> D[analytics.customer_summary]

Execution Order:

table_graph = pipeline.table_graph
execution_order = table_graph.get_execution_order()
# [
#   ['raw.orders', 'raw.customers'],     # Level 0: Parallel execution
#   ['enriched.orders'],                 # Level 1: Waits for both raw tables
#   ['analytics.customer_summary']       # Level 2: Waits for enriched
# ]
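
The same dependency queries from earlier apply to this branching graph; the expected contents below follow directly from the diagram (element ordering may vary):

print(table_graph.get_dependencies('enriched.orders'))
# ['raw.orders', 'raw.customers']

print(table_graph.get_downstream('raw.customers'))
# ['enriched.orders', 'analytics.customer_summary']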

Pipeline Execution

The table graph determines what to run and in what order. Now you can execute it.

Synchronous Execution

Run your pipeline with concurrent execution within each level:

def my_executor(sql: str) -> None:
    """Your SQL execution function"""
    # Execute against your database
    client.execute(sql)

# Run the pipeline
results = pipeline.run(
    executor=my_executor,
    max_workers=4,      # Parallel execution within levels
    verbose=True        # Print progress
)

print(results)
# {
#   'completed': ['raw.orders', 'staging.orders', 'analytics.customer_metrics'],
#   'failed': [],
#   'elapsed_seconds': 12.5,
#   'total_queries': 3
# }

How it works:

  1. Groups queries into execution levels (based on dependencies)
  2. Executes each level sequentially
  3. Within each level, runs queries in parallel (up to max_workers)
  4. If any query fails, execution stops and returns the failed queries (see the sketch below)
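
Because execution stops at the first failing level, it is worth inspecting the returned dict before moving on. A minimal sketch, using the result shape shown above:

results = pipeline.run(executor=my_executor, max_workers=4)

if results['failed']:
    # Execution stopped at the failing level; downstream queries were not run.
    for name in results['failed']:
        print(f"Failed: {name}")
else:
    print(f"All {results['total_queries']} queries finished in {results['elapsed_seconds']}s")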

Asynchronous Execution

For async database clients:

import asyncio

async def my_async_executor(sql: str) -> None:
    """Your async SQL execution function"""
    await async_client.execute(sql)

# Run async (await pipeline.async_run directly if you're already inside an event loop)
async def main():
    return await pipeline.async_run(
        executor=my_async_executor,
        max_workers=4,
        verbose=True
    )

results = asyncio.run(main())

Same logic as sync execution, but:

  • Uses asyncio.Semaphore for concurrency control
  • Supports async/await patterns
  • Ideal for async database drivers


Execution Levels & Parallelism

Example with 6 queries:

# Execution order
# Level 0: [query_a, query_b, query_c]    ← 3 queries, no dependencies
# Level 1: [query_d, query_e]              ← 2 queries, depend on Level 0
# Level 2: [query_f]                       ← 1 query, depends on Level 1

results = pipeline.run(executor=my_executor, max_workers=4)

# Execution timeline (with max_workers=4):
# t=0:  query_a, query_b, query_c start (parallel, 3 workers)
# t=5:  All Level 0 complete
# t=5:  query_d, query_e start (parallel, 2 workers)
# t=10: All Level 1 complete
# t=10: query_f starts (1 worker)
# t=12: Complete

Sequential execution would run the six queries one after another, so total runtime is the sum of every query's duration. Parallel execution only waits for the slowest query in each level, which is why the timeline above completes at t=12.


Orchestrator Integration

clgraph supports multiple orchestrators for running your SQL pipelines in production. Dependencies are automatically wired based on table lineage.

Supported Orchestrators

Orchestrator       Method                 Output
Airflow 2.x/3.x    to_airflow_dag()       DAG with TaskFlow operators
Dagster            to_dagster_assets()    Asset definitions
Prefect 3.x        to_prefect_flow()      Flow with tasks
Kestra             to_kestra_flow()       YAML flow definition
Direct execution   run() / async_run()    Parallel execution

Dagster Integration

Generate Dagster assets automatically from your pipeline. Assets provide better lineage tracking and observability compared to traditional jobs.

from clgraph import Pipeline
from dagster import Definitions

def execute_sql(sql: str):
    """Your database execution function."""
    # Example: ClickHouse execution
    import clickhouse_connect
    client = clickhouse_connect.get_client(host="localhost")
    client.command(sql)

pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="clickhouse")

# Generate Dagster assets
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="analytics",
    compute_kind="clickhouse",
)

# Create Definitions for deployment
defs = Definitions(assets=assets)

What gets generated:

  1. One asset per target table - Each query that creates a table becomes a Dagster asset
  2. Dependencies wired automatically - Based on table lineage from clgraph (sketched below)
  3. Full asset customization - Group names, key prefixes, compute kinds, and metadata supported
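
Conceptually, each generated asset corresponds to roughly this hand-written pattern. This is a sketch of the idea, not the literal code clgraph emits; execute_sql is the executor defined above.

from dagster import asset

@asset(name="staging_orders", deps=["raw_orders"],
       group_name="analytics", compute_kind="clickhouse")
def staging_orders() -> None:
    # One asset per target table; deps mirror the table lineage.
    execute_sql("CREATE TABLE staging.orders AS SELECT * FROM raw.orders")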

Dagster Asset Example

Your SQL:

-- 01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- 02_staging_orders.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;

-- 03_analytics.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;

Generated Dagster Assets:

from clgraph import Pipeline
from dagster import Definitions

def execute_sql(sql: str):
    # Your database execution logic
    client.query(sql)

# Build pipeline
pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate assets with clgraph
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="data_warehouse",
    compute_kind="clickhouse",
)

# Create Definitions
defs = Definitions(assets=assets)

Deploy to Dagster:

# Save as definitions.py in your Dagster workspace
# Dagster picks it up automatically via code server

dagster dev -f definitions.py

In the Dagster UI, you'll see:

  • Three assets: raw_orders, staging_orders, analytics_metrics
  • Dependencies wired correctly
  • Compute kind badges showing "clickhouse"
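
To smoke-test the generated assets without deploying a code location, Dagster's materialize helper can run them in-process (a quick sketch, assuming execute_sql can reach your database):

from dagster import materialize

# Runs every generated asset once, respecting their dependencies.
result = materialize(assets)
assert result.success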


Advanced Dagster Configuration

assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="finance",                        # Group in Dagster UI
    key_prefix=["prod", "warehouse"],            # Asset key prefix
    compute_kind="snowflake",                    # Compute badge
    owners=["team:data-eng"],                    # Asset owners
    tags={"domain": "finance", "tier": "gold"},  # Custom tags
)

Dagster Jobs (Alternative)

For workflows that prefer jobs over assets:

from clgraph import Pipeline
from dagster import Definitions

def execute_sql(sql: str):
    """Your database execution function."""
    import clickhouse_connect
    client = clickhouse_connect.get_client(host="localhost")
    client.command(sql)

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

job = pipeline.to_dagster_job(
    executor=execute_sql,
    job_name="analytics_pipeline",
    description="Daily analytics refresh",
    tags={"env": "prod"},
)

# Execute locally
result = job.execute_in_process()

# Or add to Definitions
defs = Definitions(jobs=[job])

Prefect Integration

Generate Prefect flows automatically from your pipeline. Prefect 3.x provides a modern, Python-native orchestration experience with automatic retries, observability, and deployment support.

from clgraph import Pipeline

def execute_sql(sql: str):
    """Your database execution function."""
    import clickhouse_connect
    client = clickhouse_connect.get_client(host="localhost")
    client.command(sql)

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate Prefect flow
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    retries=3,
    tags=["analytics", "daily"],
)

# Run locally
flow()

What gets generated:

  1. One task per query - Each SQL query becomes a Prefect task
  2. Dependencies wired automatically - Based on table lineage using wait_for (sketched below)
  3. Concurrent execution - Independent tasks run in parallel
  4. Built-in retry logic - Configurable retries with delays
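
That wiring corresponds roughly to the following hand-written pattern. This is a sketch of the idea, not the literal code clgraph generates; execute_sql is the executor defined above.

from prefect import flow, task

@task(retries=3)
def run_query(sql: str) -> None:
    execute_sql(sql)

@flow(name="analytics_pipeline")
def analytics_pipeline():
    # wait_for expresses the table-lineage dependencies between tasks;
    # tasks with no dependency on each other run concurrently.
    raw = run_query.submit("CREATE TABLE raw.orders AS SELECT * FROM external.orders")
    staging = run_query.submit(
        "CREATE TABLE staging.orders AS SELECT * FROM raw.orders",
        wait_for=[raw],
    )
    run_query.submit(
        "CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders",
        wait_for=[staging],
    )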

Prefect Flow Example

Your SQL:

-- 01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- 02_staging_orders.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;

-- 03_analytics.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;

Generated Prefect Flow:

from clgraph import Pipeline

def execute_sql(sql: str):
    client.query(sql)

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate flow with clgraph
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="data_pipeline",
    description="Daily data refresh",
    retries=2,
    retry_delay_seconds=60,
    tags=["production"],
)

# Run the flow
flow()

Deploy to Prefect:

# Run locally
python my_flow.py

# Or deploy with a schedule
prefect deploy my_flow.py:flow --name daily-analytics --cron "0 6 * * *"

In the Prefect UI, you'll see:

  • The flow with all of its tasks
  • Task dependencies visualized
  • Execution timeline and logs


Prefect Deployments

Create scheduled deployments for production:

# Create a deployment with cron schedule
deployment = pipeline.to_prefect_deployment(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    deployment_name="daily_refresh",
    cron="0 6 * * *",  # Run at 6 AM daily
    tags=["production"],
)

# Apply the deployment
deployment.apply()

Advanced Prefect Configuration

flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="complex_pipeline",
    description="Multi-stage analytics pipeline",
    retries=3,                      # Retry failed tasks 3 times
    retry_delay_seconds=120,        # Wait 2 minutes between retries
    timeout_seconds=3600,           # 1 hour timeout per task
    tags=["analytics", "production", "critical"],
)

Kestra Integration

Generate Kestra flows automatically from your pipeline. Kestra is a declarative orchestration platform using YAML-based workflow definitions with a powerful UI for monitoring and execution.

from clgraph import Pipeline

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate Kestra flow YAML
yaml_content = pipeline.to_kestra_flow(
    flow_id="analytics_pipeline",
    namespace="clgraph.production",
    cron="0 6 * * *",  # Run at 6 AM daily
    connection_config={
        "url": "jdbc:clickhouse://localhost:8123/default",
        "username": "default",
        "password": "",
    },
)

# Save to file
with open("flows/analytics_pipeline.yml", "w") as f:
    f.write(yaml_content)

What gets generated:

  1. One task per query - Each SQL query becomes a Kestra task
  2. Topological ordering - Tasks are ordered to respect dependencies (Kestra executes sequentially)
  3. Built-in retry logic - Configurable retry attempts with ISO 8601 duration
  4. YAML-based - Human-readable, version-controllable workflow definitions

Kestra Flow Example

Your SQL:

-- 01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- 02_staging_orders.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;

-- 03_analytics.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;

Generated Kestra Flow (YAML):

id: analytics_pipeline
namespace: clgraph.production
description: Pipeline with 3 queries operating on 3 tables. Generated by clgraph.
labels:
  generator: clgraph
tasks:
  - id: raw_orders
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password: ""
    sql: CREATE TABLE raw.orders AS SELECT * FROM external.orders
    retry:
      type: constant
      maxAttempt: 3
      interval: PT1M
  - id: staging_orders
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password: ""
    sql: CREATE TABLE staging.orders AS SELECT * FROM raw.orders
    retry:
      type: constant
      maxAttempt: 3
      interval: PT1M
  - id: analytics
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password: ""
    sql: CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders
    retry:
      type: constant
      maxAttempt: 3
      interval: PT1M
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 6 * * *"

Tasks are listed in topological order, ensuring raw_orders runs before staging_orders, which runs before analytics. Kestra executes tasks sequentially in the order defined.

Deploy to Kestra:

# Via Kestra CLI
kestra flow validate flows/analytics_pipeline.yml
kestra flow create flows/analytics_pipeline.yml

# Or via API
curl -X POST http://localhost:8080/api/v1/flows \
  -H "Content-Type: application/x-yaml" \
  --data-binary @flows/analytics_pipeline.yml

In the Kestra UI, you'll see:

  • Flow with all tasks visualized
  • Task dependencies as a DAG
  • Execution history and logs
  • Schedule triggers


Advanced Kestra Configuration

yaml_content = pipeline.to_kestra_flow(
    flow_id="complex_pipeline",
    namespace="clgraph.production",
    description="Multi-stage analytics pipeline",
    cron="0 6 * * *",              # Daily at 6 AM
    retry_attempts=5,               # Retry failed tasks 5 times
    labels={
        "env": "production",
        "team": "analytics",
        "priority": "high",
    },
    connection_config={
        "url": "jdbc:clickhouse://prod-cluster:8123/analytics",
        "username": "pipeline_user",
        "password": "${secret('CLICKHOUSE_PASSWORD')}",  # Kestra secrets
    },
)

Airflow Integration

Generate Airflow DAGs automatically from your pipeline:

from datetime import datetime, timedelta

from clgraph import Pipeline

pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")

# Generate Airflow DAG
dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="my_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5)
    }
)

What gets generated:

  1. One task per query - Each SQL query becomes an Airflow task
  2. Dependencies wired automatically - Based on table lineage
  3. Full DAG customization - All Airflow DAG parameters supported

Airflow DAG Example

Your SQL:

-- query_a.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- query_b.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;

-- query_c.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;

Generated Airflow DAG:

from datetime import datetime

from airflow.decorators import dag, task

@dag(
    dag_id="my_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={'owner': 'data-team', 'retries': 3}
)
def my_data_pipeline():
    @task
    def query_a():
        executor("CREATE TABLE raw.orders AS SELECT * FROM external.orders")

    @task
    def query_b():
        executor("CREATE TABLE staging.orders AS SELECT * FROM raw.orders")

    @task
    def query_c():
        executor("CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders")

    # Dependencies wired automatically
    query_a() >> query_b() >> query_c()

dag = my_data_pipeline()

Deploy to Airflow:

# Copy to Airflow DAGs folder
cp my_pipeline.py $AIRFLOW_HOME/dags/

# Airflow picks it up automatically
airflow dags list | grep my_data_pipeline

Advanced DAG Parameters

All Airflow DAG parameters are supported:

from datetime import datetime, timedelta

dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="complex_pipeline",
    schedule="0 2 * * *",                    # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,                           # Don't backfill
    max_active_runs=1,                       # One run at a time
    dagrun_timeout=timedelta(hours=2),       # Timeout after 2 hours
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email': ['alerts@company.com'],
        'email_on_failure': True,
        'depends_on_past': False,
    },
    tags=['production', 'daily', 'analytics']
)

Pipeline Manipulation

Splitting Large Pipelines

For large pipelines, you may want to split into smaller subpipelines based on update frequency or ownership:

# Split pipeline into 3 subpipelines
subpipelines = pipeline.split(
    sinks=[
        "mart_customer_ltv",                        # Subpipeline 1: Customer analytics
        ["mart_product_performance", "product_sales"],  # Subpipeline 2: Product analytics
        "daily_with_running"                        # Subpipeline 3: Daily metrics
    ]
)

# Each subpipeline is independent
for i, subpipeline in enumerate(subpipelines):
    print(f"Subpipeline {i}: {len(subpipeline.table_graph.tables)} tables")

How it works:

  1. Backward traversal - From each sink, trace backward to find all dependencies
  2. Non-overlapping assignment - Shared dependencies go to the first subpipeline that needs them (see the sketch below)
  3. Complete subgraphs - Each subpipeline has everything needed to build its sinks
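
For example, if two sinks share upstream tables, only the first listed subpipeline rebuilds them. The sketch below reuses the branching example from earlier plus a hypothetical second sink, analytics.customer_ltv, that also reads from enriched.orders.

# Hypothetical: both sinks read from enriched.orders.
subpipelines = pipeline.split(
    sinks=["analytics.customer_summary", "analytics.customer_ltv"]
)

# Shared upstream tables (raw.orders, raw.customers, enriched.orders) are
# assigned to the first subpipeline that needs them; the second subpipeline
# contains only what is not already covered.
for subpipeline in subpipelines:
    print(subpipeline.table_graph.tables)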

Example: Splitting by Update Frequency

# Your pipeline has:
# - Real-time tables (update every 5 min)
# - Hourly tables
# - Daily tables

subpipelines = pipeline.split(
    sinks=[
        ["realtime_dashboard", "realtime_alerts"],    # Every 5 min
        ["hourly_summary", "hourly_metrics"],         # Hourly
        ["daily_report", "daily_aggregates"]          # Daily
    ]
)

# Schedule each independently
realtime_dag = subpipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="realtime_pipeline",
    schedule="*/5 * * * *"  # Every 5 minutes
)

hourly_dag = subpipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="hourly_pipeline",
    schedule="0 * * * *"    # Every hour
)

daily_dag = subpipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="daily_pipeline",
    schedule="0 2 * * *"    # Daily at 2 AM
)

Example: Splitting by Team Ownership

# Split by team
subpipelines = pipeline.split(
    sinks=[
        ["finance_report", "revenue_metrics"],        # Finance team
        ["user_analytics", "engagement_dashboard"],   # Product team
        ["ml_features", "training_data"]              # ML team
    ]
)

# Each team maintains their own DAG
for team, subpipeline in [
    ("finance", subpipelines[0]),
    ("product", subpipelines[1]),
    ("ml", subpipelines[2])
]:
    dag = subpipeline.to_airflow_dag(
        executor=my_executor,
        dag_id=f"{team}_pipeline",
        schedule="@daily",
        default_args={'owner': team}
    )

Complete Example

Putting it all together:

from clgraph import Pipeline
from datetime import datetime, timedelta

# 1. Parse SQL files
pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")

# 2. Understand table lineage
print(f"Total tables: {len(pipeline.table_graph.tables)}")
print(f"Execution levels: {len(pipeline.table_graph.get_execution_order())}")

# 3. Execute locally (for testing)
def bigquery_executor(sql: str) -> None:
    from google.cloud import bigquery
    client = bigquery.Client()
    client.query(sql).result()

results = pipeline.run(
    executor=bigquery_executor,
    max_workers=4,
    verbose=True
)

if results['failed']:
    print(f"Failed queries: {results['failed']}")
else:
    print(f"✅ All {results['total_queries']} queries completed in {results['elapsed_seconds']}s")

# 4. Generate Airflow DAG (for production)
dag = pipeline.to_airflow_dag(
    executor=bigquery_executor,
    dag_id="production_pipeline",
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True
    }
)

# 5. Optional: Split for different schedules
subpipelines = pipeline.split(
    sinks=[
        ["realtime_dashboard"],     # Every 5 min
        ["daily_report"]            # Daily
    ]
)

Key Takeaways

Table Lineage

  • Dependency tracking - Know what depends on what
  • Execution order - Topological sort respects all dependencies
  • Impact analysis - See downstream effects of changes

Pipeline Execution

  • Sync & async - Both execution modes supported
  • Parallel execution - Queries in the same level run concurrently
  • Error handling - Stops on failure, returns results

Orchestrator Integration

  • Multiple orchestrators - Airflow 2.x/3.x, Dagster, Prefect 3.x, and Kestra supported
  • Automatic dependency wiring - Based on table lineage
  • Full customization - All orchestrator parameters supported

Dagster Integration

  • Asset-based approach - Each target table becomes an asset
  • Lineage observability - Dependencies visible in Dagster UI
  • Flexible deployment - Drop definitions.py in your workspace

Prefect Integration

  • Task-based flows - Each query becomes a Prefect task
  • Automatic concurrency - Independent tasks run in parallel
  • Built-in retries - Configurable retry logic with delays
  • Simple deployment - Python-native deployment model
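
Kestra Integration

  • YAML-based flows - Each query becomes a task in a version-controllable YAML definition
  • Topological ordering - Tasks run sequentially in dependency order
  • Schedules and retries - Cron triggers and retry policies included in the generated flow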

Airflow Integration

  • Automatic DAG generation - One task per query
  • Full customization - All Airflow parameters supported
  • Dependencies wired - Based on table lineage

Pipeline Manipulation

  • Split large pipelines - Create non-overlapping subpipelines
  • Schedule independently - Different cadences for different parts
  • Team ownership - Separate DAGs for different teams

Next Steps