
Lineage Classes

Classes for tracing and understanding data flow through SQL queries.

Pipeline-First Approach

Always start with the Pipeline class for lineage analysis. It provides unified access to both table-level and column-level lineage graphs.

# pipeline and table_graph are provided by the preamble
print(f"Tables: {list(table_graph.tables.keys())}")

column_graph = pipeline.column_graph
print(f"Columns: {len(column_graph.columns)}")

ColumnNode

Unified column representation for both single-query and multi-query lineage analysis.

from clgraph import ColumnNode

Properties

| Property | Type | Description |
|----------|------|-------------|
| column_name | str | Column name |
| table_name | str | Table name |
| full_name | str | Fully qualified name (table.column) |
| query_id | Optional[str] | Query that produces this column (for pipelines) |
| unit_id | Optional[str] | CTE/subquery unit within a query |
| node_type | str | "source", "intermediate", "output", "base_column", "star", etc. |
| layer | Optional[str] | "input", "cte", "subquery", "output" (for compatibility) |
| expression | Optional[str] | SQL expression |
| operation | Optional[str] | Operation type (SUM, CASE, JOIN, etc.) |
| description | Optional[str] | Column description |
| owner | Optional[str] | Column owner |
| pii | bool | Whether marked as PII |
| tags | Set[str] | Custom tags |
| custom_metadata | Dict[str, Any] | Custom key-value metadata |

Methods

set_source_description

Set a user-provided description.

col.set_source_description(description: str)

is_computed

Check if this is a computed/derived column.

col.is_computed() -> bool

Example

# Access a column from the pipeline
col = pipeline.columns["analytics.customer_metrics.total_amount"]

# Set metadata
col.pii = False
col.owner = "analytics-team"
col.tags.add("financial")
col.set_source_description("Total revenue per customer")

# Check properties
print(f"Column: {col.full_name}")
print(f"Expression: {col.expression}")
print(f"Operation: {col.operation}")
print(f"Is computed: {col.is_computed()}")

Metadata Functions

For metadata propagation and description generation, use the utility functions:

from clgraph.column import propagate_metadata, generate_description

# Propagate metadata from source columns
propagate_metadata(col, pipeline)

# Generate description using LLM (requires LLM configuration)
generate_description(col, llm, pipeline)
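Conceptually, propagation combines annotations from a column's upstream sources. The rules below (PII if any source is PII, inherit the first upstream owner) are illustrative assumptions for a standalone sketch, not clgraph's actual implementation:

```python
# Standalone sketch of metadata propagation (illustrative rules,
# not clgraph's implementation).
upstream_meta = [
    {"pii": True, "owner": "sales-team"},
    {"pii": False, "owner": "sales-team"},
]

derived = {
    # Assumed rule: a derived column is PII if any source column is PII.
    "pii": any(m["pii"] for m in upstream_meta),
    # Assumed rule: inherit the first upstream owner.
    "owner": upstream_meta[0]["owner"],
}
print(derived)  # {'pii': True, 'owner': 'sales-team'}
```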

ColumnEdge

Unified edge representing lineage relationships between columns.

from clgraph import ColumnEdge

Properties

| Property | Type | Description |
|----------|------|-------------|
| from_node | ColumnNode | Source column |
| to_node | ColumnNode | Target column |
| edge_type | str | Type: "direct", "transform", "aggregate", "join", "star_passthrough", "cross_query" |
| context | Optional[str] | Context: "SELECT", "CTE", "main_query", "cross_query" |
| transformation | Optional[str] | Description of the transformation |
| query_id | Optional[str] | Query where the edge exists |
| expression | Optional[str] | SQL expression |

Example

# Show first 3 edges
for edge in pipeline.edges[:3]:
    print(f"{edge.from_node.full_name} -> {edge.to_node.full_name}")
    print(f"  Type: {edge.edge_type}")
    print(f"  Transformation: {edge.transformation}")

TableDependencyGraph

Table-level dependency graph for a pipeline.

# table_graph is provided by the preamble
print(f"Tables in graph: {len(table_graph.tables)}")

Methods

get_execution_order

Get tables in topological order for execution.

graph.get_execution_order() -> List[TableNode]

Example:

# Get execution order
for table in table_graph.get_execution_order():
    print(f"Execute: {table.table_name}")

get_source_tables

Get external source tables (not created by any query).

graph.get_source_tables() -> List[TableNode]

get_final_tables

Get final tables (not read by any query).

graph.get_final_tables() -> List[TableNode]
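The distinction between the two is simple: a source table is created by no query, and a final table is read by none. A standalone sketch of that idea over hypothetical query metadata:

```python
# Hypothetical query metadata: which tables each query reads and creates.
queries = {
    "q1": {"reads": ["raw.orders"], "creates": "staging.orders"},
    "q2": {"reads": ["staging.orders"], "creates": "analytics.customer_metrics"},
}

created = {q["creates"] for q in queries.values()}
read = {t for q in queries.values() for t in q["reads"]}

sources = sorted((created | read) - created)  # not created by any query
finals = sorted((created | read) - read)      # not read by any query
print(sources)  # ['raw.orders']
print(finals)   # ['analytics.customer_metrics']
```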

get_dependencies

Get upstream tables that a table depends on.

graph.get_dependencies(table_name: str) -> List[TableNode]

Example:

deps = table_graph.get_dependencies("analytics.customer_metrics")
print(f"Depends on: {[t.table_name for t in deps]}")

get_downstream

Get downstream tables that depend on this table.

graph.get_downstream(table_name: str) -> List[TableNode]

Example:

downstream = table_graph.get_downstream("staging.orders")
print(f"Feeds into: {[t.table_name for t in downstream]}")

topological_sort

Get query IDs in execution order.

graph.topological_sort() -> List[str]
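Execution order is a topological sort of the query dependency graph. The same idea can be sketched standalone with the standard library's graphlib and a hypothetical dependency map:

```python
from graphlib import TopologicalSorter

# Hypothetical map: query id -> the query ids it depends on.
deps = {"q1": set(), "q2": {"q1"}, "q3": {"q1", "q2"}}

order = list(TopologicalSorter(deps).static_order())
print(order)  # ['q1', 'q2', 'q3']
```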

Properties

| Property | Type | Description |
|----------|------|-------------|
| tables | Dict[str, TableNode] | All tables by name |
| queries | Dict[str, ParsedQuery] | All queries by ID |

TableNode

Represents a table in the dependency graph.

Properties

| Property | Type | Description |
|----------|------|-------------|
| table_name | str | Fully qualified table name |
| is_source | bool | Whether the table is an external source |
| created_by | Optional[str] | Query ID that creates this table |
| modified_by | List[str] | Query IDs that modify this table |
| read_by | List[str] | Query IDs that read this table |
| columns | Set[str] | Column names in the table |
| description | Optional[str] | Table description |

Methods

get_columns

Get all columns for this table.

table.get_columns(pipeline) -> List[ColumnNode]

Example

# Access a specific table from the graph
table = table_graph.tables["analytics.customer_metrics"]

print(f"Table: {table.table_name}")
print(f"Is source: {table.is_source}")
print(f"Created by: {table.created_by}")
print(f"Read by: {table.read_by}")

PipelineLineageGraph

Column-level lineage graph for a multi-query pipeline. Access via pipeline.column_graph.

graph = pipeline.column_graph

Properties

| Property | Type | Description |
|----------|------|-------------|
| columns | Dict[str, ColumnNode] | All columns by full name |
| edges | List[ColumnEdge] | All lineage edges |
| issues | List[ValidationIssue] | Validation issues |

Methods

get_source_columns

Get all source columns (columns with no incoming edges).

graph.get_source_columns() -> List[ColumnNode]

Example:

sources = pipeline.column_graph.get_source_columns()
print(f"Found {len(sources)} source columns")
for col in sources[:5]:
    print(f"  - {col.full_name}")

get_final_columns

Get all final columns (columns with no outgoing edges).

graph.get_final_columns() -> List[ColumnNode]

get_upstream

Get upstream columns that a column depends on (direct dependencies only).

graph.get_upstream(full_name: str) -> List[ColumnNode]

Example:

upstream = pipeline.column_graph.get_upstream("analytics.customer_metrics.total_amount")
for col in upstream:
    print(f"  Depends on: {col.full_name}")

get_downstream

Get downstream columns that depend on this column (direct dependents only).

graph.get_downstream(full_name: str) -> List[ColumnNode]
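Direct upstream and downstream lookups are essentially filters over the edge list. A standalone sketch with a hypothetical edge list:

```python
# Hypothetical lineage edges as (from_column, to_column) pairs.
edges = [
    ("orders.amount", "cte.total"),
    ("cte.total", "analytics.customer_metrics.total_amount"),
]

def upstream(name):
    """Columns this column directly depends on."""
    return [src for src, dst in edges if dst == name]

def downstream(name):
    """Columns that directly depend on this column."""
    return [dst for src, dst in edges if src == name]

print(upstream("cte.total"))    # ['orders.amount']
print(downstream("cte.total"))  # ['analytics.customer_metrics.total_amount']
```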

to_simplified

Create a simplified version of the graph with only physical table columns.

graph.to_simplified() -> PipelineLineageGraph

Returns: A new PipelineLineageGraph with:

- Only physical table columns (removes CTEs, subqueries, star nodes)
- Direct edges between table columns (traced through internal structures)

Example:

# Get simplified graph for cleaner visualization
simplified = pipeline.column_graph.to_simplified()

print(f"Full: {len(pipeline.column_graph.columns)} columns")
print(f"Simplified: {len(simplified.columns)} columns")
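The idea behind simplification can be sketched standalone: keep only physical columns, and follow edges through internal nodes (CTEs, subqueries) until another physical column is reached. Hypothetical data; not clgraph's implementation:

```python
# Hypothetical edges passing through an internal CTE column.
edges = [
    ("orders.amount", "cte.total"),
    ("cte.total", "out.total_amount"),
]
physical = {"orders.amount", "out.total_amount"}  # assume cte.* is internal

def physical_targets(node):
    """Follow edges until a physical column is reached."""
    result = []
    for src, dst in edges:
        if src == node:
            result.extend([dst] if dst in physical else physical_targets(dst))
    return result

# Contract internal nodes into direct physical-to-physical edges.
simplified = [(src, dst) for src in physical for dst in physical_targets(src)]
print(simplified)  # [('orders.amount', 'out.total_amount')]
```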

add_issue

Add a validation issue to the graph.

graph.add_issue(issue: ValidationIssue)

ValidationIssue

Represents a validation issue found during SQL parsing or lineage construction.

from clgraph.models import ValidationIssue, IssueSeverity, IssueCategory

Properties

| Property | Type | Description |
|----------|------|-------------|
| severity | IssueSeverity | Error, warning, or info |
| category | IssueCategory | Type of issue |
| message | str | Human-readable description |
| query_id | Optional[str] | Which query has the issue |
| location | Optional[str] | Location in SQL (e.g., "SELECT clause") |
| suggestion | Optional[str] | How to fix the issue |
| context | Dict[str, Any] | Additional context |

Example

from clgraph.models import ValidationIssue, IssueSeverity, IssueCategory

# Issues are automatically collected during parsing
issues = pipeline.get_all_issues()

print(f"Found {len(issues)} issues")
for issue in issues[:3]:  # Show first 3
    print(f"[{issue.severity.value}] {issue.category.value}")
    print(f"  Query: {issue.query_id}")
    print(f"  Message: {issue.message}")

IssueSeverity

Severity level for validation issues.

from clgraph.models import IssueSeverity

| Value | Description |
|-------|-------------|
| IssueSeverity.ERROR | Prevents lineage construction or causes incorrect results |
| IssueSeverity.WARNING | May cause issues or reflects a bad practice |
| IssueSeverity.INFO | Informational; suggestions for improvement |

IssueCategory

Category of validation issue.

from clgraph.models import IssueCategory

Column Reference Issues:

| Category | Description |
|----------|-------------|
| AMBIGUOUS_COLUMN | Column name is ambiguous (multiple tables) |
| UNQUALIFIED_COLUMN | Column lacks a table qualifier in a JOIN |
| MISSING_COLUMN | Referenced column doesn't exist |

Star Notation Issues:

| Category | Description |
|----------|-------------|
| UNQUALIFIED_STAR_MULTIPLE_TABLES | SELECT * with multiple tables in FROM |
| STAR_WITHOUT_SCHEMA | Star expansion without schema information |

Table Reference Issues:

| Category | Description |
|----------|-------------|
| MISSING_TABLE | Referenced table doesn't exist |
| CIRCULAR_DEPENDENCY | Circular table dependency detected |
| AMBIGUOUS_TABLE | Table name is ambiguous |

Expression Issues:

| Category | Description |
|----------|-------------|
| UNSUPPORTED_SYNTAX | SQL syntax not supported by the parser |
| COMPLEX_EXPRESSION | Expression too complex to trace |
| PARSE_ERROR | SQL parsing failed |

Schema Issues:

| Category | Description |
|----------|-------------|
| MISSING_SCHEMA_INFO | Schema information not available |
| SCHEMA_MISMATCH | Schema doesn't match what was expected |
| TYPE_MISMATCH | Column type mismatch |

Best Practices:

| Category | Description |
|----------|-------------|
| IMPLICIT_JOIN | Using comma-separated tables instead of JOIN |
| SELECT_STAR_DISCOURAGED | Using SELECT * instead of explicit columns |

Advanced: SQLColumnTracer

Advanced API

SQLColumnTracer is a low-level API for single-query lineage analysis. Most users should use the Pipeline class instead, which provides a unified interface for multi-query pipelines and includes all SQLColumnTracer functionality.

For single-query lineage analysis without the full pipeline context:

from clgraph import SQLColumnTracer

query = """
WITH cte AS (SELECT id, SUM(amount) as total FROM orders GROUP BY id)
SELECT id, total FROM cte
"""

tracer = SQLColumnTracer(query, dialect="bigquery")

Methods

get_forward_lineage

Impact analysis: find all outputs affected by the given input columns.

tracer.get_forward_lineage(input_columns: List[str]) -> Dict[str, Any]

Example:

forward = tracer.get_forward_lineage(["orders.amount"])
print(forward["impacted_outputs"])  # ['total']
print(forward["impacted_ctes"])     # ['cte']

get_backward_lineage

Source tracing: find all inputs required to produce the given output columns.

tracer.get_backward_lineage(output_columns: List[str]) -> Dict[str, Any]

Example:

backward = tracer.get_backward_lineage(["total"])
print(backward["required_inputs"])  # {'orders': ['amount']}
print(backward["required_ctes"])    # ['cte']