Pipeline
The Pipeline class is the main entry point for clgraph. It provides methods to create pipelines from SQL, trace lineage, manage metadata, and execute queries.
# The preamble provides a sample pipeline with 3 tables:
# - raw.orders (source)
# - staging.orders (intermediate)
# - analytics.customer_metrics (final)
print(f"Pipeline has {len(pipeline.columns)} columns")
print(f"Tables: {list(pipeline.table_graph.tables.keys())}")
Factory Methods
Create pipelines from various SQL input formats.
from_sql_list
Create a pipeline from a list of SQL strings.
Pipeline.from_sql_list(
queries: List[str],
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Parameters:
- queries: List of SQL statements
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- template_context: Variables for Jinja2 template substitution (see Template Variables)
Example:
pipeline = Pipeline.from_sql_list([
"CREATE TABLE staging AS SELECT id, name FROM raw.users",
"CREATE TABLE output AS SELECT id, UPPER(name) as name FROM staging"
])
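When template_context is provided, its values fill the Jinja2 placeholders in the SQL before parsing. A minimal sketch (the placeholder name source_table is illustrative):
pipeline = Pipeline.from_sql_list(
    ["CREATE TABLE staging AS SELECT id FROM {{ source_table }}"],
    template_context={"source_table": "raw.users"},
)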
from_dict
Create a pipeline from a dictionary mapping query_id to SQL.
Pipeline.from_dict(
queries: Dict[str, str],
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Example:
pipeline = Pipeline.from_dict({
"staging_users": "CREATE TABLE staging AS SELECT * FROM raw.users",
"final_output": "CREATE TABLE output AS SELECT * FROM staging"
})
from_tuples
Create a pipeline from a list of (query_id, sql) tuples.
Pipeline.from_tuples(
queries: List[Tuple[str, str]],
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Example:
pipeline = Pipeline.from_tuples([
("staging", "CREATE TABLE staging AS SELECT * FROM raw"),
("output", "CREATE TABLE output AS SELECT * FROM staging")
])
from_sql_string
Create a pipeline from a semicolon-separated SQL string.
Pipeline.from_sql_string(
sql: str,
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Example:
sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
pipeline = Pipeline.from_sql_string(sql)
from_sql_files
Create a pipeline from SQL files in a directory.
Pipeline.from_sql_files(
sql_dir: str,
dialect: str = "bigquery",
pattern: str = "*.sql",
query_id_from: str = "filename",
template_context: Optional[Dict[str, Any]] = None,
allow_symlinks: bool = False
) -> Pipeline
Parameters:
- sql_dir: Directory containing SQL files
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- pattern: Glob pattern for matching SQL files (default: "*.sql")
- query_id_from: How to determine query IDs:
- "filename": Use filename without extension (default)
- "comment": Extract from first line comment -- query_id: name
- template_context: Variables for Jinja2 template substitution (see Template Variables)
- allow_symlinks: Whether to follow symbolic links when loading files (default: False). Set to True only if you trust the directory contents, as symlinks could point to files outside the intended directory.
Pattern Parameter:
The pattern parameter accepts glob patterns to filter SQL files:
"*.sql"- All.sqlfiles in the root directory (default)"**/*.sql"- All.sqlfiles in root and all subdirectories"staging/**/*.sql"- All.sqlfiles instaging/and its subdirectories"transform_*.sql"- Files starting withtransform_in root directory"[0-9]*.sql"- Files starting with a digit in root directory
Examples:
# Load all SQL files from root directory only
pipeline = Pipeline.from_sql_files("/path/to/sql/")
# Load all SQL files recursively from subdirectories
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
pattern="**/*.sql"
)
# Load only staging queries
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
pattern="staging/**/*.sql"
)
# Load queries with specific naming pattern
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
pattern="transform_*.sql"
)
# Use query IDs from comments instead of filenames
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
query_id_from="comment"
)
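With query_id_from="comment", each file is expected to name its query in a first-line comment, as described above. An illustrative file:
-- query_id: staging_users
CREATE TABLE staging AS SELECT id, name FROM raw.users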
Lineage Methods
Trace data flow through your pipeline.
get_column
Get a column by table and column name.
pipeline.get_column(
table_name: str,
column_name: str,
query_id: Optional[str] = None
) -> Optional[ColumnNode]
Parameters:
- table_name: The table name
- column_name: The column name
- query_id: Optional query_id to filter by (useful when same table.column exists in multiple queries)
Example:
# Get a specific column
col = pipeline.get_column("raw.orders", "amount")
if col:
    print(f"Found: {col.full_name}")
    print(f"Expression: {col.expression}")
trace_column_backward
Find all source columns that contribute to a given column.
Example:
# Find source columns for total_amount in analytics
sources = pipeline.trace_column_backward("analytics.customer_metrics", "total_amount")
print(f"Sources for analytics.customer_metrics.total_amount:")
for source in sources:
    print(f" - {source.full_name}")
trace_column_forward
Find all downstream columns impacted by a given column.
Example:
# Find what's impacted by a source column
impacts = pipeline.trace_column_forward("raw.orders", "amount")
print(f"Changing raw.orders.amount affects {len(impacts)} columns:")
for col in impacts:
    print(f" - {col.full_name}")
get_lineage_path
Find the lineage path between two columns.
pipeline.get_lineage_path(
from_table: str,
from_column: str,
to_table: str,
to_column: str
) -> List[ColumnEdge]
Example:
# Find path from source to final column
path = pipeline.get_lineage_path(
"raw.orders", "amount",
"analytics.customer_metrics", "total_amount"
)
if path:
    print("Lineage path:")
    for edge in path:
        print(f" {edge.from_node.full_name} -> {edge.to_node.full_name}")
trace_column_backward_full
Trace a column backward with full transparency, returning all nodes and edges.
pipeline.trace_column_backward_full(
table_name: str,
column_name: str,
include_ctes: bool = True
) -> Tuple[List[ColumnNode], List[ColumnEdge]]
Parameters:
- table_name: The table containing the column to trace
- column_name: The column name to trace
- include_ctes: If True, include CTE columns; if False, only real tables
Returns: Tuple of (nodes, edges) representing the complete lineage path.
Example:
# Get full lineage with all intermediate steps
nodes, edges = pipeline.trace_column_backward_full(
"analytics.customer_metrics", "total_amount"
)
print(f"Lineage includes {len(nodes)} columns and {len(edges)} edges")
for node in nodes:
    print(f" {node.full_name}: {node.expression}")
get_table_lineage_path
Get simplified table-level lineage path for a column (skipping CTEs).
Returns: List of (table_name, column_name, query_id) tuples.
Example:
# Get table-level path (no CTEs)
path = pipeline.get_table_lineage_path("analytics.customer_metrics", "total_amount")
for table, column, query_id in path:
    print(f" {table}.{column} (from {query_id})")
get_table_impact_path
Get simplified table-level impact path for a column (forward lineage, skipping CTEs).
Returns: List of (table_name, column_name, query_id) tuples.
Example:
# See what downstream tables are affected
path = pipeline.get_table_impact_path("raw.orders", "amount")
print("Downstream impact:")
for table, column, query_id in path:
    print(f" {table}.{column}")
Metadata Methods
Manage column metadata including PII flags, ownership, and descriptions.
propagate_all_metadata
Propagate PII, owner, and tags through lineage edges.
Example:
# Mark source column as PII
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.customer_id"].owner = "security-team"
pipeline.columns["raw.orders.customer_id"].tags.add("sensitive")
# Propagate to all downstream columns
pipeline.propagate_all_metadata()
# Check propagation
downstream = pipeline.trace_column_forward("raw.orders", "customer_id")
for col in downstream:
    print(f" {col.full_name}: pii={col.pii}")
generate_all_descriptions
Generate natural language descriptions using an LLM.
Requires LLM configuration:
from langchain_openai import ChatOpenAI
pipeline.llm = ChatOpenAI(model="gpt-4o-mini")
pipeline.generate_all_descriptions(verbose=True)
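The generated text can then be read back from the column nodes (a sketch, assuming generated descriptions land on each column's description attribute):
for name, col in list(pipeline.columns.items())[:3]:
    print(f"{name}: {col.description}")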
get_pii_columns
Get all columns marked as PII.
Example:
# Mark a column as PII first
pipeline.columns["raw.orders.customer_id"].pii = True
# Retrieve all PII columns
pii_columns = pipeline.get_pii_columns()
print(f"Found {len(pii_columns)} PII columns")
for col in pii_columns:
    print(f" - {col.full_name}")
get_columns_by_owner
Get columns owned by a specific team/person.
get_columns_by_tag
Get columns with a specific tag.
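Example (a sketch reusing the owner and tag set in the metadata example above; both methods are assumed to take a plain string):
owned = pipeline.get_columns_by_owner("security-team")
tagged = pipeline.get_columns_by_tag("sensitive")
print(f"Owned by security-team: {len(owned)} columns")
print(f"Tagged 'sensitive': {len(tagged)} columns")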
Export Methods
Export pipeline data to various formats.
to_json
Export to a JSON-serializable dictionary.
Returns: Dictionary with columns, edges, and tables keys.
Example:
# Export to JSON dictionary
data = pipeline.to_json()
print(f"Exported {len(data['columns'])} columns")
print(f"Exported {len(data['edges'])} edges")
print(f"Exported {len(data['tables'])} tables")
to_airflow_dag
Generate an Airflow DAG from the pipeline.
pipeline.to_airflow_dag(
executor: Callable[[str], None],
dag_id: str,
schedule: str,
start_date: datetime,
default_args: Optional[dict] = None,
**dag_kwargs
) -> DAG
Parameters:
- executor: Function that takes a SQL string and executes it
- dag_id: Airflow DAG ID
- schedule: Schedule expression (e.g., "@daily")
- start_date: DAG start date
- default_args: Optional default arguments applied to all tasks
- **dag_kwargs: Additional DAG parameters (e.g., catchup)
Returns: Airflow DAG definition
Example:
from datetime import datetime
def execute_sql(sql: str):
    # Execute SQL in your data warehouse
    client.query(sql)
dag = pipeline.to_airflow_dag(
executor=execute_sql,
dag_id="my_pipeline",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False
)
to_dagster_assets
Generate Dagster assets from the pipeline.
pipeline.to_dagster_assets(
executor: Callable[[str], None],
group_name: Optional[str] = None,
key_prefix: Optional[Union[str, List[str]]] = None,
compute_kind: str = "sql",
**asset_kwargs
) -> List[Asset]
Parameters:
- executor: Function that takes a SQL string and executes it
- group_name: Optional asset group name for organization in Dagster UI
- key_prefix: Optional prefix for asset keys (e.g., ["warehouse", "analytics"])
- compute_kind: Compute kind tag for assets (default: "sql")
- **asset_kwargs: Additional asset parameters (owners, tags, etc.)
Returns: List of Dagster Asset definitions
Note: Requires dagster>=1.5.0
Example:
from dagster import Definitions
def execute_sql(sql: str):
    client.query(sql)
# Basic usage
assets = pipeline.to_dagster_assets(
executor=execute_sql,
group_name="analytics",
)
defs = Definitions(assets=assets)
# Advanced usage with prefixes and metadata
assets = pipeline.to_dagster_assets(
executor=execute_sql,
group_name="warehouse",
key_prefix=["prod", "analytics"],
compute_kind="clickhouse",
owners=["team:data-eng"],
tags={"domain": "finance"},
)
to_dagster_job
Generate a Dagster job from the pipeline using ops.
pipeline.to_dagster_job(
executor: Callable[[str], None],
job_name: str,
description: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
**job_kwargs
) -> Job
Parameters:
- executor: Function that takes a SQL string and executes it
- job_name: Name for the Dagster job
- description: Optional job description (auto-generated if not provided)
- tags: Optional job tags for filtering in Dagster UI
- **job_kwargs: Additional job parameters
Returns: Dagster Job definition
Note: For new pipelines, consider using to_dagster_assets() instead, which provides better lineage tracking and observability.
Example:
from dagster import Definitions
def execute_sql(sql: str):
    client.query(sql)
job = pipeline.to_dagster_job(
executor=execute_sql,
job_name="analytics_pipeline",
description="Daily analytics refresh",
tags={"env": "prod"},
)
# Execute locally
result = job.execute_in_process()
# Or add to Definitions
defs = Definitions(jobs=[job])
to_prefect_flow
Generate a Prefect flow from the pipeline.
pipeline.to_prefect_flow(
executor: Callable[[str], None],
flow_name: str,
description: Optional[str] = None,
retries: int = 2,
retry_delay_seconds: int = 60,
timeout_seconds: Optional[int] = None,
tags: Optional[List[str]] = None,
**flow_kwargs
) -> Flow
Parameters:
- executor: Function that takes a SQL string and executes it
- flow_name: Name for the Prefect flow
- description: Optional flow description (auto-generated if not provided)
- retries: Number of retries for failed tasks (default: 2)
- retry_delay_seconds: Delay between retries in seconds (default: 60)
- timeout_seconds: Optional task timeout
- tags: Optional list of tags for the flow
- **flow_kwargs: Additional flow parameters
Returns: Prefect Flow definition
Note: Requires prefect>=3.0.0
Example:
from prefect import serve
def execute_sql(sql: str):
    client.query(sql)
# Generate Prefect flow
flow = pipeline.to_prefect_flow(
executor=execute_sql,
flow_name="analytics_pipeline",
description="Daily analytics refresh",
retries=3,
tags=["analytics", "daily"],
)
# Run locally
flow()
# Or serve for execution
serve(flow)
to_prefect_deployment
Generate a Prefect deployment from the pipeline.
pipeline.to_prefect_deployment(
executor: Callable[[str], None],
flow_name: str,
deployment_name: str,
cron: Optional[str] = None,
interval: Optional[int] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
**deployment_kwargs
) -> Deployment
Parameters:
- executor: Function that takes a SQL string and executes it
- flow_name: Name for the Prefect flow
- deployment_name: Name for the deployment
- cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
- interval: Optional interval in seconds between runs
- description: Optional deployment description
- tags: Optional list of tags
- **deployment_kwargs: Additional deployment parameters
Returns: Prefect Deployment definition
Example:
def execute_sql(sql: str):
    client.query(sql)
# Create a scheduled deployment
deployment = pipeline.to_prefect_deployment(
executor=execute_sql,
flow_name="analytics_pipeline",
deployment_name="daily_analytics",
cron="0 6 * * *", # Run at 6 AM daily
tags=["production", "analytics"],
)
# Deploy to Prefect server
deployment.apply()
to_kestra_flow
Generate a Kestra flow YAML from the pipeline.
pipeline.to_kestra_flow(
flow_id: str,
namespace: str,
description: Optional[str] = None,
connection_config: Optional[Dict[str, str]] = None,
cron: Optional[str] = None,
retry_attempts: int = 3,
labels: Optional[Dict[str, str]] = None,
**kwargs
) -> str
Parameters:
- flow_id: Unique identifier for the Kestra flow
- namespace: Kestra namespace (e.g., "clgraph.production")
- description: Optional flow description (auto-generated if not provided)
- connection_config: Database connection configuration dict with url, username, password
- cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
- retry_attempts: Number of retry attempts for failed tasks (default: 3)
- labels: Optional key-value labels for the flow
- **kwargs: Additional flow-level configuration
Returns: YAML string representing the Kestra flow
Note: Kestra is a declarative orchestration platform using YAML-based workflow definitions.
Example:
# Generate Kestra flow YAML
yaml_content = pipeline.to_kestra_flow(
flow_id="analytics_pipeline",
namespace="clgraph.production",
description="Daily analytics refresh",
cron="0 6 * * *", # Run at 6 AM daily
connection_config={
"url": "jdbc:clickhouse://localhost:8123/default",
"username": "default",
"password": "",
},
labels={"env": "production", "team": "analytics"},
)
# Save to file
with open("flows/analytics_pipeline.yml", "w") as f:
f.write(yaml_content)
# Deploy via Kestra API
import requests
requests.post(
"http://localhost:8080/api/v1/flows",
headers={"Content-Type": "application/x-yaml"},
data=yaml_content,
)
The generated YAML flow uses io.kestra.plugin.jdbc.clickhouse.Query tasks ordered topologically to respect dependencies. Kestra executes tasks sequentially in the order they are defined.
to_mage_pipeline
Generate Mage pipeline files from the pipeline.
pipeline.to_mage_pipeline(
executor: Callable[[str], None],
pipeline_name: str,
description: Optional[str] = None,
connection_name: str = "default",
) -> Dict[str, Any]
Parameters:
- executor: Function that takes a SQL string and executes it (used to determine block behavior)
- pipeline_name: Name for the Mage pipeline
- description: Optional pipeline description (auto-generated if not provided)
- connection_name: Mage io_config.yaml profile name (default: "default")
Returns: Dictionary with:
- "metadata": YAML string content for metadata.yaml
- "blocks": Dictionary mapping block names to Python code strings
Note: Mage uses a block-based architecture where source tables use @data_loader decorators and derived tables use @transformer decorators.
Example:
def execute_sql(sql: str):
    client.query(sql)
# Generate Mage pipeline files
files = pipeline.to_mage_pipeline(
executor=execute_sql,
pipeline_name="analytics_pipeline",
description="Daily analytics refresh",
connection_name="clickhouse_default",
)
# Access generated content
metadata_yaml = files["metadata"]
blocks = files["blocks"]
# Write to Mage project
import os
pipeline_dir = "/path/to/mage/pipelines/analytics_pipeline"
os.makedirs(f"{pipeline_dir}/blocks", exist_ok=True)
# Write metadata
with open(f"{pipeline_dir}/metadata.yaml", "w") as f:
f.write(metadata_yaml)
# Write block files
for block_name, block_code in blocks.items():
with open(f"{pipeline_dir}/blocks/{block_name}.py", "w") as f:
f.write(block_code)
The generated pipeline uses mage_ai.io.clickhouse.ClickHouse for database connectivity and automatically wires block dependencies based on table lineage.
Execution Methods
Execute pipeline queries directly.
run
Execute the pipeline synchronously (the call blocks until all queries finish), running queries in parallel where dependencies allow.
Parameters:
- executor: Function that takes a SQL string and executes it
Example:
# Example executor function (mock)
executed_queries = []
def execute_sql(sql: str):
    executed_queries.append(sql)
    return {"status": "success"}
results = pipeline.run(executor=execute_sql, verbose=False)
print(f"Executed {len(executed_queries)} queries")
async_run
Execute pipeline asynchronously.
async pipeline.async_run(
executor: Callable[[str], Awaitable[Any]],
max_workers: int = 4,
verbose: bool = False
) -> dict
Parameters:
- executor: Async function that takes a SQL string and executes it
Example:
import asyncio
async def execute_sql(sql: str):
    # Mock async executor
    return {"status": "success"}
results = asyncio.run(pipeline.async_run(executor=execute_sql))
print(f"Executed {len(results)} queries")
Pipeline Management
split
Split pipeline into independent subpipelines by sink tables.
Example:
# Split into subpipelines by final tables
final_tables = pipeline.table_graph.get_final_tables()
print(f"Final tables: {[t.table_name for t in final_tables]}")
# Create a subpipeline for one table
subpipelines = pipeline.split(
sinks=[[final_tables[0].table_name]]
)
print(f"Split into {len(subpipelines)} subpipeline(s)")
build_subpipeline
Build a subpipeline containing only queries needed to build a specific table.
Parameters:
- target_table: The table to build (e.g., "analytics.revenue")
Returns: A new Pipeline containing only the queries needed to build target_table.
Example:
# Build only what's needed for a specific table
subpipeline = pipeline.build_subpipeline("analytics.customer_metrics")
print(f"Full pipeline: {len(pipeline.table_graph.queries)} queries")
print(f"Subpipeline: {len(subpipeline.table_graph.queries)} queries")
diff
Compare this pipeline with another version.
Example:
old_pipeline = Pipeline.from_sql_files("/path/to/old/")
new_pipeline = Pipeline.from_sql_files("/path/to/new/")
diff = new_pipeline.diff(old_pipeline)
print(diff.summary())
Properties
columns
Dictionary of all columns keyed by full name.
Example:
# Show first 3 columns
for name, col in list(pipeline.columns.items())[:3]:
    print(f"{name}: {col.expression or 'N/A'}")
edges
List of all lineage edges.
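Each edge links a from_node to a to_node, as in the lineage path example above:
# Show the first 3 lineage edges
for edge in pipeline.edges[:3]:
    print(f" {edge.from_node.full_name} -> {edge.to_node.full_name}")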
table_graph
Table-level dependency graph.
Example:
# Get execution order
execution_order = pipeline.table_graph.get_execution_order()
print(f"Execution order ({len(execution_order)} tables):")
for table in execution_order:
    print(f" {table.table_name}")
dialect
SQL dialect used for parsing.
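Example:
print(pipeline.dialect)  # e.g. "bigquery" (the default)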
column_graph
Column-level lineage graph with all columns and edges.
Example:
# Get column graph
graph = pipeline.column_graph
# Find source and final columns
sources = graph.get_source_columns()
finals = graph.get_final_columns()
print(f"Source columns: {len(sources)}")
print(f"Final columns: {len(finals)}")
Validation Methods
Analyze SQL quality and identify potential issues in your pipeline.
get_all_issues
Get all validation issues from all queries in the pipeline.
Returns: Combined list of issues from individual query lineage graphs and pipeline-level lineage.
Example:
# Get all issues
issues = pipeline.get_all_issues()
print(f"Found {len(issues)} validation issues")
for issue in issues[:5]:  # Show first 5
    print(f" [{issue.severity.value}] {issue.message}")
get_issues
Get filtered validation issues by severity, category, or query.
pipeline.get_issues(
severity: Optional[str | IssueSeverity] = None,
category: Optional[str | IssueCategory] = None,
query_id: Optional[str] = None
) -> List[ValidationIssue]
Parameters:
- severity: Filter by severity ('error', 'warning', 'info' or IssueSeverity enum)
- category: Filter by category (string or IssueCategory enum)
- query_id: Filter by query ID
Example:
from clgraph.models import IssueSeverity, IssueCategory
# Get all errors
errors = pipeline.get_issues(severity='error')
print(f"Found {len(errors)} errors")
# Get all warnings using enum
warnings = pipeline.get_issues(severity=IssueSeverity.WARNING)
print(f"Found {len(warnings)} warnings")
# Get star-related issues
star_issues = pipeline.get_issues(
category=IssueCategory.UNQUALIFIED_STAR_MULTIPLE_TABLES
)
# Get issues from a specific query
query_issues = pipeline.get_issues(query_id='02_staging')
print(f"Found {len(query_issues)} issues in 02_staging query")
has_errors
Check if pipeline has any ERROR-level issues.
Example:
if pipeline.has_errors():
    print("⚠️ Pipeline has validation errors!")
    for error in pipeline.get_issues(severity='error'):
        print(f" - {error.message}")
else:
    print("✅ No validation errors")
has_warnings
Check if pipeline has any WARNING-level issues.
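Example (mirrors has_errors above):
if pipeline.has_warnings():
    for warning in pipeline.get_issues(severity='warning'):
        print(f" - {warning.message}")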
print_issues
Print all validation issues in a human-readable format.
Parameters:
- severity: Optional filter by severity
Example:
# Print all issues grouped by severity
pipeline.print_issues()
# Print only errors
pipeline.print_issues(severity='error')
Simplified Lineage
Get simplified views of the column lineage graph.
get_simplified_column_graph
Get a simplified version of the column lineage graph with only physical table columns.
Returns: A new PipelineLineageGraph with:
- Keeps: All physical table columns (raw., staging., analytics., etc.)
- Removes: CTE columns, subquery columns, star nodes
- Edges: Direct table-to-table edges (traced through CTEs/subqueries)
Example:
# Get simplified graph
simplified = pipeline.get_simplified_column_graph()
# Compare sizes
print(f"Full graph: {len(pipeline.columns)} columns")
print(f"Simplified: {len(simplified.columns)} columns")
# Use simplified graph for cleaner visualization
for col in simplified.columns.values():
    print(f" {col.full_name}")
This is useful for:
- Data catalog exports - show only physical tables
- Impact analysis - focus on table-to-table dependencies
- Visualization - cleaner graphs without internal structures