Pipeline
The Pipeline class is the main entry point for clgraph. It provides methods to create pipelines from SQL, trace lineage, manage metadata, and execute queries.
# The preamble provides a sample pipeline with 3 tables:
# - raw.orders (source)
# - staging.orders (intermediate)
# - analytics.customer_metrics (final)
print(f"Pipeline has {len(pipeline.columns)} columns")
print(f"Tables: {list(pipeline.table_graph.tables.keys())}")
Factory Methods
Create pipelines from various SQL input formats.
from_sql_list
Create a pipeline from a list of SQL strings.
Pipeline.from_sql_list(
queries: List[str],
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Parameters:
- queries: List of SQL statements
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- template_context: Variables for Jinja2 template substitution (see Template Variables)
Example:
pipeline = Pipeline.from_sql_list([
"CREATE TABLE staging AS SELECT id, name FROM raw.users",
"CREATE TABLE output AS SELECT id, UPPER(name) as name FROM staging"
])
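Example with template variables (a sketch; the {{ dataset }} placeholder and its value are illustrative, see Template Variables for details):
pipeline = Pipeline.from_sql_list(
    ["CREATE TABLE staging AS SELECT id FROM {{ dataset }}.users"],
    template_context={"dataset": "raw"}
)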
from_dict
Create a pipeline from a dictionary of query_id to SQL.
Pipeline.from_dict(
queries: Dict[str, str],
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Example:
pipeline = Pipeline.from_dict({
"staging_users": "CREATE TABLE staging AS SELECT * FROM raw.users",
"final_output": "CREATE TABLE output AS SELECT * FROM staging"
})
from_tuples
Create a pipeline from a list of (query_id, sql) tuples.
Pipeline.from_tuples(
queries: List[Tuple[str, str]],
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Example:
pipeline = Pipeline.from_tuples([
("staging", "CREATE TABLE staging AS SELECT * FROM raw"),
("output", "CREATE TABLE output AS SELECT * FROM staging")
])
from_sql_string
Create a pipeline from a semicolon-separated SQL string.
Pipeline.from_sql_string(
sql: str,
dialect: str = "bigquery",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Example:
sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
pipeline = Pipeline.from_sql_string(sql)
from_sql_files
Create a pipeline from SQL files in a directory.
Pipeline.from_sql_files(
sql_dir: str,
dialect: str = "bigquery",
pattern: str = "*.sql",
query_id_from: str = "filename",
template_context: Optional[Dict[str, Any]] = None
) -> Pipeline
Parameters:
- sql_dir: Directory containing SQL files
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- pattern: Glob pattern for matching SQL files (default: "*.sql")
- query_id_from: How to determine query IDs:
- "filename": Use filename without extension (default)
- "comment": Extract from first line comment -- query_id: name
- template_context: Variables for Jinja2 template substitution (see Template Variables)
Pattern Parameter:
The pattern parameter accepts glob patterns to filter SQL files:
"*.sql"- All.sqlfiles in the root directory (default)"**/*.sql"- All.sqlfiles in root and all subdirectories"staging/**/*.sql"- All.sqlfiles instaging/and its subdirectories"transform_*.sql"- Files starting withtransform_in root directory"[0-9]*.sql"- Files starting with a digit in root directory
Examples:
# Load all SQL files from root directory only
pipeline = Pipeline.from_sql_files("/path/to/sql/")
# Load all SQL files recursively from subdirectories
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
pattern="**/*.sql"
)
# Load only staging queries
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
pattern="staging/**/*.sql"
)
# Load queries with specific naming pattern
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
pattern="transform_*.sql"
)
# Use query IDs from comments instead of filenames
pipeline = Pipeline.from_sql_files(
"/path/to/sql/",
query_id_from="comment"
)
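The comment-based IDs rely on the -- query_id: name marker on each file's first line. A sketch that writes an illustrative file and then loads it (file name, path, and contents are placeholders):
from pathlib import Path
# Write an illustrative SQL file whose first line carries the query ID
Path("/path/to/sql/users.sql").write_text(
    "-- query_id: build_staging_users\n"
    "CREATE TABLE staging.users AS SELECT id, name FROM raw.users\n"
)
# The query is registered under "build_staging_users" rather than "users"
pipeline = Pipeline.from_sql_files("/path/to/sql/", query_id_from="comment")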
Lineage Methods
Trace data flow through your pipeline.
get_column
Get a column by table and column name.
pipeline.get_column(
table_name: str,
column_name: str,
query_id: Optional[str] = None
) -> Optional[ColumnNode]
Parameters:
- table_name: The table name
- column_name: The column name
- query_id: Optional query_id to filter by (useful when same table.column exists in multiple queries)
Example:
# Get a specific column
col = pipeline.get_column("raw.orders", "amount")
if col:
print(f"Found: {col.full_name}")
print(f"Expression: {col.expression}")
trace_column_backward
Find all source columns that contribute to a given column.
Example:
# Find source columns for total_amount in analytics
sources = pipeline.trace_column_backward("analytics.customer_metrics", "total_amount")
print(f"Sources for analytics.customer_metrics.total_amount:")
for source in sources:
print(f" - {source.full_name}")
trace_column_forward
Find all downstream columns impacted by a given column.
Example:
# Find what's impacted by a source column
impacts = pipeline.trace_column_forward("raw.orders", "amount")
print(f"Changing raw.orders.amount affects {len(impacts)} columns:")
for col in impacts:
print(f" - {col.full_name}")
get_lineage_path
Find the lineage path between two columns.
pipeline.get_lineage_path(
from_table: str,
from_column: str,
to_table: str,
to_column: str
) -> List[ColumnEdge]
Example:
# Find path from source to final column
path = pipeline.get_lineage_path(
"raw.orders", "amount",
"analytics.customer_metrics", "total_amount"
)
if path:
print("Lineage path:")
for edge in path:
print(f" {edge.from_node.full_name} -> {edge.to_node.full_name}")
trace_column_backward_full
Trace a column backward with full transparency, returning all nodes and edges.
pipeline.trace_column_backward_full(
table_name: str,
column_name: str,
include_ctes: bool = True
) -> Tuple[List[ColumnNode], List[ColumnEdge]]
Parameters:
- table_name: The table containing the column to trace
- column_name: The column name to trace
- include_ctes: If True, include CTE columns; if False, only real tables
Returns: Tuple of (nodes, edges) representing the complete lineage path.
Example:
# Get full lineage with all intermediate steps
nodes, edges = pipeline.trace_column_backward_full(
"analytics.customer_metrics", "total_amount"
)
print(f"Lineage includes {len(nodes)} columns and {len(edges)} edges")
for node in nodes:
print(f" {node.full_name}: {node.expression}")
get_table_lineage_path
Get simplified table-level lineage path for a column (skipping CTEs).
Returns: List of (table_name, column_name, query_id) tuples.
Example:
# Get table-level path (no CTEs)
path = pipeline.get_table_lineage_path("analytics.customer_metrics", "total_amount")
for table, column, query_id in path:
print(f" {table}.{column} (from {query_id})")
get_table_impact_path
Get simplified table-level impact path for a column (forward lineage, skipping CTEs).
Returns: List of (table_name, column_name, query_id) tuples.
Example:
# See what downstream tables are affected
path = pipeline.get_table_impact_path("raw.orders", "amount")
print("Downstream impact:")
for table, column, query_id in path:
print(f" {table}.{column}")
Metadata Methods
Manage column metadata including PII flags, ownership, and descriptions.
propagate_all_metadata
Propagate PII, owner, and tags through lineage edges.
Example:
# Mark source column as PII
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.customer_id"].owner = "security-team"
pipeline.columns["raw.orders.customer_id"].tags.add("sensitive")
# Propagate to all downstream columns
pipeline.propagate_all_metadata()
# Check propagation
downstream = pipeline.trace_column_forward("raw.orders", "customer_id")
for col in downstream:
print(f" {col.full_name}: pii={col.pii}")
generate_all_descriptions
Generate natural language descriptions using an LLM.
Requires LLM configuration:
from langchain_openai import ChatOpenAI
pipeline.llm = ChatOpenAI(model="gpt-4o-mini")
pipeline.generate_all_descriptions(verbose=True)
get_pii_columns
Get all columns marked as PII.
Example:
# Mark a column as PII first
pipeline.columns["raw.orders.customer_id"].pii = True
# Retrieve all PII columns
pii_columns = pipeline.get_pii_columns()
print(f"Found {len(pii_columns)} PII columns")
for col in pii_columns:
print(f" - {col.full_name}")
get_columns_by_owner
Get columns owned by a specific team/person.
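Example (a sketch, assuming the owner is passed as a string and a list of ColumnNode objects is returned, mirroring get_pii_columns):
# Assign an owner first (value is illustrative)
pipeline.columns["raw.orders.customer_id"].owner = "security-team"
owned = pipeline.get_columns_by_owner("security-team")
for col in owned:
    print(f" - {col.full_name}")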
get_columns_by_tag
Get columns with a specific tag.
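Example (a sketch, assuming the tag is passed as a string):
# Tag a column first, then retrieve every column carrying that tag
pipeline.columns["raw.orders.customer_id"].tags.add("sensitive")
tagged = pipeline.get_columns_by_tag("sensitive")
print(f"Found {len(tagged)} columns tagged 'sensitive'")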
Export Methods
Export pipeline data to various formats.
to_json
Export to a JSON-serializable dictionary.
Returns: Dictionary with columns, edges, and tables keys.
Example:
# Export to JSON dictionary
data = pipeline.to_json()
print(f"Exported {len(data['columns'])} columns")
print(f"Exported {len(data['edges'])} edges")
print(f"Exported {len(data['tables'])} tables")
to_airflow_dag
Generate an Airflow DAG from the pipeline.
pipeline.to_airflow_dag(
executor: Callable[[str], None],
dag_id: str,
schedule: str,
start_date: datetime,
default_args: Optional[dict] = None,
**dag_kwargs
) -> DAG
Parameters:
- executor: Function that takes a SQL string and executes it
- dag_id: Airflow DAG identifier
- schedule: Schedule interval (e.g., "@daily")
- start_date: Start date for the DAG
- default_args: Optional default arguments applied to the generated tasks
- **dag_kwargs: Additional keyword arguments forwarded to the DAG (e.g., catchup)
Example:
from datetime import datetime
def execute_sql(sql: str):
# Execute SQL in your data warehouse
client.query(sql)
dag = pipeline.to_airflow_dag(
executor=execute_sql,
dag_id="my_pipeline",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False
)
to_dagster_assets
Generate Dagster assets from the pipeline.
pipeline.to_dagster_assets(
executor: Callable[[str], None],
group_name: Optional[str] = None,
key_prefix: Optional[Union[str, List[str]]] = None,
compute_kind: str = "sql",
**asset_kwargs
) -> List[Asset]
Parameters:
- executor: Function that takes a SQL string and executes it
- group_name: Optional asset group name for organization in Dagster UI
- key_prefix: Optional prefix for asset keys (e.g., ["warehouse", "analytics"])
- compute_kind: Compute kind tag for assets (default: "sql")
- **asset_kwargs: Additional asset parameters (owners, tags, etc.)
Returns: List of Dagster Asset definitions
Note: Requires dagster>=1.5.0
Example:
from dagster import Definitions
def execute_sql(sql: str):
client.query(sql)
# Basic usage
assets = pipeline.to_dagster_assets(
executor=execute_sql,
group_name="analytics",
)
defs = Definitions(assets=assets)
# Advanced usage with prefixes and metadata
assets = pipeline.to_dagster_assets(
executor=execute_sql,
group_name="warehouse",
key_prefix=["prod", "analytics"],
compute_kind="clickhouse",
owners=["team:data-eng"],
tags={"domain": "finance"},
)
to_dagster_job
Generate a Dagster job from the pipeline using ops.
pipeline.to_dagster_job(
executor: Callable[[str], None],
job_name: str,
description: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
**job_kwargs
) -> Job
Parameters:
- executor: Function that takes a SQL string and executes it
- job_name: Name for the Dagster job
- description: Optional job description (auto-generated if not provided)
- tags: Optional job tags for filtering in Dagster UI
- **job_kwargs: Additional job parameters
Returns: Dagster Job definition
Note: For new pipelines, consider using to_dagster_assets() instead, which provides better lineage tracking and observability.
Example:
from dagster import Definitions
def execute_sql(sql: str):
client.query(sql)
job = pipeline.to_dagster_job(
executor=execute_sql,
job_name="analytics_pipeline",
description="Daily analytics refresh",
tags={"env": "prod"},
)
# Execute locally
result = job.execute_in_process()
# Or add to Definitions
defs = Definitions(jobs=[job])
to_prefect_flow
Generate a Prefect flow from the pipeline.
pipeline.to_prefect_flow(
executor: Callable[[str], None],
flow_name: str,
description: Optional[str] = None,
retries: int = 2,
retry_delay_seconds: int = 60,
timeout_seconds: Optional[int] = None,
tags: Optional[List[str]] = None,
**flow_kwargs
) -> Flow
Parameters:
- executor: Function that takes a SQL string and executes it
- flow_name: Name for the Prefect flow
- description: Optional flow description (auto-generated if not provided)
- retries: Number of retries for failed tasks (default: 2)
- retry_delay_seconds: Delay between retries in seconds (default: 60)
- timeout_seconds: Optional task timeout
- tags: Optional list of tags for the flow
- **flow_kwargs: Additional flow parameters
Returns: Prefect Flow definition
Note: Requires prefect>=3.0.0
Example:
from prefect import serve
def execute_sql(sql: str):
client.query(sql)
# Generate Prefect flow
flow = pipeline.to_prefect_flow(
executor=execute_sql,
flow_name="analytics_pipeline",
description="Daily analytics refresh",
retries=3,
tags=["analytics", "daily"],
)
# Run locally
flow()
# Or serve for execution
serve(flow)
to_prefect_deployment
Generate a Prefect deployment from the pipeline.
pipeline.to_prefect_deployment(
executor: Callable[[str], None],
flow_name: str,
deployment_name: str,
cron: Optional[str] = None,
interval: Optional[int] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
**deployment_kwargs
) -> Deployment
Parameters:
- executor: Function that takes a SQL string and executes it
- flow_name: Name for the Prefect flow
- deployment_name: Name for the deployment
- cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
- interval: Optional interval in seconds between runs
- description: Optional deployment description
- tags: Optional list of tags
- **deployment_kwargs: Additional deployment parameters
Returns: Prefect Deployment definition
Example:
def execute_sql(sql: str):
client.query(sql)
# Create a scheduled deployment
deployment = pipeline.to_prefect_deployment(
executor=execute_sql,
flow_name="analytics_pipeline",
deployment_name="daily_analytics",
cron="0 6 * * *", # Run at 6 AM daily
tags=["production", "analytics"],
)
# Deploy to Prefect server
deployment.apply()
to_kestra_flow
Generate a Kestra flow YAML from the pipeline.
pipeline.to_kestra_flow(
flow_id: str,
namespace: str,
description: Optional[str] = None,
connection_config: Optional[Dict[str, str]] = None,
cron: Optional[str] = None,
retry_attempts: int = 3,
labels: Optional[Dict[str, str]] = None,
**kwargs
) -> str
Parameters:
- flow_id: Unique identifier for the Kestra flow
- namespace: Kestra namespace (e.g., "clgraph.production")
- description: Optional flow description (auto-generated if not provided)
- connection_config: Database connection configuration dict with url, username, and password keys
- cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
- retry_attempts: Number of retry attempts for failed tasks (default: 3)
- labels: Optional key-value labels for the flow
- **kwargs: Additional flow-level configuration
Returns: YAML string representing the Kestra flow
Note: Kestra is a declarative orchestration platform using YAML-based workflow definitions.
Example:
# Generate Kestra flow YAML
yaml_content = pipeline.to_kestra_flow(
flow_id="analytics_pipeline",
namespace="clgraph.production",
description="Daily analytics refresh",
cron="0 6 * * *", # Run at 6 AM daily
connection_config={
"url": "jdbc:clickhouse://localhost:8123/default",
"username": "default",
"password": "",
},
labels={"env": "production", "team": "analytics"},
)
# Save to file
with open("flows/analytics_pipeline.yml", "w") as f:
f.write(yaml_content)
# Deploy via Kestra API
import requests
requests.post(
"http://localhost:8080/api/v1/flows",
headers={"Content-Type": "application/x-yaml"},
data=yaml_content,
)
The generated YAML flow uses io.kestra.plugin.jdbc.clickhouse.Query tasks ordered topologically to respect dependencies. Kestra executes tasks sequentially in the order they are defined.
Execution Methods
Execute pipeline queries directly.
run
Execute the pipeline synchronously, running queries in parallel where dependencies allow.
Parameters:
- executor: Function that takes a SQL string and executes it
Example:
# Example executor function (mock)
executed_queries = []
def execute_sql(sql: str):
executed_queries.append(sql)
return {"status": "success"}
results = pipeline.run(executor=execute_sql, verbose=False)
print(f"Executed {len(executed_queries)} queries")
async_run
Execute the pipeline asynchronously.
async pipeline.async_run(
executor: Callable[[str], Awaitable[Any]],
max_workers: int = 4,
verbose: bool = False
) -> dict
Parameters:
- executor: Async function that takes a SQL string and executes it
- max_workers: Maximum number of queries executed concurrently (default: 4)
- verbose: If True, print progress while executing (default: False)
Example:
import asyncio
async def execute_sql(sql: str):
# Mock async executor
return {"status": "success"}
results = asyncio.run(pipeline.async_run(executor=execute_sql))
print(f"Executed {len(results)} queries")
Pipeline Management
split
Split pipeline into independent subpipelines by sink tables.
Example:
# Split into subpipelines by final tables
final_tables = pipeline.table_graph.get_final_tables()
print(f"Final tables: {[t.table_name for t in final_tables]}")
# Create a subpipeline for one table
subpipelines = pipeline.split(
sinks=[[final_tables[0].table_name]]
)
print(f"Split into {len(subpipelines)} subpipeline(s)")
build_subpipeline
Build a subpipeline containing only queries needed to build a specific table.
Parameters:
- target_table: The table to build (e.g., "analytics.revenue")
Returns: A new Pipeline containing only the queries needed to build target_table.
Example:
# Build only what's needed for a specific table
subpipeline = pipeline.build_subpipeline("analytics.customer_metrics")
print(f"Full pipeline: {len(pipeline.table_graph.queries)} queries")
print(f"Subpipeline: {len(subpipeline.table_graph.queries)} queries")
diff
Compare this pipeline with another version.
Example:
old_pipeline = Pipeline.from_sql_files("/path/to/old/")
new_pipeline = Pipeline.from_sql_files("/path/to/new/")
diff = new_pipeline.diff(old_pipeline)
print(diff.summary())
Properties
columns
Dictionary of all columns keyed by full name.
Example:
# Show first 3 columns
for name, col in list(pipeline.columns.items())[:3]:
print(f"{name}: {col.expression or 'N/A'}")
edges
List of all lineage edges.
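Example (reusing the from_node/to_node attributes shown in get_lineage_path above):
# Show the first 3 lineage edges
for edge in pipeline.edges[:3]:
    print(f" {edge.from_node.full_name} -> {edge.to_node.full_name}")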
table_graph
Table-level dependency graph.
Example:
# Get execution order
execution_order = pipeline.table_graph.get_execution_order()
print(f"Execution order ({len(execution_order)} tables):")
for table in execution_order:
print(f" {table.table_name}")
dialect
SQL dialect used for parsing.
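Example (a one-liner; the property reflects the dialect passed to the factory method):
print(f"Parsing dialect: {pipeline.dialect}")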
column_graph
Column-level lineage graph with all columns and edges.
Example:
# Get column graph
graph = pipeline.column_graph
# Find source and final columns
sources = graph.get_source_columns()
finals = graph.get_final_columns()
print(f"Source columns: {len(sources)}")
print(f"Final columns: {len(finals)}")
Validation Methods
Analyze SQL quality and identify potential issues in your pipeline.
get_all_issues
Get all validation issues from all queries in the pipeline.
Returns: Combined list of issues from individual query lineage graphs and pipeline-level lineage.
Example:
# Get all issues
issues = pipeline.get_all_issues()
print(f"Found {len(issues)} validation issues")
for issue in issues[:5]: # Show first 5
print(f" [{issue.severity.value}] {issue.message}")
get_issues
Get filtered validation issues by severity, category, or query.
pipeline.get_issues(
severity: Optional[str | IssueSeverity] = None,
category: Optional[str | IssueCategory] = None,
query_id: Optional[str] = None
) -> List[ValidationIssue]
Parameters:
- severity: Filter by severity ('error', 'warning', 'info' or IssueSeverity enum)
- category: Filter by category (string or IssueCategory enum)
- query_id: Filter by query ID
Example:
from clgraph.models import IssueSeverity, IssueCategory
# Get all errors
errors = pipeline.get_issues(severity='error')
print(f"Found {len(errors)} errors")
# Get all warnings using enum
warnings = pipeline.get_issues(severity=IssueSeverity.WARNING)
print(f"Found {len(warnings)} warnings")
# Get star-related issues
star_issues = pipeline.get_issues(
category=IssueCategory.UNQUALIFIED_STAR_MULTIPLE_TABLES
)
# Get issues from a specific query
query_issues = pipeline.get_issues(query_id='02_staging')
print(f"Found {len(query_issues)} issues in 02_staging query")
has_errors
Check if pipeline has any ERROR-level issues.
Example:
if pipeline.has_errors():
print("⚠️ Pipeline has validation errors!")
for error in pipeline.get_issues(severity='error'):
print(f" - {error.message}")
else:
print("✅ No validation errors")
has_warnings
Check if pipeline has any WARNING-level issues.
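Example (mirroring has_errors above):
if pipeline.has_warnings():
    warnings = pipeline.get_issues(severity='warning')
    print(f"⚠️ Pipeline has {len(warnings)} warnings")
else:
    print("✅ No validation warnings")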
print_issues
Print all validation issues in a human-readable format.
Parameters:
- severity: Optional filter by severity
Example:
# Print all issues grouped by severity
pipeline.print_issues()
# Print only errors
pipeline.print_issues(severity='error')
Simplified Lineage
Get simplified views of the column lineage graph.
get_simplified_column_graph
Get a simplified version of the column lineage graph with only physical table columns.
Returns: A new PipelineLineageGraph with:
- Keeps: All physical table columns (raw.*, staging.*, analytics.*, etc.)
- Removes: CTE columns, subquery columns, star nodes
- Edges: Direct table-to-table edges (traces through CTEs/subqueries)
Example:
# Get simplified graph
simplified = pipeline.get_simplified_column_graph()
# Compare sizes
print(f"Full graph: {len(pipeline.columns)} columns")
print(f"Simplified: {len(simplified.columns)} columns")
# Use simplified graph for cleaner visualization
for col in simplified.columns.values():
print(f" {col.full_name}")
This is useful for:
- Data catalog exports: show only physical tables
- Impact analysis: focus on table-to-table dependencies
- Visualization: cleaner graphs without internal structures