Table Lineage and Pipeline Orchestration
Overview
Once you have the lineage graph, you can use it to:
- Understand dependencies - Which tables depend on which
- Execute pipelines - Run SQL in the correct order
- Generate DAGs - Deploy to Airflow or other orchestrators
- Split pipelines - Break large graphs into smaller subpipelines
The graph becomes your execution engine.
Table Lineage
What Is Table Lineage?
Table lineage tracks which tables depend on which other tables. This is the foundation for:
- Determining execution order
- Impact analysis (what breaks if I change this table?)
- Dependency visualization
- Pipeline orchestration
from clgraph import Pipeline
pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")
# Access table-level graph
table_graph = pipeline.table_graph
Querying Table Dependencies
Get All Tables
# List all tables in the pipeline
tables = table_graph.tables
print(tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']
Get Dependencies
# What does this table depend on?
deps = table_graph.get_dependencies('analytics.customer_metrics')
print(deps)
# Output: ['staging.orders']
# What depends on this table?
downstream = table_graph.get_downstream('raw.orders')
print(downstream)
# Output: ['staging.orders', 'analytics.customer_metrics']
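This makes impact analysis straightforward - before changing a table, list what sits downstream of it (using only the calls shown above):
# Impact analysis: everything downstream would be affected by a change to raw.orders
affected = table_graph.get_downstream('raw.orders')
print(f"Changing raw.orders affects {len(affected)} downstream table(s): {affected}")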
Get Execution Order
The graph provides topological ordering - the correct execution order respecting all dependencies:
execution_order = table_graph.get_execution_order()
print(execution_order)
# Output: [
# ['raw.orders'], # Level 0: No dependencies
# ['staging.orders'], # Level 1: Depends on raw.orders
# ['analytics.customer_metrics'] # Level 2: Depends on staging.orders
# ]
Each level can be executed in parallel - queries within the same level have no dependencies on each other.
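If you want to drive execution yourself instead of using pipeline.run() (covered below), a minimal sketch looks like this, where run_table is a hypothetical helper you supply to execute the query that builds a given table:
from concurrent.futures import ThreadPoolExecutor

# Minimal sketch: execute levels in order; tables within a level run in parallel.
# run_table() is a hypothetical helper you supply that runs the SQL for one table.
for level in table_graph.get_execution_order():
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(run_table, level))  # wait for the whole level to finish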
Example: Pipeline with Multiple Branches
-- queries/01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;
-- queries/02_raw_customers.sql
CREATE TABLE raw.customers AS SELECT * FROM external.customers;
-- queries/03_enriched_orders.sql
CREATE TABLE enriched.orders AS
SELECT o.*, c.name
FROM raw.orders o
JOIN raw.customers c USING(customer_id);
-- queries/04_customer_summary.sql
CREATE TABLE analytics.customer_summary AS
SELECT customer_id, name, SUM(amount) as total_spent
FROM enriched.orders
GROUP BY customer_id, name;
Parse the Pipeline:
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# View pipeline structure with query units
print(pipeline)
# Pipeline(
# 01_raw_orders: CREATE TABLE raw.orders AS SELECT * FROM external.o...
# main
# 02_raw_customers: CREATE TABLE raw.customers AS SELECT * FROM externa...
# main
# 03_enriched_orders: CREATE TABLE enriched.orders AS SELECT o.*, c.name FRO...
# main
# 04_customer_summary: CREATE TABLE analytics.customer_summary AS SELECT cus...
# main
# )
Table Graph:
graph TD
A[raw.orders] --> C[enriched.orders]
B[raw.customers] --> C
C --> D[analytics.customer_summary]
Execution Order:
execution_order = table_graph.get_execution_order()
# [
# ['raw.orders', 'raw.customers'], # Level 0: Parallel execution
# ['enriched.orders'], # Level 1: Waits for both raw tables
# ['analytics.customer_summary'] # Level 2: Waits for enriched
# ]
Pipeline Execution
The table graph determines what to run and in what order. Now you can execute it.
Synchronous Execution
Run your pipeline with concurrent execution within each level:
def my_executor(sql: str) -> None:
    """Your SQL execution function"""
    # Execute against your database
    client.execute(sql)

# Run the pipeline
results = pipeline.run(
    executor=my_executor,
    max_workers=4,  # Parallel execution within levels
    verbose=True    # Print progress
)
print(results)
# {
# 'completed': ['raw.orders', 'staging.orders', 'analytics.customer_metrics'],
# 'failed': [],
# 'elapsed_seconds': 12.5,
# 'total_queries': 3
# }
How it works:
- Groups queries into execution levels (based on dependencies)
- Executes each level sequentially
- Within each level, runs queries in parallel (up to max_workers)
- If any query fails, execution stops and returns failed queries
Asynchronous Execution
For async database clients:
import asyncio
async def my_async_executor(sql: str) -> None:
"""Your async SQL execution function"""
await async_client.execute(sql)
# Run async
results = await pipeline.async_run(
executor=my_async_executor,
max_workers=4,
verbose=True
)
Same logic as sync execution, but:
- Uses asyncio.Semaphore for concurrency control
- Supports async/await patterns
- Ideal for async database drivers
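The semaphore pattern mentioned above is roughly the following simplified sketch (not the library's actual implementation):
import asyncio

# Rough sketch of the pattern: levels run sequentially, items within a level
# run concurrently, bounded by a semaphore. run_item() is a placeholder for
# your async executor call.
async def run_levels(levels, run_item, max_workers=4):
    semaphore = asyncio.Semaphore(max_workers)

    async def guarded(item):
        async with semaphore:
            await run_item(item)

    for level in levels:
        # All items in a level run concurrently, capped at max_workers
        await asyncio.gather(*(guarded(item) for item in level))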
Execution Levels & Parallelism
Example with 6 queries:
# Execution order
# Level 0: [query_a, query_b, query_c] ← 3 queries, no dependencies
# Level 1: [query_d, query_e] ← 2 queries, depend on Level 0
# Level 2: [query_f] ← 1 query, depends on Level 1
results = pipeline.run(executor=my_executor, max_workers=4)
# Execution timeline (with max_workers=4):
# t=0: query_a, query_b, query_c start (parallel, 3 workers)
# t=5: All Level 0 complete
# t=5: query_d, query_e start (parallel, 2 workers)
# t=10: All Level 1 complete
# t=10: query_f starts (1 worker)
# t=12: Complete
Sequential execution: 6 queries run one after another = ~24 seconds (sum of individual query durations)
Parallel execution: 3 levels, each bounded by its slowest query = ~12 seconds (per the timeline above)
Speedup: roughly 2x
Orchestrator Integration
clgraph supports multiple orchestrators for running your SQL pipelines in production. Dependencies are automatically wired based on table lineage.
Supported Orchestrators
| Orchestrator | Method | Output |
|---|---|---|
| Airflow 2.x/3.x | to_airflow_dag() | DAG with TaskFlow operators |
| Dagster | to_dagster_assets() | Asset definitions |
| Prefect 3.x | to_prefect_flow() | Flow with tasks |
| Kestra | to_kestra_flow() | YAML flow definition |
| Direct execution | run() / async_run() | Parallel execution |
Dagster Integration
Generate Dagster assets automatically from your pipeline. Assets provide better lineage tracking and observability compared to traditional jobs.
from clgraph import Pipeline
from dagster import Definitions
def execute_sql(sql: str):
    """Your database execution function."""
    # Example: ClickHouse execution
    import clickhouse_connect
    client = clickhouse_connect.get_client(host="localhost")
    client.command(sql)

pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="clickhouse")

# Generate Dagster assets
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="analytics",
    compute_kind="clickhouse",
)

# Create Definitions for deployment
defs = Definitions(assets=assets)
What gets generated:
- One asset per target table - Each query that creates a table becomes a Dagster asset
- Dependencies wired automatically - Based on table lineage from clgraph
- Full asset customization - Group names, key prefixes, compute kinds, and metadata supported
Dagster Asset Example
Your SQL:
-- 01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;
-- 02_staging_orders.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;
-- 03_analytics.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;
Generated Dagster Assets:
from clgraph import Pipeline
from dagster import Definitions
def execute_sql(sql: str):
    # Your database execution logic
    client.query(sql)

# Build pipeline
pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate assets with clgraph
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="data_warehouse",
    compute_kind="clickhouse",
)

# Create Definitions
defs = Definitions(assets=assets)
Deploy to Dagster:
# Save as definitions.py in your Dagster workspace
# Dagster picks it up automatically via code server
dagster dev -f definitions.py
In the Dagster UI, you'll see:
- Three assets: raw_orders, staging_orders, analytics_metrics
- Dependencies wired correctly
- Compute kind badges showing "clickhouse"
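For a quick local smoke test before deploying, you can materialize the generated assets in-process with Dagster's own materialize() helper (a sketch; requires Dagster installed locally):
from dagster import materialize

# Materialize all generated assets in-process (handy for a local smoke test)
result = materialize(assets)
assert result.success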
Advanced Dagster Configuration
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="finance",                        # Group in Dagster UI
    key_prefix=["prod", "warehouse"],            # Asset key prefix
    compute_kind="snowflake",                    # Compute badge
    owners=["team:data-eng"],                    # Asset owners
    tags={"domain": "finance", "tier": "gold"},  # Custom tags
)
Dagster Jobs (Alternative)
For workflows that prefer jobs over assets:
from clgraph import Pipeline
from dagster import Definitions
def execute_sql(sql: str):
    """Your database execution function."""
    import clickhouse_connect
    client = clickhouse_connect.get_client(host="localhost")
    client.command(sql)

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

job = pipeline.to_dagster_job(
    executor=execute_sql,
    job_name="analytics_pipeline",
    description="Daily analytics refresh",
    tags={"env": "prod"},
)

# Execute locally
result = job.execute_in_process()

# Or add to Definitions
defs = Definitions(jobs=[job])
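To run the generated job on a schedule, you can wrap it in a standard Dagster ScheduleDefinition - this is plain Dagster API, not a clgraph feature:
from dagster import Definitions, ScheduleDefinition

# Attach a daily schedule to the generated job
daily_schedule = ScheduleDefinition(job=job, cron_schedule="0 6 * * *")
defs = Definitions(jobs=[job], schedules=[daily_schedule])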
Prefect Integration
Generate Prefect flows automatically from your pipeline. Prefect 3.x provides a modern, Python-native orchestration experience with automatic retries, observability, and deployment support.
from clgraph import Pipeline
def execute_sql(sql: str):
    """Your database execution function."""
    import clickhouse_connect
    client = clickhouse_connect.get_client(host="localhost")
    client.command(sql)

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate Prefect flow
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    retries=3,
    tags=["analytics", "daily"],
)

# Run locally
flow()
What gets generated:
- One task per query - Each SQL query becomes a Prefect task
- Dependencies wired automatically - Based on table lineage using wait_for
- Concurrent execution - Independent tasks run in parallel
- Built-in retry logic - Configurable retries with delays
Prefect Flow Example
Your SQL:
-- 01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;
-- 02_staging_orders.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;
-- 03_analytics.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;
Generated Prefect Flow:
from clgraph import Pipeline
def execute_sql(sql: str):
    client.query(sql)

pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")

# Generate flow with clgraph
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="data_pipeline",
    description="Daily data refresh",
    retries=2,
    retry_delay_seconds=60,
    tags=["production"],
)

# Run the flow
flow()
Deploy to Prefect:
# Run locally
python my_flow.py
# Or deploy with a schedule
prefect deploy my_flow.py:flow --name daily-analytics --cron "0 6 * * *"
In the Prefect UI, you'll see:
- Flow with all tasks
- Task dependencies visualized
- Execution timeline and logs
Prefect Deployments
Create scheduled deployments for production:
# Create a deployment with cron schedule
deployment = pipeline.to_prefect_deployment(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    deployment_name="daily_refresh",
    cron="0 6 * * *",  # Run at 6 AM daily
    tags=["production"],
)

# Apply the deployment
deployment.apply()
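Alternatively, Prefect 3.x can serve the generated flow from a long-running process using its own serve API (again standard Prefect, not a clgraph method):
# Serve the flow with a cron schedule from a long-running process
if __name__ == "__main__":
    flow.serve(name="daily_refresh", cron="0 6 * * *")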
Advanced Prefect Configuration
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="complex_pipeline",
    description="Multi-stage analytics pipeline",
    retries=3,                # Retry failed tasks 3 times
    retry_delay_seconds=120,  # Wait 2 minutes between retries
    timeout_seconds=3600,     # 1 hour timeout per task
    tags=["analytics", "production", "critical"],
)
Kestra Integration
Generate Kestra flows automatically from your pipeline. Kestra is a declarative orchestration platform using YAML-based workflow definitions with a powerful UI for monitoring and execution.
from clgraph import Pipeline
pipeline = Pipeline.from_sql_files("sql/", dialect="clickhouse")
# Generate Kestra flow YAML
yaml_content = pipeline.to_kestra_flow(
    flow_id="analytics_pipeline",
    namespace="clgraph.production",
    cron="0 6 * * *",  # Run at 6 AM daily
    connection_config={
        "url": "jdbc:clickhouse://localhost:8123/default",
        "username": "default",
        "password": "",
    },
)

# Save to file
with open("flows/analytics_pipeline.yml", "w") as f:
    f.write(yaml_content)
What gets generated:
- One task per query - Each SQL query becomes a Kestra task
- Topological ordering - Tasks are ordered to respect dependencies (Kestra executes sequentially)
- Built-in retry logic - Configurable retry attempts with ISO 8601 duration
- YAML-based - Human-readable, version-controllable workflow definitions
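Because the output is plain YAML, it is easy to sanity-check before deploying - for example with PyYAML, assuming it is installed:
import yaml

# Parse the generated flow to catch structural problems before deploying
flow_def = yaml.safe_load(yaml_content)
print(f"Flow '{flow_def['id']}' defines {len(flow_def['tasks'])} tasks")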
Kestra Flow Example
Your SQL:
-- 01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;
-- 02_staging_orders.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;
-- 03_analytics.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;
Generated Kestra Flow (YAML):
id: data_pipeline
namespace: clgraph.production
description: Pipeline with 3 queries operating on 3 tables. Generated by clgraph.
labels:
  generator: clgraph
tasks:
  - id: raw_orders
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password: ""
    sql: CREATE TABLE raw.orders AS SELECT * FROM external.orders
    retry:
      type: constant
      maxAttempt: 3
      interval: PT1M
  - id: staging_orders
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password: ""
    sql: CREATE TABLE staging.orders AS SELECT * FROM raw.orders
    retry:
      type: constant
      maxAttempt: 3
      interval: PT1M
  - id: analytics
    type: io.kestra.plugin.jdbc.clickhouse.Query
    url: jdbc:clickhouse://localhost:8123/default
    username: default
    password: ""
    sql: CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders
    retry:
      type: constant
      maxAttempt: 3
      interval: PT1M
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 6 * * *"
Tasks are listed in topological order, ensuring raw_orders runs before staging_orders, which runs before analytics. Kestra executes tasks sequentially in the order defined.
Deploy to Kestra:
# Via Kestra CLI
kestra flow validate flows/analytics_pipeline.yml
kestra flow create flows/analytics_pipeline.yml
# Or via API
curl -X POST http://localhost:8080/api/v1/flows \
-H "Content-Type: application/x-yaml" \
--data-binary @flows/analytics_pipeline.yml
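The same upload can be done from Python with requests, assuming the same local Kestra endpoint as the curl example:
import requests

# Upload the generated flow to the Kestra API (same endpoint as the curl example)
with open("flows/analytics_pipeline.yml", "rb") as f:
    response = requests.post(
        "http://localhost:8080/api/v1/flows",
        data=f.read(),
        headers={"Content-Type": "application/x-yaml"},
    )
response.raise_for_status()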
In the Kestra UI, you'll see:
- Flow with all tasks visualized
- Task dependencies as a DAG
- Execution history and logs
- Schedule triggers
Advanced Kestra Configuration
yaml_content = pipeline.to_kestra_flow(
    flow_id="complex_pipeline",
    namespace="clgraph.production",
    description="Multi-stage analytics pipeline",
    cron="0 6 * * *",   # Daily at 6 AM
    retry_attempts=5,   # Retry failed tasks 5 times
    labels={
        "env": "production",
        "team": "analytics",
        "priority": "high",
    },
    connection_config={
        "url": "jdbc:clickhouse://prod-cluster:8123/analytics",
        "username": "pipeline_user",
        "password": "{{ secret('CLICKHOUSE_PASSWORD') }}",  # Kestra secrets
    },
)
Airflow Integration
Generate Airflow DAGs automatically from your pipeline:
from clgraph import Pipeline
from datetime import datetime, timedelta

pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")

# Generate Airflow DAG
dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="my_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5)
    }
)
What gets generated:
- One task per query - Each SQL query becomes an Airflow task
- Dependencies wired automatically - Based on table lineage
- Full DAG customization - All Airflow DAG parameters supported
Airflow DAG Example
Your SQL:
-- query_a.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;
-- query_b.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;
-- query_c.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;
Generated Airflow DAG:
@dag(
    dag_id="my_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={'owner': 'data-team', 'retries': 3}
)
def my_data_pipeline():
    @task
    def query_a():
        executor("CREATE TABLE raw.orders AS SELECT * FROM external.orders")

    @task
    def query_b():
        executor("CREATE TABLE staging.orders AS SELECT * FROM raw.orders")

    @task
    def query_c():
        executor("CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders")

    # Dependencies wired automatically
    query_a() >> query_b() >> query_c()

dag = my_data_pipeline()
Deploy to Airflow:
# Copy to Airflow DAGs folder
cp my_pipeline.py $AIRFLOW_HOME/dags/
# Airflow picks it up automatically
airflow dags list | grep my_data_pipeline
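You can also sanity-check the generated DAG locally with Airflow's dag.test() (Airflow 2.5+) before copying it to the DAGs folder:
# Run the whole DAG once, in-process, without a scheduler
dag.test()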
Advanced DAG Parameters
All Airflow DAG parameters are supported:
from datetime import datetime, timedelta
dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="complex_pipeline",
    schedule="0 2 * * *",               # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,                      # Don't backfill
    max_active_runs=1,                  # One run at a time
    dagrun_timeout=timedelta(hours=2),  # Timeout after 2 hours
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email': ['alerts@company.com'],
        'email_on_failure': True,
        'depends_on_past': False,
    },
    tags=['production', 'daily', 'analytics']
)
Pipeline Manipulation
Splitting Large Pipelines
For large pipelines, you may want to split into smaller subpipelines based on update frequency or ownership:
# Split pipeline into 3 subpipelines
subpipelines = pipeline.split(
    sinks=[
        "mart_customer_ltv",                            # Subpipeline 1: Customer analytics
        ["mart_product_performance", "product_sales"],  # Subpipeline 2: Product analytics
        "daily_with_running"                            # Subpipeline 3: Daily metrics
    ]
)

# Each subpipeline is independent
for i, subpipeline in enumerate(subpipelines):
    print(f"Subpipeline {i}: {len(subpipeline.table_graph.tables)} tables")
How it works:
- Backward traversal - From each sink, trace backward to find all dependencies
- Non-overlapping assignment - Shared dependencies go to the first subpipeline that needs them
- Complete subgraphs - Each subpipeline has everything needed to build its sinks
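A quick way to confirm the split produced non-overlapping subgraphs is to compare the table sets of the returned subpipelines:
# Verify that no table appears in more than one subpipeline
seen = set()
for i, sub in enumerate(subpipelines):
    tables = set(sub.table_graph.tables)
    overlap = seen & tables
    assert not overlap, f"Subpipeline {i} overlaps earlier subpipelines: {overlap}"
    seen |= tables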
Example: Splitting by Update Frequency
# Your pipeline has:
# - Real-time tables (update every 5 min)
# - Hourly tables
# - Daily tables
subpipelines = pipeline.split(
    sinks=[
        ["realtime_dashboard", "realtime_alerts"],  # Every 5 min
        ["hourly_summary", "hourly_metrics"],       # Hourly
        ["daily_report", "daily_aggregates"]        # Daily
    ]
)

# Schedule each independently
realtime_dag = subpipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="realtime_pipeline",
    schedule="*/5 * * * *"  # Every 5 minutes
)

hourly_dag = subpipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="hourly_pipeline",
    schedule="0 * * * *"  # Every hour
)

daily_dag = subpipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="daily_pipeline",
    schedule="0 2 * * *"  # Daily at 2 AM
)
Example: Splitting by Team Ownership
# Split by team
subpipelines = pipeline.split(
    sinks=[
        ["finance_report", "revenue_metrics"],       # Finance team
        ["user_analytics", "engagement_dashboard"],  # Product team
        ["ml_features", "training_data"]             # ML team
    ]
)

# Each team maintains their own DAG
for team, subpipeline in [
    ("finance", subpipelines[0]),
    ("product", subpipelines[1]),
    ("ml", subpipelines[2]),
]:
    dag = subpipeline.to_airflow_dag(
        executor=my_executor,
        dag_id=f"{team}_pipeline",
        schedule="@daily",
        default_args={'owner': team}
    )
Complete Example
Putting it all together:
from clgraph import Pipeline
from datetime import datetime, timedelta
# 1. Parse SQL files
pipeline = Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")
# 2. Understand table lineage
print(f"Total tables: {len(pipeline.table_graph.tables)}")
print(f"Execution levels: {len(pipeline.table_graph.get_execution_order())}")
# 3. Execute locally (for testing)
def bigquery_executor(sql: str) -> None:
    from google.cloud import bigquery
    client = bigquery.Client()
    client.query(sql).result()

results = pipeline.run(
    executor=bigquery_executor,
    max_workers=4,
    verbose=True
)

if results['failed']:
    print(f"Failed queries: {results['failed']}")
else:
    print(f"✅ All {results['total_queries']} queries completed in {results['elapsed_seconds']}s")

# 4. Generate Airflow DAG (for production)
dag = pipeline.to_airflow_dag(
    executor=bigquery_executor,
    dag_id="production_pipeline",
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True
    }
)

# 5. Optional: Split for different schedules
subpipelines = pipeline.split(
    sinks=[
        ["realtime_dashboard"],  # Every 5 min
        ["daily_report"]         # Daily
    ]
)
Key Takeaways
Table Lineage
- Dependency tracking - Know what depends on what
- Execution order - Topological sort respects all dependencies
- Impact analysis - See downstream effects of changes
Pipeline Execution
- Sync & async - Both execution modes supported
- Parallel execution - Queries in the same level run concurrently
- Error handling - Stops on failure, returns results
Orchestrator Integration
- Multiple orchestrators - Airflow 2.x/3.x, Dagster, Prefect 3.x, and Kestra supported
- Automatic dependency wiring - Based on table lineage
- Full customization - All orchestrator parameters supported
Dagster Integration
- Asset-based approach - Each target table becomes an asset
- Lineage observability - Dependencies visible in Dagster UI
- Flexible deployment - Drop definitions.py in your workspace
Prefect Integration
- Task-based flows - Each query becomes a Prefect task
- Automatic concurrency - Independent tasks run in parallel
- Built-in retries - Configurable retry logic with delays
- Simple deployment - Python-native deployment model
Airflow Integration
- Automatic DAG generation - One task per query
- Full customization - All Airflow parameters supported
- Dependencies wired - Based on table lineage
Pipeline Manipulation
- Split large pipelines - Create non-overlapping subpipelines
- Schedule independently - Different cadences for different parts
- Team ownership - Separate DAGs for different teams
Next Steps
- From SQL to Lineage Graph - Understand how the graph is built
- Quick Start - Try it yourself
- API Documentation - Full API reference
- Examples - Real-world examples