Pipeline

The Pipeline class is the main entry point for clgraph. It provides methods to create pipelines from SQL, trace lineage, manage metadata, and execute queries.

# The preamble provides a sample pipeline with 3 tables:
# - raw.orders (source)
# - staging.orders (intermediate)
# - analytics.customer_metrics (final)

print(f"Pipeline has {len(pipeline.columns)} columns")
print(f"Tables: {list(pipeline.table_graph.tables.keys())}")

Factory Methods

Create pipelines from various SQL input formats.

from_sql_list

Create a pipeline from a list of SQL strings.

Pipeline.from_sql_list(
    queries: List[str],
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Parameters:

  • queries: List of SQL statements
  • dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
  • template_context: Variables for Jinja2 template substitution (see Template Variables)

Example:

pipeline = Pipeline.from_sql_list([
    "CREATE TABLE staging AS SELECT id, name FROM raw.users",
    "CREATE TABLE output AS SELECT id, UPPER(name) as name FROM staging"
])
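
Because template_context is passed through Jinja2 substitution (see Template Variables), queries can be parameterized before parsing. A minimal sketch, where the env variable name is illustrative:

pipeline = Pipeline.from_sql_list(
    ["CREATE TABLE {{ env }}_staging AS SELECT id, name FROM raw.users"],
    template_context={"env": "dev"},  # "{{ env }}" renders to "dev"
)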

from_dict

Create a pipeline from a dictionary mapping query IDs to SQL statements.

Pipeline.from_dict(
    queries: Dict[str, str],
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Example:

pipeline = Pipeline.from_dict({
    "staging_users": "CREATE TABLE staging AS SELECT * FROM raw.users",
    "final_output": "CREATE TABLE output AS SELECT * FROM staging"
})

from_tuples

Create a pipeline from a list of (query_id, sql) tuples.

Pipeline.from_tuples(
    queries: List[Tuple[str, str]],
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Example:

pipeline = Pipeline.from_tuples([
    ("staging", "CREATE TABLE staging AS SELECT * FROM raw"),
    ("output", "CREATE TABLE output AS SELECT * FROM staging")
])

from_sql_string

Create a pipeline from a semicolon-separated SQL string.

Pipeline.from_sql_string(
    sql: str,
    dialect: str = "bigquery",
    template_context: Optional[Dict[str, Any]] = None
) -> Pipeline

Example:

sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
pipeline = Pipeline.from_sql_string(sql)

from_sql_files

Create a pipeline from SQL files in a directory.

Pipeline.from_sql_files(
    sql_dir: str,
    dialect: str = "bigquery",
    pattern: str = "*.sql",
    query_id_from: str = "filename",
    template_context: Optional[Dict[str, Any]] = None,
    allow_symlinks: bool = False
) -> Pipeline

Parameters:

  • sql_dir: Directory containing SQL files
  • dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
  • pattern: Glob pattern for matching SQL files (default: "*.sql")
  • query_id_from: How to determine query IDs:
      • "filename": Use the filename without extension (default)
      • "comment": Extract from a first-line comment -- query_id: name
  • template_context: Variables for Jinja2 template substitution (see Template Variables)
  • allow_symlinks: Whether to follow symbolic links when loading files (default: False). Set to True only if you trust the directory contents, as symlinks could point to files outside the intended directory.

Pattern Parameter:

The pattern parameter accepts glob patterns to filter SQL files:

  • "*.sql" - All .sql files in the root directory (default)
  • "**/*.sql" - All .sql files in root and all subdirectories
  • "staging/**/*.sql" - All .sql files in staging/ and its subdirectories
  • "transform_*.sql" - Files starting with transform_ in root directory
  • "[0-9]*.sql" - Files starting with a digit in root directory

Examples:

# Load all SQL files from root directory only
pipeline = Pipeline.from_sql_files("/path/to/sql/")

# Load all SQL files recursively from subdirectories
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="**/*.sql"
)

# Load only staging queries
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="staging/**/*.sql"
)

# Load queries with specific naming pattern
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="transform_*.sql"
)

# Use query IDs from comments instead of filenames
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    query_id_from="comment"
)
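
With query_id_from="comment", the first line of each file carries the query ID in a comment of the form -- query_id: name. A hypothetical staging_users.sql would begin:

-- query_id: staging_users
CREATE TABLE staging AS SELECT id, name FROM raw.users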

Lineage Methods

Trace data flow through your pipeline.

get_column

Get a column by table and column name.

pipeline.get_column(
    table_name: str,
    column_name: str,
    query_id: Optional[str] = None
) -> Optional[ColumnNode]

Parameters:

  • table_name: The table name
  • column_name: The column name
  • query_id: Optional query_id to filter by (useful when the same table.column exists in multiple queries)

Example:

# Get a specific column
col = pipeline.get_column("raw.orders", "amount")
if col:
    print(f"Found: {col.full_name}")
    print(f"Expression: {col.expression}")

trace_column_backward

Find all source columns that contribute to a given column.

pipeline.trace_column_backward(
    table_name: str,
    column_name: str
) -> List[ColumnNode]

Example:

# Find source columns for total_amount in analytics
sources = pipeline.trace_column_backward("analytics.customer_metrics", "total_amount")
print(f"Sources for analytics.customer_metrics.total_amount:")
for source in sources:
    print(f"  - {source.full_name}")

trace_column_forward

Find all downstream columns impacted by a given column.

pipeline.trace_column_forward(
    table_name: str,
    column_name: str
) -> List[ColumnNode]

Example:

# Find what's impacted by a source column
impacts = pipeline.trace_column_forward("raw.orders", "amount")
print(f"Changing raw.orders.amount affects {len(impacts)} columns:")
for col in impacts:
    print(f"  - {col.full_name}")

get_lineage_path

Find the lineage path between two columns.

pipeline.get_lineage_path(
    from_table: str,
    from_column: str,
    to_table: str,
    to_column: str
) -> List[ColumnEdge]

Example:

# Find path from source to final column
path = pipeline.get_lineage_path(
    "raw.orders", "amount",
    "analytics.customer_metrics", "total_amount"
)
if path:
    print("Lineage path:")
    for edge in path:
        print(f"  {edge.from_node.full_name} -> {edge.to_node.full_name}")

trace_column_backward_full

Trace a column backward with full transparency, returning all nodes and edges.

pipeline.trace_column_backward_full(
    table_name: str,
    column_name: str,
    include_ctes: bool = True
) -> Tuple[List[ColumnNode], List[ColumnEdge]]

Parameters:

  • table_name: The table containing the column to trace
  • column_name: The column name to trace
  • include_ctes: If True, include CTE columns; if False, only real tables

Returns: Tuple of (nodes, edges) representing the complete lineage path.

Example:

# Get full lineage with all intermediate steps
nodes, edges = pipeline.trace_column_backward_full(
    "analytics.customer_metrics", "total_amount"
)

print(f"Lineage includes {len(nodes)} columns and {len(edges)} edges")
for node in nodes:
    print(f"  {node.full_name}: {node.expression}")

get_table_lineage_path

Get simplified table-level lineage path for a column (skipping CTEs).

pipeline.get_table_lineage_path(
    table_name: str,
    column_name: str
) -> List[Tuple[str, str, str]]

Returns: List of (table_name, column_name, query_id) tuples.

Example:

# Get table-level path (no CTEs)
path = pipeline.get_table_lineage_path("analytics.customer_metrics", "total_amount")

for table, column, query_id in path:
    print(f"  {table}.{column} (from {query_id})")

get_table_impact_path

Get simplified table-level impact path for a column (forward lineage, skipping CTEs).

pipeline.get_table_impact_path(
    table_name: str,
    column_name: str
) -> List[Tuple[str, str, str]]

Returns: List of (table_name, column_name, query_id) tuples.

Example:

# See what downstream tables are affected
path = pipeline.get_table_impact_path("raw.orders", "amount")

print("Downstream impact:")
for table, column, query_id in path:
    print(f"  {table}.{column}")


Metadata Methods

Manage column metadata including PII flags, ownership, and descriptions.

propagate_all_metadata

Propagate PII, owner, and tags through lineage edges.

pipeline.propagate_all_metadata(verbose: bool = False)

Example:

# Mark source column as PII
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.customer_id"].owner = "security-team"
pipeline.columns["raw.orders.customer_id"].tags.add("sensitive")

# Propagate to all downstream columns
pipeline.propagate_all_metadata()

# Check propagation
downstream = pipeline.trace_column_forward("raw.orders", "customer_id")
for col in downstream:
    print(f"  {col.full_name}: pii={col.pii}")

generate_all_descriptions

Generate natural language descriptions using an LLM.

pipeline.generate_all_descriptions(
    batch_size: int = 10,
    verbose: bool = False
)

Requires LLM configuration:

from langchain_openai import ChatOpenAI

pipeline.llm = ChatOpenAI(model="gpt-4o-mini")
pipeline.generate_all_descriptions(verbose=True)

get_pii_columns

Get all columns marked as PII.

pipeline.get_pii_columns() -> List[ColumnNode]

Example:

# Mark a column as PII first
pipeline.columns["raw.orders.customer_id"].pii = True

# Retrieve all PII columns
pii_columns = pipeline.get_pii_columns()
print(f"Found {len(pii_columns)} PII columns")
for col in pii_columns:
    print(f"  - {col.full_name}")

get_columns_by_owner

Get columns owned by a specific team/person.

pipeline.get_columns_by_owner(owner: str) -> List[ColumnNode]
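
Example (assumes an owner was assigned, as in the propagate_all_metadata example above):

# Find every column owned by the security team
owned = pipeline.get_columns_by_owner("security-team")
for col in owned:
    print(f"  - {col.full_name}")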

get_columns_by_tag

Get columns with a specific tag.

pipeline.get_columns_by_tag(tag: str) -> List[ColumnNode]
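
Example (assumes the "sensitive" tag was added, as in the propagate_all_metadata example above):

# Find every column carrying the "sensitive" tag
tagged = pipeline.get_columns_by_tag("sensitive")
print(f"Found {len(tagged)} sensitive columns")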

Export Methods

Export pipeline data to various formats.

to_json

Export to a JSON-serializable dictionary.

pipeline.to_json(include_metadata: bool = True) -> Dict[str, Any]

Returns: Dictionary with columns, edges, and tables keys.

Example:

# Export to JSON dictionary
data = pipeline.to_json()
print(f"Exported {len(data['columns'])} columns")
print(f"Exported {len(data['edges'])} edges")
print(f"Exported {len(data['tables'])} tables")

to_airflow_dag

Generate an Airflow DAG from the pipeline.

pipeline.to_airflow_dag(
    executor: Callable[[str], None],
    dag_id: str,
    schedule: str,
    start_date: datetime,
    default_args: Optional[dict] = None,
    **dag_kwargs
) -> DAG

Parameters:

  • executor: Function that takes a SQL string and executes it
  • dag_id: Identifier for the Airflow DAG
  • schedule: Schedule expression (e.g., "@daily")
  • start_date: Start date for the DAG
  • default_args: Optional default arguments applied to the DAG's tasks
  • **dag_kwargs: Additional DAG parameters (e.g., catchup)

Example:

from datetime import datetime

def execute_sql(sql: str):
    # Execute SQL in your data warehouse
    client.query(sql)

dag = pipeline.to_airflow_dag(
    executor=execute_sql,
    dag_id="my_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False
)

to_dagster_assets

Generate Dagster assets from the pipeline.

pipeline.to_dagster_assets(
    executor: Callable[[str], None],
    group_name: Optional[str] = None,
    key_prefix: Optional[Union[str, List[str]]] = None,
    compute_kind: str = "sql",
    **asset_kwargs
) -> List[Asset]

Parameters:

  • executor: Function that takes a SQL string and executes it
  • group_name: Optional asset group name for organization in the Dagster UI
  • key_prefix: Optional prefix for asset keys (e.g., ["warehouse", "analytics"])
  • compute_kind: Compute kind tag for assets (default: "sql")
  • **asset_kwargs: Additional asset parameters (owners, tags, etc.)

Returns: List of Dagster Asset definitions

Note: Requires dagster>=1.5.0

Example:

from dagster import Definitions

def execute_sql(sql: str):
    client.query(sql)

# Basic usage
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="analytics",
)

defs = Definitions(assets=assets)

# Advanced usage with prefixes and metadata
assets = pipeline.to_dagster_assets(
    executor=execute_sql,
    group_name="warehouse",
    key_prefix=["prod", "analytics"],
    compute_kind="clickhouse",
    owners=["team:data-eng"],
    tags={"domain": "finance"},
)

to_dagster_job

Generate a Dagster job from the pipeline using ops.

pipeline.to_dagster_job(
    executor: Callable[[str], None],
    job_name: str,
    description: Optional[str] = None,
    tags: Optional[Dict[str, str]] = None,
    **job_kwargs
) -> Job

Parameters:

  • executor: Function that takes a SQL string and executes it
  • job_name: Name for the Dagster job
  • description: Optional job description (auto-generated if not provided)
  • tags: Optional job tags for filtering in the Dagster UI
  • **job_kwargs: Additional job parameters

Returns: Dagster Job definition

Note: For new pipelines, consider using to_dagster_assets() instead, which provides better lineage tracking and observability.

Example:

from dagster import Definitions

def execute_sql(sql: str):
    client.query(sql)

job = pipeline.to_dagster_job(
    executor=execute_sql,
    job_name="analytics_pipeline",
    description="Daily analytics refresh",
    tags={"env": "prod"},
)

# Execute locally
result = job.execute_in_process()

# Or add to Definitions
defs = Definitions(jobs=[job])

to_prefect_flow

Generate a Prefect flow from the pipeline.

pipeline.to_prefect_flow(
    executor: Callable[[str], None],
    flow_name: str,
    description: Optional[str] = None,
    retries: int = 2,
    retry_delay_seconds: int = 60,
    timeout_seconds: Optional[int] = None,
    tags: Optional[List[str]] = None,
    **flow_kwargs
) -> Flow

Parameters:

  • executor: Function that takes a SQL string and executes it
  • flow_name: Name for the Prefect flow
  • description: Optional flow description (auto-generated if not provided)
  • retries: Number of retries for failed tasks (default: 2)
  • retry_delay_seconds: Delay between retries in seconds (default: 60)
  • timeout_seconds: Optional task timeout
  • tags: Optional list of tags for the flow
  • **flow_kwargs: Additional flow parameters

Returns: Prefect Flow definition

Note: Requires prefect>=3.0.0

Example:

from prefect import serve

def execute_sql(sql: str):
    client.query(sql)

# Generate Prefect flow
flow = pipeline.to_prefect_flow(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    description="Daily analytics refresh",
    retries=3,
    tags=["analytics", "daily"],
)

# Run locally
flow()

# Or serve for execution
serve(flow)

to_prefect_deployment

Generate a Prefect deployment from the pipeline.

pipeline.to_prefect_deployment(
    executor: Callable[[str], None],
    flow_name: str,
    deployment_name: str,
    cron: Optional[str] = None,
    interval: Optional[int] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    **deployment_kwargs
) -> Deployment

Parameters:

  • executor: Function that takes a SQL string and executes it
  • flow_name: Name for the Prefect flow
  • deployment_name: Name for the deployment
  • cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
  • interval: Optional interval in seconds between runs
  • description: Optional deployment description
  • tags: Optional list of tags
  • **deployment_kwargs: Additional deployment parameters

Returns: Prefect Deployment definition

Example:

def execute_sql(sql: str):
    client.query(sql)

# Create a scheduled deployment
deployment = pipeline.to_prefect_deployment(
    executor=execute_sql,
    flow_name="analytics_pipeline",
    deployment_name="daily_analytics",
    cron="0 6 * * *",  # Run at 6 AM daily
    tags=["production", "analytics"],
)

# Deploy to Prefect server
deployment.apply()

to_kestra_flow

Generate a Kestra flow YAML from the pipeline.

pipeline.to_kestra_flow(
    flow_id: str,
    namespace: str,
    description: Optional[str] = None,
    connection_config: Optional[Dict[str, str]] = None,
    cron: Optional[str] = None,
    retry_attempts: int = 3,
    labels: Optional[Dict[str, str]] = None,
    **kwargs
) -> str

Parameters:

  • flow_id: Unique identifier for the Kestra flow
  • namespace: Kestra namespace (e.g., "clgraph.production")
  • description: Optional flow description (auto-generated if not provided)
  • connection_config: Database connection configuration dict with url, username, password
  • cron: Optional cron schedule (e.g., "0 0 * * *" for daily)
  • retry_attempts: Number of retry attempts for failed tasks (default: 3)
  • labels: Optional key-value labels for the flow
  • **kwargs: Additional flow-level configuration

Returns: YAML string representing the Kestra flow

Note: Kestra is a declarative orchestration platform using YAML-based workflow definitions.

Example:

# Generate Kestra flow YAML
yaml_content = pipeline.to_kestra_flow(
    flow_id="analytics_pipeline",
    namespace="clgraph.production",
    description="Daily analytics refresh",
    cron="0 6 * * *",  # Run at 6 AM daily
    connection_config={
        "url": "jdbc:clickhouse://localhost:8123/default",
        "username": "default",
        "password": "",
    },
    labels={"env": "production", "team": "analytics"},
)

# Save to file
with open("flows/analytics_pipeline.yml", "w") as f:
    f.write(yaml_content)

# Deploy via Kestra API
import requests
requests.post(
    "http://localhost:8080/api/v1/flows",
    headers={"Content-Type": "application/x-yaml"},
    data=yaml_content,
)

The generated YAML flow uses io.kestra.plugin.jdbc.clickhouse.Query tasks ordered topologically to respect dependencies. Kestra executes tasks sequentially in the order they are defined.

to_mage_pipeline

Generate Mage pipeline files from the pipeline.

pipeline.to_mage_pipeline(
    executor: Callable[[str], None],
    pipeline_name: str,
    description: Optional[str] = None,
    connection_name: str = "default",
) -> Dict[str, Any]

Parameters:

  • executor: Function that takes a SQL string and executes it (used to determine block behavior)
  • pipeline_name: Name for the Mage pipeline
  • description: Optional pipeline description (auto-generated if not provided)
  • connection_name: Mage io_config.yaml profile name (default: "default")

Returns: Dictionary with:

  • "metadata": YAML string content for metadata.yaml
  • "blocks": Dictionary mapping block names to Python code strings

Note: Mage uses a block-based architecture where source tables use @data_loader decorators and derived tables use @transformer decorators.

Example:

def execute_sql(sql: str):
    client.query(sql)

# Generate Mage pipeline files
files = pipeline.to_mage_pipeline(
    executor=execute_sql,
    pipeline_name="analytics_pipeline",
    description="Daily analytics refresh",
    connection_name="clickhouse_default",
)

# Access generated content
metadata_yaml = files["metadata"]
blocks = files["blocks"]

# Write to Mage project
import os

pipeline_dir = "/path/to/mage/pipelines/analytics_pipeline"
os.makedirs(f"{pipeline_dir}/blocks", exist_ok=True)

# Write metadata
with open(f"{pipeline_dir}/metadata.yaml", "w") as f:
    f.write(metadata_yaml)

# Write block files
for block_name, block_code in blocks.items():
    with open(f"{pipeline_dir}/blocks/{block_name}.py", "w") as f:
        f.write(block_code)

The generated pipeline uses mage_ai.io.clickhouse.ClickHouse for database connectivity and automatically wires block dependencies based on table lineage.


Execution Methods

Execute pipeline queries directly.

run

Execute the pipeline synchronously, running independent queries in parallel.

pipeline.run(
    executor: Callable[[str], Any],
    max_workers: int = 4,
    verbose: bool = False
) -> dict

Parameters:

  • executor: Function that takes a SQL string and executes it
  • max_workers: Maximum number of queries to run in parallel (default: 4)
  • verbose: If True, print progress during execution

Example:

# Example executor function (mock)
executed_queries = []

def execute_sql(sql: str):
    executed_queries.append(sql)
    return {"status": "success"}

results = pipeline.run(executor=execute_sql, verbose=False)
print(f"Executed {len(executed_queries)} queries")

async_run

Execute the pipeline asynchronously.

async pipeline.async_run(
    executor: Callable[[str], Awaitable[Any]],
    max_workers: int = 4,
    verbose: bool = False
) -> dict

Parameters:

  • executor: Async function that takes a SQL string and executes it
  • max_workers: Maximum number of queries to run concurrently (default: 4)
  • verbose: If True, print progress during execution

Example:

import asyncio

async def execute_sql(sql: str):
    # Mock async executor
    return {"status": "success"}

results = asyncio.run(pipeline.async_run(executor=execute_sql))
print(f"Executed {len(results)} queries")


Pipeline Management

split

Split pipeline into independent subpipelines by sink tables.

pipeline.split(sinks: List[List[str]]) -> List[Pipeline]

Example:

# Split into subpipelines by final tables
final_tables = pipeline.table_graph.get_final_tables()
print(f"Final tables: {[t.table_name for t in final_tables]}")

# Create a subpipeline for one table
subpipelines = pipeline.split(
    sinks=[[final_tables[0].table_name]]
)
print(f"Split into {len(subpipelines)} subpipeline(s)")

build_subpipeline

Build a subpipeline containing only queries needed to build a specific table.

pipeline.build_subpipeline(target_table: str) -> Pipeline

Parameters:

  • target_table: The table to build (e.g., "analytics.revenue")

Returns: A new Pipeline containing only the queries needed to build target_table.

Example:

# Build only what's needed for a specific table
subpipeline = pipeline.build_subpipeline("analytics.customer_metrics")

print(f"Full pipeline: {len(pipeline.table_graph.queries)} queries")
print(f"Subpipeline: {len(subpipeline.table_graph.queries)} queries")

diff

Compare this pipeline with another version.

pipeline.diff(other: Pipeline) -> PipelineDiff

Example:

old_pipeline = Pipeline.from_sql_files("/path/to/old/")
new_pipeline = Pipeline.from_sql_files("/path/to/new/")

diff = new_pipeline.diff(old_pipeline)
print(diff.summary())

Properties

columns

Dictionary of all columns keyed by full name.

pipeline.columns: Dict[str, ColumnNode]

Example:

# Show first 3 columns
for name, col in list(pipeline.columns.items())[:3]:
    print(f"{name}: {col.expression or 'N/A'}")

edges

List of all lineage edges.

pipeline.edges: List[ColumnEdge]
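
Example:

# Show the first 3 lineage edges
for edge in pipeline.edges[:3]:
    print(f"  {edge.from_node.full_name} -> {edge.to_node.full_name}")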

table_graph

Table-level dependency graph.

pipeline.table_graph: TableDependencyGraph

Example:

# Get execution order
execution_order = pipeline.table_graph.get_execution_order()
print(f"Execution order ({len(execution_order)} tables):")
for table in execution_order:
    print(f"  {table.table_name}")

dialect

SQL dialect used for parsing.

pipeline.dialect: str
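
Example:

# Factory methods default to "bigquery" unless another dialect is passed
print(f"Parsing dialect: {pipeline.dialect}")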

column_graph

Column-level lineage graph with all columns and edges.

pipeline.column_graph: PipelineLineageGraph

Example:

# Get column graph
graph = pipeline.column_graph

# Find source and final columns
sources = graph.get_source_columns()
finals = graph.get_final_columns()

print(f"Source columns: {len(sources)}")
print(f"Final columns: {len(finals)}")


Validation Methods

Analyze SQL quality and identify potential issues in your pipeline.

get_all_issues

Get all validation issues from all queries in the pipeline.

pipeline.get_all_issues() -> List[ValidationIssue]

Returns: Combined list of issues from individual query lineage graphs and pipeline-level lineage.

Example:

# Get all issues
issues = pipeline.get_all_issues()

print(f"Found {len(issues)} validation issues")
for issue in issues[:5]:  # Show first 5
    print(f"  [{issue.severity.value}] {issue.message}")

get_issues

Get filtered validation issues by severity, category, or query.

pipeline.get_issues(
    severity: Optional[str | IssueSeverity] = None,
    category: Optional[str | IssueCategory] = None,
    query_id: Optional[str] = None
) -> List[ValidationIssue]

Parameters:

  • severity: Filter by severity ('error', 'warning', 'info', or an IssueSeverity enum)
  • category: Filter by category (string or IssueCategory enum)
  • query_id: Filter by query ID

Example:

from clgraph.models import IssueSeverity, IssueCategory

# Get all errors
errors = pipeline.get_issues(severity='error')
print(f"Found {len(errors)} errors")

# Get all warnings using enum
warnings = pipeline.get_issues(severity=IssueSeverity.WARNING)
print(f"Found {len(warnings)} warnings")

# Get star-related issues
star_issues = pipeline.get_issues(
    category=IssueCategory.UNQUALIFIED_STAR_MULTIPLE_TABLES
)

# Get issues from a specific query
query_issues = pipeline.get_issues(query_id='02_staging')
print(f"Found {len(query_issues)} issues in 02_staging query")

has_errors

Check if pipeline has any ERROR-level issues.

pipeline.has_errors() -> bool

Example:

if pipeline.has_errors():
    print("⚠️  Pipeline has validation errors!")
    for error in pipeline.get_issues(severity='error'):
        print(f"  - {error.message}")
else:
    print("✅ No validation errors")

has_warnings

Check if pipeline has any WARNING-level issues.

pipeline.has_warnings() -> bool
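
Example:

if pipeline.has_warnings():
    warnings = pipeline.get_issues(severity='warning')
    print(f"Pipeline has {len(warnings)} warnings")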

print_issues

Print all validation issues in a human-readable format.

pipeline.print_issues(severity: Optional[str | IssueSeverity] = None)

Parameters:

  • severity: Optional filter by severity

Example:

# Print all issues grouped by severity
pipeline.print_issues()

# Print only errors
pipeline.print_issues(severity='error')


Simplified Lineage

Get simplified views of the column lineage graph.

get_simplified_column_graph

Get a simplified version of the column lineage graph with only physical table columns.

pipeline.get_simplified_column_graph() -> PipelineLineageGraph

Returns: A new PipelineLineageGraph that:

  • Keeps: all physical table columns (raw.*, staging.*, analytics.*, etc.)
  • Removes: CTE columns, subquery columns, star nodes
  • Edges: direct table-to-table edges (tracing through CTEs/subqueries)

Example:

# Get simplified graph
simplified = pipeline.get_simplified_column_graph()

# Compare sizes
print(f"Full graph: {len(pipeline.columns)} columns")
print(f"Simplified: {len(simplified.columns)} columns")

# Use simplified graph for cleaner visualization
for col in simplified.columns.values():
    print(f"  {col.full_name}")

This is useful for:

  • Data catalog exports: show only physical tables
  • Impact analysis: focus on table-to-table dependencies
  • Visualization: cleaner graphs without internal structures