Template Variables

Use Jinja2-style template variables to parameterize your SQL queries for multi-environment pipelines, dynamic table names, and dbt-style integrations.


Overview

Template variables allow you to:

  • Deploy the same SQL across multiple environments (dev, staging, prod)
  • Parameterize table names by project, region, or dataset
  • Integrate with dbt using compiled SQL output
  • Support Airflow macros for dynamic table naming
  • Validate SQL syntax before variable substitution

Basic Usage

Simple Variable Substitution

from clgraph import Pipeline

# SQL with template variables
queries = [
    """
    CREATE TABLE {{env}}_staging.orders AS
    SELECT order_id, customer_id, amount
    FROM raw.orders
    WHERE status = 'completed'
    """
]

# Build pipeline for production
prod_pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"env": "prod"}
)

# Tables created: prod_staging.orders
print(prod_pipeline.table_graph.tables)
# Output: ['raw.orders', 'prod_staging.orders']

Multi-Environment Deployment

# Same SQL, different environments
queries = [
    """
    CREATE TABLE {{env}}_staging.orders AS
    SELECT order_id, customer_id, amount
    FROM raw.orders
    """,
    """
    CREATE TABLE {{env}}_analytics.customer_metrics AS
    SELECT customer_id, COUNT(*) as order_count
    FROM {{env}}_staging.orders
    GROUP BY customer_id
    """
]

# Development environment
dev_pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"env": "dev"}
)

# Production environment
prod_pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"env": "prod"}
)

# Different tables for each environment
print(dev_pipeline.table_graph.tables)
# Output: ['raw.orders', 'dev_staging.orders', 'dev_analytics.customer_metrics']

print(prod_pipeline.table_graph.tables)
# Output: ['raw.orders', 'prod_staging.orders', 'prod_analytics.customer_metrics']

Input Formats

Template context can be provided in multiple formats:

Python Dictionary

pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={
        "env": "prod",
        "project": "my_company",
        "region": "us_central"
    }
)

YAML Configuration

import yaml

# Load from YAML file
with open("config.yaml", "r") as f:
    template_context = yaml.safe_load(f)

pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context=template_context
)

Example config.yaml:

env: production
project: analytics_pipeline
config:
  region: us-central1
  dataset: customer_data
  start_date: "2025-01-01"

JSON Configuration

import json

# Load from JSON file
with open("config.json", "r") as f:
    template_context = json.load(f)

pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context=template_context
)

Template Syntax

Simple Variables

CREATE TABLE {{env}}_staging.orders AS
SELECT * FROM {{source_db}}.raw_orders

Nested Variables

CREATE TABLE {{config.project}}.{{config.env}}_staging.orders AS
SELECT order_id, customer_id, amount
FROM {{config.source_db}}.raw_{{config.region}}.orders
WHERE created_at >= '{{config.start_date}}'

Template context:

template_context = {
    "config": {
        "project": "data_platform",
        "env": "dev",
        "source_db": "external_db",
        "region": "us_central",
        "start_date": "2025-01-01"
    }
}
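
With this context, the template renders to:

CREATE TABLE data_platform.dev_staging.orders AS
SELECT order_id, customer_id, amount
FROM external_db.raw_us_central.orders
WHERE created_at >= '2025-01-01'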

Factory Method Support

All Pipeline factory methods support template variables:

from_sql_list()

queries = [
    "CREATE TABLE {{project}}.staging.users AS SELECT * FROM raw.users",
    "CREATE TABLE {{project}}.analytics.users AS SELECT * FROM {{project}}.staging.users"
]

pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"project": "my_company"}
)

from_sql_files()

pipeline = Pipeline.from_sql_files(
    "sql/",
    dialect="bigquery",
    template_context={"env": "prod", "project": "analytics"}
)

from_dict()

queries = {
    "staging": "CREATE TABLE {{project}}.staging.orders AS SELECT * FROM raw.orders",
    "analytics": "CREATE TABLE {{project}}.analytics.metrics AS SELECT * FROM {{project}}.staging.orders"
}

pipeline = Pipeline.from_dict(
    queries,
    dialect="bigquery",
    template_context={"project": "my_company"}
)

from_tuples()

queries = [
    ("staging", "CREATE TABLE {{project}}.staging.orders AS SELECT * FROM raw.orders"),
    ("analytics", "CREATE TABLE {{project}}.analytics.metrics AS SELECT * FROM {{project}}.staging.orders")
]

pipeline = Pipeline.from_tuples(
    queries,
    dialect="bigquery",
    template_context={"project": "my_company"}
)

from_sql_string()

sql = """
CREATE TABLE {{project}}.staging.orders AS
SELECT * FROM raw.orders;

CREATE TABLE {{project}}.analytics.metrics AS
SELECT * FROM {{project}}.staging.orders;
"""

pipeline = Pipeline.from_sql_string(
    sql,
    dialect="bigquery",
    template_context={"project": "my_company"}
)

Real-World Examples

Example 1: Multi-Region Deployment

# SQL with region placeholders
queries = [
    """
    CREATE TABLE {{region}}_analytics.customer_metrics AS
    SELECT
        customer_id,
        COUNT(*) as order_count,
        SUM(amount) as total_revenue
    FROM {{region}}_raw.orders
    GROUP BY customer_id
    """
]

# Deploy to US region
us_pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"region": "us"}
)

# Deploy to EU region
eu_pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"region": "eu"}
)

# Each region has independent tables
print(us_pipeline.table_graph.tables)
# Output: ['us_raw.orders', 'us_analytics.customer_metrics']

print(eu_pipeline.table_graph.tables)
# Output: ['eu_raw.orders', 'eu_analytics.customer_metrics']

Example 2: Project-Based Configuration

queries = {
    "staging": """
        CREATE TABLE {{project}}.staging.user_data AS
        SELECT user_id, username, email, created_at
        FROM {{project}}.raw.users
    """,
    "analytics": """
        CREATE TABLE {{project}}.analytics.user_summary AS
        SELECT
            user_id,
            username,
            COUNT(*) as activity_count
        FROM {{project}}.staging.user_data
        JOIN {{project}}.raw.activities USING (user_id)
        GROUP BY user_id, username
    """
}

pipeline = Pipeline.from_dict(
    queries,
    dialect="bigquery",
    template_context={"project": "my_company"}
)

# All tables namespaced by project
print(pipeline.table_graph.tables)
# Output: [
#   'my_company.raw.users',
#   'my_company.staging.user_data',
#   'my_company.raw.activities',
#   'my_company.analytics.user_summary'
# ]

Example 3: CI/CD Pipeline

import os
import yaml

from clgraph import Pipeline

# Load environment-specific config
env = os.getenv("DEPLOY_ENV", "dev")
with open(f"config/{env}.yaml", "r") as f:
    config = yaml.safe_load(f)

# Load SQL files
pipeline = Pipeline.from_sql_files(
    "sql/",
    dialect="bigquery",
    template_context=config
)

# Validate before deployment
print(f"Deploying to {env} environment")
print(f"Tables to create: {len(pipeline.table_graph.tables)}")

# Execute pipeline (bigquery_executor is assumed to be configured elsewhere)
results = pipeline.run(executor=bigquery_executor)
print(f"✅ Deployed {len(results['completed'])} tables to {env}")

Column Lineage with Templates

Template variables work seamlessly with column lineage tracing:

queries = [
    """
    CREATE TABLE {{env}}_staging.orders AS
    SELECT order_id, customer_id, amount
    FROM raw.orders
    """,
    """
    CREATE TABLE {{env}}_analytics.customer_metrics AS
    SELECT
        customer_id,
        SUM(amount) as total_revenue
    FROM {{env}}_staging.orders
    GROUP BY customer_id
    """
]

pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery",
    template_context={"env": "prod"}
)

# Trace column lineage (template variables resolved)
sources = pipeline.trace_column_backward(
    "prod_analytics.customer_metrics",
    "total_revenue"
)

print("Source columns for total_revenue:")
for source in sources:
    print(f"  - {source.table_name}.{source.column_name}")

# Output:
# Source columns for total_revenue:
#   - raw.orders.amount

Validation Without Context

You can validate SQL syntax without providing template context. The template syntax will be preserved:

queries = [
    "CREATE TABLE {{env}}_staging.orders AS SELECT * FROM raw.orders"
]

# No template_context provided
pipeline = Pipeline.from_sql_list(
    queries,
    dialect="bigquery"
    # NO template_context
)

# Template syntax preserved
print(pipeline.table_graph.tables)
# Output: ['raw.orders', '{{env}}_staging.orders']

This is useful for:

  • SQL syntax validation before deployment
  • Linting SQL files in CI/CD
  • Testing SQL structure without specific environment values
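
For example, a minimal lint step in CI might look like the sketch below (the sql/ path and error handling are illustrative):

import sys

from clgraph import Pipeline

# Parse all SQL files without resolving templates; a syntax error fails the build
try:
    pipeline = Pipeline.from_sql_files("sql/", dialect="bigquery")
except Exception as exc:
    print(f"SQL validation failed: {exc}")
    sys.exit(1)

print(f"Validated {len(pipeline.table_graph.tables)} tables")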

Integration Examples

dbt Integration

Use dbt's compiled SQL output with clgraph:

# After running: dbt compile
pipeline = Pipeline.from_sql_files(
    "target/compiled/my_project/models/",
    dialect="bigquery",
    template_context={
        "env": "prod"
    }
)

Note: dbt's {{ ref() }} and {{ source() }} functions are resolved during dbt compile. Use the compiled SQL with clgraph for lineage analysis.

Airflow Macros

Provide values for Airflow macros through template_context so the rendered SQL can be parsed and validated:

# Before (Airflow template)
sql = """
CREATE TABLE {{ params.env }}_staging.orders AS
SELECT * FROM raw.orders
WHERE date = '{{ ds }}'
"""

# With clgraph (validate structure)
pipeline = Pipeline.from_sql_string(
    sql,
    dialect="bigquery",
    template_context={
        "params": {"env": "prod"},
        "ds": "2025-01-01"
    }
)

Best Practices

1. Use Configuration Files

Store template variables in separate config files:

# config/production.yaml
env: production
project: analytics_platform
config:
  region: us-central1
  dataset: customer_data
  retention_days: 90

import yaml

with open("config/production.yaml") as f:
    config = yaml.safe_load(f)

pipeline = Pipeline.from_sql_files(
    "sql/",
    dialect="bigquery",
    template_context=config
)

2. Consistent Naming Conventions

Use consistent variable names across your organization:

  • env - Environment (dev, staging, prod)
  • project - GCP project or namespace
  • region - Geographic region
  • dataset - BigQuery dataset or database schema
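
A template context following these conventions might look like this (values are illustrative):

template_context = {
    "env": "prod",
    "project": "analytics_platform",
    "region": "us",
    "dataset": "customer_data",
}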

3. Document Required Variables

Create a README documenting required variables:

## Required Template Variables

- `env` (string): Environment name (dev/staging/prod)
- `project` (string): GCP project ID
- `config.region` (string): Data region (us-central1, eu-west1)
- `config.start_date` (string): Processing start date (YYYY-MM-DD)

4. Validate Templates

Always validate template resolution before deployment:

# Check that all templates were resolved
for table in pipeline.table_graph.tables:
    if "{{" in table or "}}" in table:
        raise ValueError(f"Unresolved template in table: {table}")
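
For a stricter pre-flight check, you can render the raw SQL yourself with Jinja2's StrictUndefined, which raises on any missing variable. This is a minimal sketch using plain Jinja2, independent of how clgraph renders templates internally:

from jinja2 import StrictUndefined, Template
from jinja2.exceptions import UndefinedError

def assert_fully_resolved(sql: str, context: dict) -> str:
    """Render sql, raising if any template variable is undefined."""
    try:
        return Template(sql, undefined=StrictUndefined).render(**context)
    except UndefinedError as exc:
        raise ValueError(f"Unresolved template variable: {exc}") from exc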

Troubleshooting

Undefined Variables

If template_context is provided but a variable referenced in the SQL is missing from it, Jinja2 raises an error:

# ERROR: Missing 'project' variable
pipeline = Pipeline.from_sql_list(
    ["CREATE TABLE {{project}}.orders AS SELECT * FROM raw.orders"],
    dialect="bigquery",
    template_context={"env": "prod"}  # 'project' missing!
)

Solution: Ensure all variables used in templates are defined in template_context.
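
One way to fail fast is to check the context against the variables your SQL expects before building the pipeline (the required names here are illustrative):

required = {"env", "project"}
missing = required - set(template_context)
if missing:
    raise ValueError(f"Missing template variables: {sorted(missing)}")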

Syntax Errors

Template variables must use Jinja2 syntax: {{ variable }}

# ❌ Wrong
sql = "CREATE TABLE $env_staging.orders ..."  # Shell-style
sql = "CREATE TABLE %env%_staging.orders ..." # Batch-style

# ✅ Correct
sql = "CREATE TABLE {{env}}_staging.orders ..."  # Jinja2-style

API Reference

Pipeline Factory Methods

All factory methods accept template_context parameter:

Pipeline.from_sql_list(
    queries: List[str],
    dialect: str,
    template_context: Optional[Dict[str, Any]] = None
)

Pipeline.from_sql_files(
    sql_directory: str,
    dialect: str,
    template_context: Optional[Dict[str, Any]] = None
)

Pipeline.from_dict(
    queries: Dict[str, str],
    dialect: str,
    template_context: Optional[Dict[str, Any]] = None
)

Pipeline.from_tuples(
    queries: List[Tuple[str, str]],
    dialect: str,
    template_context: Optional[Dict[str, Any]] = None
)

Pipeline.from_sql_string(
    sql: str,
    dialect: str,
    template_context: Optional[Dict[str, Any]] = None
)

Complete Example

See examples/template_variables_example.py for a comprehensive example with 7 different use cases.

