
Pipeline

The Pipeline class is the main entry point for clgraph. It provides methods to create pipelines from SQL, trace lineage, manage metadata, and execute queries.

# The preamble provides a sample pipeline with 3 tables:
# - raw.orders (source)
# - staging.orders (intermediate)
# - analytics.customer_metrics (final)
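#
# A minimal construction sketch (illustrative only; the preamble's actual SQL
# is not shown on this page, and the import path and column list are assumed):
from clgraph import Pipeline  # assumed import path

pipeline = Pipeline.from_sql_list([
    "CREATE TABLE staging.orders AS"
    " SELECT order_id, customer_id, amount FROM raw.orders",
    "CREATE TABLE analytics.customer_metrics AS"
    " SELECT customer_id, SUM(amount) AS total_amount"
    " FROM staging.orders GROUP BY customer_id",
])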

print(f"Pipeline has {len(pipeline.columns)} columns")
print(f"Tables: {list(pipeline.table_graph.tables.keys())}")

Factory Methods

Create pipelines from various SQL input formats.

from_sql_list

Create a pipeline from a list of SQL strings.

Pipeline.from_sql_list(
    queries: List[str],
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Parameters:

- queries: List of SQL statements
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- template_context: Variables for Jinja2 template substitution (see Template Variables)

Example:

pipeline = Pipeline.from_sql_list([
    "CREATE TABLE staging AS SELECT id, name FROM raw.users",
    "CREATE TABLE output AS SELECT id, UPPER(name) as name FROM staging"
])
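
If your SQL contains Jinja2 placeholders, pass template_context to substitute them before parsing. A minimal sketch, assuming standard {{ var }} placeholders (the dataset variable below is illustrative; see Template Variables for details):

pipeline = Pipeline.from_sql_list(
    ["CREATE TABLE staging AS SELECT id, name FROM {{ dataset }}.users"],
    template_context={"dataset": "prod"},
)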

from_dict

Create a pipeline from a dictionary of query_id to SQL.

Pipeline.from_dict(
    queries: Dict[str, str],
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Example:

pipeline = Pipeline.from_dict({
    "staging_users": "CREATE TABLE staging AS SELECT * FROM raw.users",
    "final_output": "CREATE TABLE output AS SELECT * FROM staging"
})

from_tuples

Create a pipeline from a list of (query_id, sql) tuples.

Pipeline.from_tuples(
    queries: List[Tuple[str, str]],
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Example:

pipeline = Pipeline.from_tuples([
    ("staging", "CREATE TABLE staging AS SELECT * FROM raw"),
    ("output", "CREATE TABLE output AS SELECT * FROM staging")
])

from_sql_string

Create a pipeline from a semicolon-separated SQL string.

Pipeline.from_sql_string(
    sql: str,
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Example:

sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
pipeline = Pipeline.from_sql_string(sql)

from_sql_files

Create a pipeline from SQL files in a directory.

Pipeline.from_sql_files(
    sql_dir: str,
    dialect: str = "bigquery",
    pattern: str = "*.sql",
    query_id_from: str = "filename",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Parameters:

- sql_dir: Directory containing SQL files
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- pattern: Glob pattern for matching SQL files (default: "*.sql")
- query_id_from: How to determine query IDs:
  - "filename": Use filename without extension (default)
  - "comment": Extract from first line comment -- query_id: name
- template_context: Variables for Jinja2 template substitution (see Template Variables)

Pattern Parameter:

The pattern parameter accepts glob patterns to filter SQL files:

  • "*.sql" - All .sql files in the root directory (default)
  • "**/*.sql" - All .sql files in root and all subdirectories
  • "staging/**/*.sql" - All .sql files in staging/ and its subdirectories
  • "transform_*.sql" - Files starting with transform_ in root directory
  • "[0-9]*.sql" - Files starting with a digit in root directory

Examples:

# Load all SQL files from root directory only
pipeline = Pipeline.from_sql_files("/path/to/sql/")

# Load all SQL files recursively from subdirectories
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="**/*.sql"
)

# Load only staging queries
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="staging/**/*.sql"
)

# Load queries with specific naming pattern
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="transform_*.sql"
)

# Use query IDs from comments instead of filenames
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    query_id_from="comment"
)
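
With query_id_from="comment", each file declares its ID on its first line using the -- query_id: name convention described above. An illustrative file (the filename staging_users.sql and its SQL are assumptions):

-- query_id: staging_users
CREATE TABLE staging AS SELECT id, name FROM raw.users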

Lineage Methods

Trace data flow through your pipeline.

get_column

Get a column by table and column name.

pipeline.get_column(
    table_name: str,
    column_name: str,
    query_id: Optional[str] = None
) -> Optional[ColumnNode]

Parameters:

- table_name: The table name
- column_name: The column name
- query_id: Optional query_id to filter by (useful when same table.column exists in multiple queries)

Example:

# Get a specific column
col = pipeline.get_column("raw.orders", "amount")
if col:
    print(f"Found: {col.full_name}")
    print(f"Expression: {col.expression}")

trace_column_backward

Find all source columns that contribute to a given column.

pipeline.trace_column_backward(
    table_name: str,
    column_name: str
) -> List[ColumnNode]

Example:

# Find source columns for total_amount in analytics
sources = pipeline.trace_column_backward("analytics.customer_metrics", "total_amount")
print(f"Sources for analytics.customer_metrics.total_amount:")
for source in sources:
    print(f"  - {source.full_name}")

trace_column_forward

Find all downstream columns impacted by a given column.

pipeline.trace_column_forward(
    table_name: str,
    column_name: str
) -> List[ColumnNode]

Example:

# Find what's impacted by a source column
impacts = pipeline.trace_column_forward("raw.orders", "amount")
print(f"Changing raw.orders.amount affects {len(impacts)} columns:")
for col in impacts:
    print(f"  - {col.full_name}")

get_lineage_path

Find the lineage path between two columns.

pipeline.get_lineage_path(
    from_table: str,
    from_column: str,
    to_table: str,
    to_column: str
) -> List[ColumnEdge]

Example:

# Find path from source to final column
path = pipeline.get_lineage_path(
    "raw.orders", "amount",
    "analytics.customer_metrics", "total_amount"
)
if path:
    print("Lineage path:")
    for edge in path:
        print(f"  {edge.from_node.full_name} -> {edge.to_node.full_name}")

trace_column_backward_full

Trace a column backward with full transparency, returning all nodes and edges.

pipeline.trace_column_backward_full(
    table_name: str,
    column_name: str,
    include_ctes: bool = True
) -> Tuple[List[ColumnNode], List[ColumnEdge]]

Parameters:

- table_name: The table containing the column to trace
- column_name: The column name to trace
- include_ctes: If True, include CTE columns; if False, only real tables

Returns: Tuple of (nodes, edges) representing the complete lineage path.

Example:

# Get full lineage with all intermediate steps
nodes, edges = pipeline.trace_column_backward_full(
    "analytics.customer_metrics", "total_amount"
)

print(f"Lineage includes {len(nodes)} columns and {len(edges)} edges")
for node in nodes:
    print(f"  {node.full_name}: {node.expression}")

get_table_lineage_path

Get simplified table-level lineage path for a column (skipping CTEs).

pipeline.get_table_lineage_path(
    table_name: str,
    column_name: str
) -> List[Tuple[str, str, str]]

Returns: List of (table_name, column_name, query_id) tuples.

Example:

# Get table-level path (no CTEs)
path = pipeline.get_table_lineage_path("analytics.customer_metrics", "total_amount")

for table, column, query_id in path:
    print(f"  {table}.{column} (from {query_id})")

get_table_impact_path

Get simplified table-level impact path for a column (forward lineage, skipping CTEs).

pipeline.get_table_impact_path(
    table_name: str,
    column_name: str
) -> List[Tuple[str, str, str]]

Returns: List of (table_name, column_name, query_id) tuples.

Example:

# See what downstream tables are affected
path = pipeline.get_table_impact_path("raw.orders", "amount")

print("Downstream impact:")
for table, column, query_id in path:
    print(f"  {table}.{column}")


Metadata Methods

Manage column metadata including PII flags, ownership, and descriptions.

propagate_all_metadata

Propagate PII, owner, and tags through lineage edges.

pipeline.propagate_all_metadata(verbose: bool = False)

Example:

# Mark source column as PII
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.customer_id"].owner = "security-team"
pipeline.columns["raw.orders.customer_id"].tags.add("sensitive")

# Propagate to all downstream columns
pipeline.propagate_all_metadata()

# Check propagation
downstream = pipeline.trace_column_forward("raw.orders", "customer_id")
for col in downstream:
    print(f"  {col.full_name}: pii={col.pii}")

generate_all_descriptions

Generate natural language descriptions using an LLM.

pipeline.generate_all_descriptions(
    batch_size: int = 10,
    verbose: bool = False
)

Requires LLM configuration:

from langchain_openai import ChatOpenAI

pipeline.llm = ChatOpenAI(model="gpt-4o-mini")
pipeline.generate_all_descriptions(verbose=True)

get_pii_columns

Get all columns marked as PII.

pipeline.get_pii_columns() -> List[ColumnNode]

Example:

# Mark a column as PII first
pipeline.columns["raw.orders.customer_id"].pii = True

# Retrieve all PII columns
pii_columns = pipeline.get_pii_columns()
print(f"Found {len(pii_columns)} PII columns")
for col in pii_columns:
    print(f"  - {col.full_name}")

get_columns_by_owner

Get columns owned by a specific team/person.

pipeline.get_columns_by_owner(owner: str) -> List[ColumnNode]

get_columns_by_tag

Get columns with a specific tag.

pipeline.get_columns_by_tag(tag: str) -> List[ColumnNode]
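
Example (a minimal sketch; the owner and tag values match those set in the propagate_all_metadata example above):

# Columns owned by a specific team
owned = pipeline.get_columns_by_owner("security-team")
print(f"security-team owns {len(owned)} columns")

# Columns carrying a specific tag
tagged = pipeline.get_columns_by_tag("sensitive")
for col in tagged:
    print(f"  - {col.full_name}")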

Export Methods

Export pipeline data to various formats.

to_json

Export to a JSON-serializable dictionary.

pipeline.to_json(include_metadata: bool = True) -> Dict[str, Any]

Returns: Dictionary with columns, edges, and tables keys.

Example:

# Export to JSON dictionary
data = pipeline.to_json()
print(f"Exported {len(data['columns'])} columns")
print(f"Exported {len(data['edges'])} edges")
print(f"Exported {len(data['tables'])} tables")

to_airflow_dag

Generate an Airflow DAG from the pipeline.

pipeline.to_airflow_dag(
    executor: Callable[[str], None],
    dag_id: str,
    schedule: str,
    start_date: datetime,
    default_args: Optional[dict] = None,
    **dag_kwargs
) -> DAG

Parameters:

- executor: Function that takes SQL string and executes it
- dag_id: Unique identifier for the Airflow DAG
- schedule: Schedule expression (e.g., "@daily")
- start_date: Start date for the DAG
- default_args: Optional default arguments applied to each task
- **dag_kwargs: Additional DAG parameters (e.g., catchup)

Example:

from datetime import datetime

def execute_sql(sql: str):
    # Execute SQL in your data warehouse
    client.query(sql)

dag = pipeline.to_airflow_dag(
    executor=execute_sql,
    dag_id="my_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False
)

to_dagster_assets

Generate Dagster assets from the pipeline.

pipeline.to_dagster_assets(
    executor: Callable[[str], None],
    group_name: Optional[str] = None,
    key_prefix: Optional[Union[str, List[str]]] = None,
    compute_kind: str = "sql",
    **asset_kwargs
) -> List[Asset]

Parameters:

- executor: Function that takes SQL string and executes it
- group_name: Optional asset group name for organization in Dagster UI
- key_prefix: Optional prefix for asset keys (e.g., ["warehouse", "analytics"])
- compute_kind: Compute kind tag for assets (default: "sql")
- **asset_kwargs: Additional asset parameters (owners, tags, etc.)

Returns: List of Dagster Asset definitions

Note: Requires dagster>=1.5.0

Example:

from dagster import Definitions

def execute_sql(sql: str):
    client.query(sql)

# Basic usage
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="analytics",
)

defs = Definitions(assets=assets)

# Advanced usage with prefixes and metadata
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="warehouse",
    key_prefix=["prod", "analytics"],
    compute_kind="clickhouse",
    owners=["team:data-eng"],
    tags={"domain": "finance"},
)

to_dagster_job

Generate a Dagster job from the pipeline using ops.

pipeline.to_dagster_job(
    executor: Callable[[str], None],
    job_name: str,
    description: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
    **job_kwargs
) -> Job

Parameters:

- executor: Function that takes SQL string and executes it
- job_name: Name for the Dagster job
- description: Optional job description (auto-generated if not provided)
- tags: Optional job tags for filtering in Dagster UI
- **job_kwargs: Additional job parameters

Returns: Dagster Job definition

Note: For new pipelines, consider using to_dagster_assets() instead, which provides better lineage tracking and observability.

Example:

from dagster import Definitions

def execute_sql(sql: str):
    client.query(sql)

job = pipeline.to_dagster_job(
    executor=execute_sql,
    job_name="analytics_pipeline",
    description="Daily analytics refresh",
    tags={"env": "prod"},
)

# Execute locally
result = job.execute_in_process()

# Or add to Definitions
defs = Definitions(jobs=[job])

to_prefect_flow

Generate a Prefect flow from the pipeline.

pipeline.to_prefect_flow(
    executor: Callable[[str], None],
    flow_name: str,
    description: Optional[str] = None,
    retries: int = 2,
    retry_delay_seconds: int = 60,
    timeout_seconds: Optional[int] = None,
    tags: Optional[List[str]] = None,
    **flow_kwargs
) -> Flow

Parameters:

- executor: Function that takes SQL string and executes it
- flow_name: Name for the Prefect flow
- description: Optional flow description (auto-generated if not provided)
- retries: Number of retries for failed tasks (default: 2)
- retry_delay_seconds: Delay between retries in seconds (default: 60)
- timeout_seconds: Optional task timeout
- tags: Optional list of tags for the flow
- **flow_kwargs: Additional flow parameters

Returns: Prefect Flow definition

Note: Requires prefect>=3.0.0

Example:

from prefect import serve

def execute_sql(sql: str):
    client.query(sql)

# Generate Prefect flow
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    description="Daily analytics refresh",
    retries=3,
    tags=["analytics", "daily"],
)

# Run locally
flow()

# Or serve for execution
serve(flow)

to_prefect_deployment

Generate a Prefect deployment from the pipeline.

pipeline.to_prefect_deployment(
    executor: Callable[[str], None],
    flow_name: str,
    deployment_name: str,
    cron: Optional[str] = None,
    interval: Optional[int] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    **deployment_kwargs
) -> Deployment

Parameters:

- executor: Function that takes SQL string and executes it
- flow_name: Name for the Prefect flow
- deployment_name: Name for the deployment
- cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
- interval: Optional interval in seconds between runs
- description: Optional deployment description
- tags: Optional list of tags
- **deployment_kwargs: Additional deployment parameters

Returns: Prefect Deployment definition

Example:

def execute_sql(sql: str):
    client.query(sql)

# Create a scheduled deployment
deployment = pipeline.to_prefect_deployment(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    deployment_name="daily_analytics",
    cron="0 6 * * *",  # Run at 6 AM daily
    tags=["production", "analytics"],
)

# Deploy to Prefect server
deployment.apply()

to_kestra_flow

Generate a Kestra flow YAML from the pipeline.

pipeline.to_kestra_flow(
    flow_id: str,
    namespace: str,
    description: Optional[str] = None,
    connection_config: Optional[Dict[str, str]] = None,
    cron: Optional[str] = None,
    retry_attempts: int = 3,
    labels: Optional[Dict[str, str]] = None,
    **kwargs
) -> str

Parameters:

- flow_id: Unique identifier for the Kestra flow
- namespace: Kestra namespace (e.g., "clgraph.production")
- description: Optional flow description (auto-generated if not provided)
- connection_config: Database connection configuration dict with url, username, password
- cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
- retry_attempts: Number of retry attempts for failed tasks (default: 3)
- labels: Optional key-value labels for the flow
- **kwargs: Additional flow-level configuration

Returns: YAML string representing the Kestra flow

Note: Kestra is a declarative orchestration platform using YAML-based workflow definitions.

Example:

# Generate Kestra flow YAML
yaml_content = pipeline.to_kestra_flow(
    flow_id="analytics_pipeline",
    namespace="clgraph.production",
    description="Daily analytics refresh",
    cron="0 6 * * *",  # Run at 6 AM daily
    connection_config={
        "url": "jdbc:clickhouse://localhost:8123/default",
        "username": "default",
        "password": "",
    },
    labels={"env": "production", "team": "analytics"},
)

# Save to file
with open("flows/analytics_pipeline.yml", "w") as f:
    f.write(yaml_content)

# Deploy via Kestra API
import requests
requests.post(
    "http://localhost:8080/api/v1/flows",
    headers={"Content-Type": "application/x-yaml"},
    data=yaml_content,
)

The generated YAML flow uses io.kestra.plugin.jdbc.clickhouse.Query tasks ordered topologically to respect dependencies. Kestra executes tasks sequentially in the order they are defined.


Execution Methods

Execute pipeline queries directly.

run

Execute pipeline synchronously with parallel execution.

pipeline.run(
    executor: Callable[[str], Any],
    max_workers: int = 4,
    verbose: bool = False
) -> dict

Parameters:

- executor: Function that takes SQL string and executes it
- max_workers: Maximum number of parallel workers (default: 4)
- verbose: Enable verbose output (default: False)

Example:

# Example executor function (mock)
executed_queries = []

def execute_sql(sql: str):
    executed_queries.append(sql)
    return {"status": "success"}

results = pipeline.run(executor=execute_sql, verbose=False)
print(f"Executed {len(executed_queries)} queries")

async_run

Execute pipeline asynchronously.

async pipeline.async_run(
    executor: Callable[[str], Awaitable[Any]],
    max_workers: int = 4,
    verbose: bool = False
) -> dict

Parameters:

- executor: Async function that takes SQL string and executes it
- max_workers: Maximum number of parallel workers (default: 4)
- verbose: Enable verbose output (default: False)

Example:

import asyncio

async def execute_sql(sql: str):
    # Mock async executor
    return {"status": "success"}

results = asyncio.run(pipeline.async_run(executor=execute_sql))
print(f"Executed {len(results)} queries")


Pipeline Management

split

Split pipeline into independent subpipelines by sink tables.

pipeline.split(sinks: List[List[str]]) -> List[Pipeline]

Example:

# Split into subpipelines by final tables
final_tables = pipeline.table_graph.get_final_tables()
print(f"Final tables: {[t.table_name for t in final_tables]}")

# Create a subpipeline for one table
subpipelines = pipeline.split(
    sinks=[[final_tables[0].table_name]]
)
print(f"Split into {len(subpipelines)} subpipeline(s)")

build_subpipeline

Build a subpipeline containing only queries needed to build a specific table.

pipeline.build_subpipeline(target_table: str) -> Pipeline

Parameters: - target_table: The table to build (e.g., "analytics.revenue")

Returns: A new Pipeline containing only the queries needed to build target_table.

Example:

# Build only what's needed for a specific table
subpipeline = pipeline.build_subpipeline("analytics.customer_metrics")

print(f"Full pipeline: {len(pipeline.table_graph.queries)} queries")
print(f"Subpipeline: {len(subpipeline.table_graph.queries)} queries")

diff

Compare this pipeline with another version.

pipeline.diff(other: Pipeline) -> PipelineDiff

Example:

old_pipeline = Pipeline.from_sql_files("/path/to/old/")
new_pipeline = Pipeline.from_sql_files("/path/to/new/")

diff = new_pipeline.diff(old_pipeline)
print(diff.summary())

Properties

columns

Dictionary of all columns keyed by full name.

pipeline.columns: Dict[str, ColumnNode]

Example:

# Show first 3 columns
for name, col in list(pipeline.columns.items())[:3]:
    print(f"{name}: {col.expression or 'N/A'}")

edges

List of all lineage edges.

pipeline.edges: List[ColumnEdge]
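
Example (a minimal sketch):

# Show the first 3 lineage edges
for edge in pipeline.edges[:3]:
    print(f"  {edge.from_node.full_name} -> {edge.to_node.full_name}")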

table_graph

Table-level dependency graph.

pipeline.table_graph: TableDependencyGraph

Example:

# Get execution order
execution_order = pipeline.table_graph.get_execution_order()
print(f"Execution order ({len(execution_order)} tables):")
for table in execution_order:
    print(f"  {table.table_name}")

dialect

SQL dialect used for parsing.

pipeline.dialect: str
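
Example:

print(f"Parsing dialect: {pipeline.dialect}")  # e.g. "bigquery" (the default)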

column_graph

Column-level lineage graph with all columns and edges.

pipeline.column_graph: PipelineLineageGraph

Example:

# Get column graph
graph = pipeline.column_graph

# Find source and final columns
sources = graph.get_source_columns()
finals = graph.get_final_columns()

print(f"Source columns: {len(sources)}")
print(f"Final columns: {len(finals)}")


Validation Methods

Analyze SQL quality and identify potential issues in your pipeline.

get_all_issues

Get all validation issues from all queries in the pipeline.

pipeline.get_all_issues() -> List[ValidationIssue]

Returns: Combined list of issues from individual query lineage graphs and pipeline-level lineage.

Example:

# Get all issues
issues = pipeline.get_all_issues()

print(f"Found {len(issues)} validation issues")
for issue in issues[:5]:  # Show first 5
    print(f"  [{issue.severity.value}] {issue.message}")

get_issues

Get filtered validation issues by severity, category, or query.

pipeline.get_issues(
    severity: Optional[str | IssueSeverity] = None,
    category: Optional[str | IssueCategory] = None,
    query_id: Optional[str] = None
) -> List[ValidationIssue]

Parameters:

- severity: Filter by severity ('error', 'warning', 'info' or IssueSeverity enum)
- category: Filter by category (string or IssueCategory enum)
- query_id: Filter by query ID

Example:

from clgraph.models import IssueSeverity, IssueCategory

# Get all errors
errors = pipeline.get_issues(severity='error')
print(f"Found {len(errors)} errors")

# Get all warnings using enum
warnings = pipeline.get_issues(severity=IssueSeverity.WARNING)
print(f"Found {len(warnings)} warnings")

# Get star-related issues
star_issues = pipeline.get_issues(
    category=IssueCategory.UNQUALIFIED_STAR_MULTIPLE_TABLES
)

# Get issues from a specific query
query_issues = pipeline.get_issues(query_id='02_staging')
print(f"Found {len(query_issues)} issues in 02_staging query")

has_errors

Check if pipeline has any ERROR-level issues.

pipeline.has_errors() -> bool

Example:

if pipeline.has_errors():
    print("⚠️  Pipeline has validation errors!")
    for error in pipeline.get_issues(severity='error'):
        print(f"  - {error.message}")
else:
    print("✅ No validation errors")

has_warnings

Check if pipeline has any WARNING-level issues.

pipeline.has_warnings() -> bool
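
Example (mirrors the has_errors example above):

if pipeline.has_warnings():
    for warning in pipeline.get_issues(severity='warning'):
        print(f"  - {warning.message}")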

print_issues

Print all validation issues in a human-readable format.

pipeline.print_issues(severity: Optional[str | IssueSeverity] = None)

Parameters: - severity: Optional filter by severity

Example:

# Print all issues grouped by severity
pipeline.print_issues()

# Print only errors
pipeline.print_issues(severity='error')


Simplified Lineage

Get simplified views of the column lineage graph.

get_simplified_column_graph

Get a simplified version of the column lineage graph with only physical table columns.

pipeline.get_simplified_column_graph() -> PipelineLineageGraph

Returns: A new PipelineLineageGraph that:

- Keeps: all physical table columns (raw.*, staging.*, analytics.*, etc.)
- Removes: CTE columns, subquery columns, star nodes
- Edges: direct table-to-table edges (traces through CTEs/subqueries)

Example:

# Get simplified graph
simplified = pipeline.get_simplified_column_graph()

# Compare sizes
print(f"Full graph: {len(pipeline.columns)} columns")
print(f"Simplified: {len(simplified.columns)} columns")

# Use simplified graph for cleaner visualization
for col in simplified.columns.values():
    print(f"  {col.full_name}")

This is useful for:

- Data catalog exports - Show only physical tables
- Impact analysis - Focus on table-to-table dependencies
- Visualization - Cleaner graphs without internal structures