
Examples

Real-world use cases and patterns for clgraph.


Example 1: Metadata from SQL Comments

Scenario: Extract metadata (descriptions, PII flags, ownership) directly from SQL comments.

from clgraph import Pipeline

sql = """
CREATE TABLE user_metrics AS
SELECT
  user_id,  -- User identifier [pii: false]
  email,    -- Email address [pii: true, owner: data-team]
  SUM(revenue) as total_revenue  /* Total revenue [tags: metric finance] */
FROM user_activity
GROUP BY user_id, email
"""

pipeline = Pipeline([("create_user_metrics", sql)], dialect="bigquery")

# Access metadata (output columns use table.column format)
email_col = pipeline.columns["user_metrics.email"]
print(email_col.description)  # "Email address"
print(email_col.pii)          # True
print(email_col.owner)        # "data-team"

# Find PII columns
pii_columns = pipeline.get_pii_columns()
for col in pii_columns:
    print(f"PII: {col.full_name} - {col.description}")

# Find columns by tag
metrics = pipeline.get_columns_by_tag("metric")
for col in metrics:
    print(f"Metric: {col.full_name} - {col.description}")

Result: Metadata extracted automatically from inline comments.
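
If another tool needs this metadata, the same attributes can be dumped to a plain dictionary. A minimal sketch, reusing only the column attributes shown above (full_name, description, pii, owner, tags):

import json

# Collect the extracted comment metadata for export (attributes as accessed above)
metadata = {
    col.full_name: {
        "description": col.description,
        "pii": col.pii,
        "owner": col.owner,
        "tags": col.tags,
    }
    for col in pipeline.columns.values()
}
print(json.dumps(metadata, indent=2))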


Example 2: PII Compliance Audit

Scenario: You need to find all columns containing PII across your entire data warehouse.

from clgraph import Pipeline

# Create a sample pipeline with PII data
# Note: If you have SQL files in a directory, use: Pipeline.from_sql_files("examples/sql_files/", dialect="bigquery")
queries = [
    ("create_raw_users", "CREATE TABLE users AS SELECT user_id, email, phone, ssn FROM source.users"),
    ("create_raw_orders", "CREATE TABLE orders AS SELECT order_id, customer_id, amount FROM source.orders"),
    ("create_analytics", "CREATE TABLE user_orders AS SELECT u.email, o.amount FROM users u JOIN orders o ON u.user_id = o.customer_id")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Mark PII columns at source
pii_sources = {
    "create_raw_users:users.email": True,
    "create_raw_users:users.phone": True,
    "create_raw_users:users.ssn": True,
    "create_raw_orders:orders.customer_id": True,
}

for column_path, is_pii in pii_sources.items():
    if column_path in pipeline.columns:
        pipeline.columns[column_path].pii = is_pii
        pipeline.columns[column_path].tags = ["gdpr", "sensitive"]

# Propagate through entire pipeline
pipeline.propagate_all_metadata()

# Generate compliance report
pii_columns = pipeline.get_pii_columns()

print(f"Found {len(pii_columns)} columns containing PII:\n")

by_table = {}
for col in pii_columns:
    if col.table_name not in by_table:
        by_table[col.table_name] = []
    by_table[col.table_name].append(col.column_name)

for table, columns in sorted(by_table.items()):
    print(f"{table}:")
    for col in columns:
        print(f"  - {col}")

# Export for data catalog
import json

with open("pii_audit.json", "w") as f:
    json.dump(by_table, f, indent=2)

print(f"\n✅ Audit complete: {len(by_table)} tables contain PII")

Result: Complete PII audit in seconds, automatically tracking propagation through joins and transformations.
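
To see the propagation in action, spot-check a derived table directly; the accessors already used above are enough:

# Spot-check: email flows from users into user_orders, so it should now be flagged
for col in pipeline.get_pii_columns():
    if col.table_name == "user_orders":
        print(f"Propagated PII: {col.full_name} (tags: {col.tags})")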


Example 3: Breaking Change Impact Analysis

Scenario: Planning to rename raw.orders.amount to raw.orders.revenue. What breaks?

from clgraph import Pipeline

# Create a sample pipeline
queries = [
    ("raw_orders", "CREATE TABLE raw.orders AS SELECT order_id, customer_id, amount FROM source.orders"),
    ("staging", "CREATE TABLE staging.orders AS SELECT customer_id, SUM(amount) as total_amount FROM raw.orders GROUP BY customer_id"),
    ("analytics", "CREATE TABLE analytics.metrics AS SELECT customer_id, total_amount, total_amount * 1.1 as revenue FROM staging.orders")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Find all affected columns
table_name = "raw.orders"
column_name = "amount"

affected = pipeline.trace_column_forward(table_name, column_name)

print(f"Renaming {table_name}.{column_name} affects {len(affected)} downstream columns:\n")

# Group by table
by_table = {}
for col in affected:
    if col.table_name not in by_table:
        by_table[col.table_name] = []
    by_table[col.table_name].append({
        'column': col.column_name,
        'operation': col.operation
    })

# Print impact report
for table, columns in sorted(by_table.items()):
    print(f"📊 {table}:")
    for col in columns:
        print(f"   - {col['column']} ({col['operation']})")

# Find owners to notify
owners = set()
for col in affected:
    if col.owner:
        owners.add(col.owner)

if owners:
    print(f"\n👥 Notify these teams: {', '.join(owners)}")

print(f"\n⚠️  Total impact: {len(by_table)} tables need updates")

Result: Exact impact analysis with transformation types and owner notifications.
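
The same report can double as a merge gate in CI: exit non-zero while the rename still has dependents. A minimal sketch using the affected list from above:

import sys

# Block the rename while anything downstream still depends on the column
if affected:
    print(f"Blocking rename: {len(affected)} downstream columns depend on {table_name}.{column_name}")
    sys.exit(1)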


Example 4: Multi-Schedule Pipeline

Scenario: Different tables update at different frequencies (realtime vs daily).

# dags/multi_schedule_pipeline.py
# Place this file in your Airflow DAGs folder

from clgraph import Pipeline
from datetime import datetime

# Create pipeline with tables for different schedules
queries = [
    ("source", "CREATE TABLE source.events AS SELECT event_id, user_id, timestamp FROM raw.events"),
    ("realtime_activity", "CREATE TABLE realtime.user_activity AS SELECT user_id, COUNT(*) as events FROM source.events GROUP BY user_id"),
    ("realtime_metrics", "CREATE TABLE realtime.live_metrics AS SELECT COUNT(*) as total_events FROM source.events"),
    ("hourly_summary", "CREATE TABLE analytics.hourly_summary AS SELECT DATE_TRUNC(timestamp, HOUR) as hour, COUNT(*) FROM source.events GROUP BY hour"),
    ("hourly_revenue", "CREATE TABLE analytics.hourly_revenue AS SELECT hour, SUM(amount) FROM analytics.hourly_summary GROUP BY hour"),
    ("daily_dashboard", "CREATE TABLE reports.daily_dashboard AS SELECT DATE(timestamp) as date FROM source.events"),
    ("daily_email", "CREATE TABLE reports.daily_email AS SELECT * FROM reports.daily_dashboard")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Split by update frequency
subpipelines = pipeline.split(
    sinks=[
        # Realtime dashboard (every 5 minutes)
        ["realtime.user_activity", "realtime.live_metrics"],

        # Hourly rollups
        ["analytics.hourly_summary", "analytics.hourly_revenue"],

        # Daily reports
        ["reports.daily_dashboard", "reports.daily_email"]
    ]
)

# Placeholder executor (prints instead of running the SQL)
def my_executor(sql: str) -> None:
    print(f"Executing: {sql[:50]}...")

# Create DAGs directly - Airflow discovers them automatically

realtime_dag = subpipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="realtime_pipeline",
    schedule="*/5 * * * *",  # Every 5 minutes
    start_date=datetime(2025, 1, 1)
)

hourly_dag = subpipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="hourly_pipeline",
    schedule="0 * * * *",  # Every hour
    start_date=datetime(2025, 1, 1)
)

daily_dag = subpipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="daily_pipeline",
    schedule="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2025, 1, 1)
)

Result: Three independent DAGs in a single file, automatically discovered by Airflow with optimal scheduling for each use case.
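
In production the executor would submit the SQL to your warehouse instead of printing it. A minimal sketch, assuming BigQuery via the google-cloud-bigquery client (swap in whatever client your warehouse uses):

from google.cloud import bigquery

bq_client = bigquery.Client()  # assumes application-default credentials

def bigquery_executor(sql: str) -> None:
    """Submit the statement and wait for it to complete."""
    bq_client.query(sql).result()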


Example 5: LLM-Powered Documentation

Scenario: Auto-generate descriptions for your entire data warehouse.

from clgraph import Pipeline
from langchain_openai import ChatOpenAI

# Create a sample pipeline
queries = [
    ("users", "CREATE TABLE staging.users AS SELECT user_id, name, email FROM raw.users"),
    ("orders", "CREATE TABLE staging.orders AS SELECT order_id, user_id, amount FROM raw.orders"),
    ("metrics", "CREATE TABLE analytics.user_metrics AS SELECT u.user_id, COUNT(o.order_id) as order_count FROM staging.users u JOIN staging.orders o ON u.user_id = o.user_id GROUP BY u.user_id")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Set up LLM
pipeline.llm = ChatOpenAI(
    model="gpt-4",
    temperature=0  # Deterministic output
)

# Generate descriptions for all columns
print("Generating AI-powered descriptions...")
pipeline.generate_all_descriptions()

# Export to markdown
with open("data_dictionary.md", "w") as f:
    f.write("# Data Dictionary\n\n")
    f.write("*Auto-generated with AI and column lineage context*\n\n")

    for table_name in sorted(pipeline.table_graph.tables):
        f.write(f"## {table_name}\n\n")

        columns = [
            col for col in pipeline.columns.values()
            if col.table_name == table_name
        ]

        for col in sorted(columns, key=lambda c: c.column_name):
            f.write(f"### `{col.column_name}`\n\n")
            f.write(f"{col.description}\n\n")

            # Add lineage info
            if col.pii:
                f.write(f"**⚠️  Contains PII**\n\n")
            if col.owner:
                f.write(f"**Owner:** {col.owner}\n\n")
            if col.tags:
                f.write(f"**Tags:** {', '.join(col.tags)}\n\n")

            f.write("---\n\n")

print("✅ Documentation generated: data_dictionary.md")

Result: Complete, context-aware documentation that stays in sync with your code.
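
A quick post-generation check helps catch columns the LLM left blank, using the same attributes as above:

# Report any columns that still lack a description after generation
missing = [col.full_name for col in pipeline.columns.values() if not col.description]
if missing:
    print(f"{len(missing)} columns still need descriptions:")
    for name in missing:
        print(f"  - {name}")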


Example 6: Root Cause Analysis

Scenario: Dashboard shows unexpected values. Trace back to find the issue.

from clgraph import Pipeline

# Create a sample pipeline
queries = [
    ("raw", "CREATE TABLE raw.sales AS SELECT sale_id, amount, tax FROM source.sales"),
    ("staging", "CREATE TABLE staging.sales AS SELECT sale_id, amount + tax as total FROM raw.sales"),
    ("dashboard", "CREATE TABLE dashboard.metrics AS SELECT SUM(total) as total_revenue FROM staging.sales")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Problem: dashboard.metrics.total_revenue is 10% higher than expected
problem_table = "dashboard.metrics"
problem_column = "total_revenue"

# Trace backward to source
sources = pipeline.trace_column_backward(
    problem_table,
    problem_column
)

print(f"Tracing {problem_table}.{problem_column} back to source:\n")

for i, col in enumerate(sources, 1):
    print(f"{i}. {col.table_name}.{col.column_name}")
    if col.expression:
        print(f"   Expression: {col.expression}")
    if col.operation:
        print(f"   Operation: {col.operation}")
    print()

# Check for recent changes in source tables
source_tables = {col.table_name for col in sources if col.node_type == "source"}
print(f"Source tables to investigate:")
for table in source_tables:
    print(f"  - {table}")

Result: Complete provenance chain from dashboard to raw sources with expressions and operations.
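
For quick debugging, the same trace can be flattened into a single line (in whatever order trace_column_backward returns):

# One-line summary of the provenance chain
print(" <- ".join(f"{col.table_name}.{col.column_name}" for col in sources))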


Example 7: Team-Based Pipeline Split

Scenario: Three teams maintain different parts of the pipeline.

# dags/team_pipelines.py
# Place this file in your Airflow DAGs folder

from clgraph import Pipeline

# Create pipeline with tables for different teams
queries = [
    ("source", "CREATE TABLE source.data AS SELECT * FROM raw.data"),
    ("finance_rev", "CREATE TABLE finance.revenue_report AS SELECT SUM(amount) FROM source.data"),
    ("finance_exp", "CREATE TABLE finance.expense_dashboard AS SELECT category, SUM(cost) FROM source.data GROUP BY category"),
    ("product_users", "CREATE TABLE product.user_metrics AS SELECT user_id, COUNT(*) FROM source.data GROUP BY user_id"),
    ("product_features", "CREATE TABLE product.feature_adoption AS SELECT feature, COUNT(*) FROM source.data GROUP BY feature"),
    ("ml_training", "CREATE TABLE ml.training_data AS SELECT * FROM source.data WHERE label IS NOT NULL"),
    ("ml_features", "CREATE TABLE ml.feature_store AS SELECT feature_id, value FROM source.data")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Executor shared by all three team DAGs (placeholder; swap in your warehouse client)
def my_executor(sql: str) -> None:
    print(f"Executing: {sql[:50]}...")

# Split by team ownership
team_pipelines = pipeline.split(
    sinks=[
        # Finance team
        ["finance.revenue_report", "finance.expense_dashboard"],

        # Product team
        ["product.user_metrics", "product.feature_adoption"],

        # ML team
        ["ml.training_data", "ml.feature_store"]
    ]
)

# Helper to set ownership metadata
def set_team_ownership(subpipeline, team):
    for table in subpipeline.table_graph.tables:
        for col in subpipeline.columns.values():
            if col.table_name == table:
                col.owner = f"{team}-team"

# Finance team DAG
set_team_ownership(team_pipelines[0], "finance")
finance_dag = team_pipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="finance_pipeline",
    schedule="@daily",
    default_args={
        'owner': 'finance-team',
        'email': ['finance@company.com'],
        'email_on_failure': True
    }
)

# Product team DAG
set_team_ownership(team_pipelines[1], "product")
product_dag = team_pipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="product_pipeline",
    schedule="@daily",
    default_args={
        'owner': 'product-team',
        'email': ['product@company.com'],
        'email_on_failure': True
    }
)

# ML team DAG
set_team_ownership(team_pipelines[2], "ml")
ml_dag = team_pipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="ml_pipeline",
    schedule="@daily",
    default_args={
        'owner': 'ml-team',
        'email': ['ml@company.com'],
        'email_on_failure': True
    }
)

Result: Three team-specific DAGs in a single file, automatically discovered by Airflow with proper ownership and notifications.
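
A small sanity check confirms each sub-pipeline actually contains the sink tables it was split on, reusing table_graph.tables as in set_team_ownership above:

# Sanity check: every split should include its own sink tables
expected_sinks = [
    ["finance.revenue_report", "finance.expense_dashboard"],
    ["product.user_metrics", "product.feature_adoption"],
    ["ml.training_data", "ml.feature_store"],
]
for sub, sinks in zip(team_pipelines, expected_sinks):
    missing = [t for t in sinks if t not in sub.table_graph.tables]
    assert not missing, f"Sub-pipeline is missing sink tables: {missing}"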


Example 8: Data Quality Checks

Scenario: Add validation before pipeline execution.

from clgraph import Pipeline

# Create a sample pipeline
queries = [
    ("raw", "CREATE TABLE raw.orders AS SELECT order_id, customer_id, amount FROM source.orders"),
    ("staging", "CREATE TABLE staging.orders AS SELECT customer_id, SUM(amount) as total_amount FROM raw.orders GROUP BY customer_id")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Define quality checks
quality_checks = {
    "raw.orders": [
        "SELECT COUNT(*) FROM raw.orders WHERE amount < 0",  # No negative amounts
        "SELECT COUNT(*) FROM raw.orders WHERE customer_id IS NULL",  # No nulls
    ],
    "staging.orders": [
        "SELECT COUNT(*) FROM staging.orders WHERE total_amount != (SELECT SUM(amount) FROM raw.orders)",
    ]
}

# `client` below is a placeholder for your warehouse client/connection
def executor_with_validation(sql: str) -> None:
    """Execute SQL with pre-flight validation"""
    # Extract table being created
    if "CREATE TABLE" in sql:
        table_name = sql.split("CREATE TABLE")[1].split("AS")[0].strip()

        # Run quality checks for this table
        if table_name in quality_checks:
            print(f"Running quality checks for {table_name}...")
            for check in quality_checks[table_name]:
                result = client.query(check).result()
                count = list(result)[0][0]
                if count > 0:
                    raise ValueError(f"Quality check failed for {table_name}: {count} issues found")
            print(f"✅ Quality checks passed for {table_name}")

    # Execute main SQL
    client.execute(sql)

# Run with validation
results = pipeline.run(
    executor=executor_with_validation,
    max_workers=4,
    verbose=True
)

if results['failed']:
    print(f"❌ Pipeline failed quality checks: {results['failed']}")
else:
    print(f"✅ All quality checks passed")

Result: Built-in data quality validation during execution.
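
The same checks can also run standalone, outside the executor, to audit tables already in place; this sketch assumes the same client placeholder used above:

def run_checks(table: str) -> list:
    """Return the failing check statements for a table."""
    failures = []
    for check in quality_checks.get(table, []):
        count = list(client.query(check).result())[0][0]
        if count > 0:
            failures.append(check)
    return failures

for table in quality_checks:
    failed = run_checks(table)
    print(f"{table}: {'FAILED' if failed else 'passed'} ({len(failed)} failing checks)")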


Example 9: Incremental Processing

Scenario: Only process new data since last run.

from clgraph import Pipeline
from datetime import datetime, timedelta

# Create a sample pipeline
queries = [
    ("raw", "CREATE TABLE raw.orders AS SELECT order_id, amount, created_at FROM source.orders"),
    ("staging", "CREATE TABLE staging.daily_orders AS SELECT DATE(created_at) as date, SUM(amount) FROM raw.orders GROUP BY date")
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Get last run timestamp
last_run = datetime.now() - timedelta(hours=24)

# `client` is a placeholder for your warehouse client/connection
def incremental_executor(sql: str) -> None:
    """Add an incremental filter to the SQL before executing it"""
    # Add WHERE clause for incremental processing
    if "FROM raw.orders" in sql:
        # Modify SQL to only process new records
        sql = sql.replace(
            "FROM raw.orders",
            f"FROM raw.orders WHERE created_at > TIMESTAMP('{last_run}')"
        )

    print(f"Executing (incremental): {sql[:100]}...")
    client.execute(sql)

# Run incrementally
results = pipeline.run(
    executor=incremental_executor,
    max_workers=4,
    verbose=True
)

print(f"✅ Processed data since {last_run}")

Result: Efficient incremental processing for large datasets.
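
To make the watermark survive between runs, persist it after a successful run. A minimal file-based sketch (swap in your scheduler's state store if you have one):

import json
from pathlib import Path

# Record the new watermark after a successful run
state_file = Path("last_run.json")
state_file.write_text(json.dumps({"last_run": datetime.now().isoformat()}))

# On the next run, restore it before building incremental_executor:
# last_run = datetime.fromisoformat(json.loads(state_file.read_text())["last_run"])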


Example 10: SQL Validation and Issue Detection

Scenario: Validate SQL quality and catch potential issues before deployment.

from clgraph import Pipeline
from clgraph.models import IssueSeverity, IssueCategory

# Create a pipeline with potential issues
queries = [
    # Issue: Unqualified column in JOIN
    ("join_query", """
        CREATE TABLE analytics.joined AS
        SELECT user_id, amount, name
        FROM orders o
        JOIN users u ON o.user_id = u.id
    """),
    # Issue: SELECT * with multiple tables
    ("star_query", """
        CREATE TABLE staging.combined AS
        SELECT *
        FROM orders o, users u
        WHERE o.user_id = u.id
    """)
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Check for validation issues
if pipeline.has_errors():
    print("Pipeline has validation errors!")
    pipeline.print_issues(severity='error')
elif pipeline.has_warnings():
    print("Pipeline has warnings to review:")
    pipeline.print_issues(severity='warning')
else:
    print("No validation issues found")

# Get specific types of issues
star_issues = pipeline.get_issues(
    category=IssueCategory.UNQUALIFIED_STAR_MULTIPLE_TABLES
)
for issue in star_issues:
    print(f"Star issue in {issue.query_id}: {issue.message}")
    if issue.suggestion:
        print(f"  Suggestion: {issue.suggestion}")

# Export issues for CI/CD
import json
issues_json = [
    {
        "severity": issue.severity.value,
        "category": issue.category.value,
        "message": issue.message,
        "query_id": issue.query_id,
        "suggestion": issue.suggestion
    }
    for issue in pipeline.get_all_issues()
]
with open("validation_report.json", "w") as f:
    json.dump(issues_json, f, indent=2)

# Fail CI if there are errors
if pipeline.has_errors():
    raise SystemExit("Pipeline validation failed")

Result: Automated SQL quality checks with actionable suggestions.
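
For a one-line CI summary, the issues can be grouped by severity using the same accessors as above:

from collections import Counter

# Count issues per severity for a compact log line
severity_counts = Counter(issue.severity.value for issue in pipeline.get_all_issues())
print("Validation summary:", dict(severity_counts))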


Example 11: Simplified Lineage for Data Catalogs

Scenario: Export clean lineage without internal CTEs and subqueries for a data catalog.

from clgraph import Pipeline
import json

# Create a pipeline with CTEs
queries = [
    ("transform", """
        CREATE TABLE analytics.metrics AS
        WITH monthly_totals AS (
            SELECT customer_id, SUM(amount) as total
            FROM raw.orders
            GROUP BY customer_id
        ),
        ranked AS (
            SELECT *, RANK() OVER (ORDER BY total DESC) as rank
            FROM monthly_totals
        )
        SELECT customer_id, total, rank
        FROM ranked
        WHERE rank <= 100
    """)
]
pipeline = Pipeline.from_tuples(queries, dialect="bigquery")

# Full graph includes CTEs
print(f"Full graph: {len(pipeline.columns)} columns")
print(f"Full graph: {len(pipeline.edges)} edges")

# Simplified graph - only physical tables
simplified = pipeline.get_simplified_column_graph()
print(f"Simplified: {len(simplified.columns)} columns")
print(f"Simplified: {len(simplified.edges)} edges")

# Export simplified lineage for data catalog
catalog_data = {
    "columns": [
        {
            "name": col.full_name,
            "table": col.table_name,
            "column": col.column_name,
            "description": col.description,
            "pii": col.pii,
            "owner": col.owner
        }
        for col in simplified.columns.values()
    ],
    "lineage": [
        {
            "source": edge.from_node.full_name,
            "target": edge.to_node.full_name
        }
        for edge in simplified.edges
    ]
}

with open("catalog_lineage.json", "w") as f:
    json.dump(catalog_data, f, indent=2)

print("Exported simplified lineage for data catalog")

Result: Clean lineage export without internal query structures, perfect for data catalog integration.
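
The same simplified edges can also be rendered as a Graphviz DOT file for a quick visual check (plain string output, no extra dependencies):

with open("catalog_lineage.dot", "w") as f:
    f.write("digraph lineage {\n  rankdir=LR;\n")
    for edge in simplified.edges:
        f.write(f'  "{edge.from_node.full_name}" -> "{edge.to_node.full_name}";\n')
    f.write("}\n")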


Next Steps


More Patterns

Looking for specific patterns? Check out: