# Lineage Classes

Classes for tracing and understanding data flow through SQL queries.

## Pipeline-First Approach

Always start with the `Pipeline` class for lineage analysis. It provides unified access to both the table-level and column-level lineage graphs.

```python
# Access the lineage graphs from the preamble's pipeline
table_graph = pipeline.table_graph
print(f"Tables: {list(table_graph.tables.keys())}")

column_graph = pipeline.column_graph
print(f"Columns: {len(column_graph.columns)}")
```
## ColumnNode

Unified column representation for both single-query and multi-query lineage analysis.

### Properties

| Property | Type | Description |
|---|---|---|
| `column_name` | `str` | Column name |
| `table_name` | `str` | Table name |
| `full_name` | `str` | Fully qualified name (`table.column`) |
| `query_id` | `Optional[str]` | Query that produces this column (for pipelines) |
| `unit_id` | `Optional[str]` | CTE/subquery unit within a query |
| `node_type` | `str` | `"source"`, `"intermediate"`, `"output"`, `"base_column"`, `"star"`, etc. |
| `layer` | `Optional[str]` | `"input"`, `"cte"`, `"subquery"`, `"output"` (for compatibility) |
| `expression` | `Optional[str]` | SQL expression |
| `operation` | `Optional[str]` | Operation type (`SUM`, `CASE`, `JOIN`, etc.) |
| `description` | `Optional[str]` | Column description |
| `owner` | `Optional[str]` | Column owner |
| `pii` | `bool` | Whether the column is marked as PII |
| `tags` | `Set[str]` | Custom tags |
| `custom_metadata` | `Dict[str, Any]` | Custom key-value metadata |
### Methods

#### set_source_description

Set a user-provided description on the column.

#### is_computed

Check whether this is a computed/derived column.

### Example

```python
# Access a column from the pipeline
col = pipeline.columns["analytics.customer_metrics.total_amount"]

# Set metadata
col.pii = False
col.owner = "analytics-team"
col.tags.add("financial")
col.set_source_description("Total revenue per customer")

# Check properties
print(f"Column: {col.full_name}")
print(f"Expression: {col.expression}")
print(f"Operation: {col.operation}")
print(f"Is computed: {col.is_computed()}")
```
### Metadata Functions

For metadata propagation and description generation, use the utility functions:

```python
from clgraph.column import propagate_metadata, generate_description

# Propagate metadata from source columns
propagate_metadata(col, pipeline)

# Generate a description using an LLM (requires LLM configuration)
generate_description(col, llm, pipeline)
```
## ColumnEdge

Unified edge representing a lineage relationship between two columns.

### Properties

| Property | Type | Description |
|---|---|---|
| `from_node` | `ColumnNode` | Source column |
| `to_node` | `ColumnNode` | Target column |
| `edge_type` | `str` | Type: `"direct"`, `"transform"`, `"aggregate"`, `"join"`, `"star_passthrough"`, `"cross_query"` |
| `context` | `Optional[str]` | Context: `"SELECT"`, `"CTE"`, `"main_query"`, `"cross_query"` |
| `transformation` | `Optional[str]` | Description of the transformation |
| `query_id` | `Optional[str]` | Query where the edge exists |
| `expression` | `Optional[str]` | SQL expression |
### Example

```python
# Show the first 3 edges
for edge in pipeline.edges[:3]:
    print(f"{edge.from_node.full_name} -> {edge.to_node.full_name}")
    print(f"  Type: {edge.edge_type}")
    print(f"  Transformation: {edge.transformation}")
```
## TableDependencyGraph

Table-level dependency graph for a pipeline.

### Methods

#### get_execution_order

Get tables in topological order for execution.

Example:

```python
# Get execution order
for table in table_graph.get_execution_order():
    print(f"Execute: {table.table_name}")
```
#### get_source_tables

Get external source tables (tables not created by any query).

#### get_final_tables

Get final tables (tables not read by any query).
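Example (a minimal sketch covering both; the return values are assumed to be `TableNode` objects, matching `get_execution_order` above):

```python
# External inputs: tables no query in the pipeline creates
for table in table_graph.get_source_tables():
    print(f"Source: {table.table_name}")

# Terminal outputs: tables no query in the pipeline reads
for table in table_graph.get_final_tables():
    print(f"Final: {table.table_name}")
```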
#### get_dependencies

Get the upstream tables that a table depends on.

Example:

```python
deps = table_graph.get_dependencies("analytics.customer_metrics")
print(f"Depends on: {[t.table_name for t in deps]}")
```

#### get_downstream

Get the downstream tables that depend on this table.

Example:

```python
downstream = table_graph.get_downstream("staging.orders")
print(f"Feeds into: {[t.table_name for t in downstream]}")
```
#### topological_sort

Get query IDs in execution order.
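Example (a minimal sketch; per the description above, the return values are query ID strings):

```python
# Visit queries in dependency order
for query_id in table_graph.topological_sort():
    print(f"Run query: {query_id}")
```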
### Properties

| Property | Type | Description |
|---|---|---|
| `tables` | `Dict[str, TableNode]` | All tables, keyed by name |
| `queries` | `Dict[str, ParsedQuery]` | All queries, keyed by ID |
## TableNode

Represents a table in the dependency graph.

### Properties

| Property | Type | Description |
|---|---|---|
| `table_name` | `str` | Fully qualified table name |
| `is_source` | `bool` | Whether this is an external source |
| `created_by` | `Optional[str]` | ID of the query that creates this table |
| `modified_by` | `List[str]` | IDs of queries that modify this table |
| `read_by` | `List[str]` | IDs of queries that read this table |
| `columns` | `Set[str]` | Column names in the table |
| `description` | `Optional[str]` | Table description |
### Methods

#### get_columns

Get all columns for this table.

### Example

```python
# Access a specific table from the graph
table = table_graph.tables["analytics.customer_metrics"]
print(f"Table: {table.table_name}")
print(f"Is source: {table.is_source}")
print(f"Created by: {table.created_by}")
print(f"Read by: {table.read_by}")
```
## PipelineLineageGraph

Column-level lineage graph for a multi-query pipeline. Access via `pipeline.column_graph`.

### Properties

| Property | Type | Description |
|---|---|---|
| `columns` | `Dict[str, ColumnNode]` | All columns, keyed by full name |
| `edges` | `List[ColumnEdge]` | All lineage edges |
| `issues` | `List[ValidationIssue]` | Validation issues |
### Methods

#### get_source_columns

Get all source columns (columns with no incoming edges).

Example:

```python
sources = pipeline.column_graph.get_source_columns()
print(f"Found {len(sources)} source columns")
for col in sources[:5]:
    print(f"  - {col.full_name}")
```
#### get_final_columns

Get all final columns (columns with no outgoing edges).
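Example (usage mirrors `get_source_columns`):

```python
finals = pipeline.column_graph.get_final_columns()
print(f"Found {len(finals)} final columns")
```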
#### get_upstream

Get the upstream columns that a column depends on (direct dependencies only).

Example:

```python
upstream = pipeline.column_graph.get_upstream("analytics.customer_metrics.total_amount")
for col in upstream:
    print(f"  Depends on: {col.full_name}")
```
#### get_downstream

Get the downstream columns that depend on this column (direct dependents only).
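Example (usage mirrors `get_upstream`; the column name here is illustrative):

```python
downstream = pipeline.column_graph.get_downstream("staging.orders.amount")
for col in downstream:
    print(f"  Feeds: {col.full_name}")
```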
#### to_simplified

Create a simplified version of the graph containing only physical table columns.

Returns a new `PipelineLineageGraph` with:

- Only physical table columns (CTEs, subqueries, and star nodes are removed)
- Direct edges between table columns (traced through the internal structures)

Example:

```python
# Get a simplified graph for cleaner visualization
simplified = pipeline.column_graph.to_simplified()
print(f"Full: {len(pipeline.column_graph.columns)} columns")
print(f"Simplified: {len(simplified.columns)} columns")
```
#### add_issue

Add a validation issue to the graph.
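A minimal sketch (the `ValidationIssue` constructor arguments are assumptions based on the properties documented in the next section):

```python
from clgraph.models import ValidationIssue, IssueSeverity, IssueCategory

# Hypothetical: construct and attach a custom issue; field names are
# taken from the ValidationIssue properties table below
issue = ValidationIssue(
    severity=IssueSeverity.WARNING,
    category=IssueCategory.SELECT_STAR_DISCOURAGED,
    message="SELECT * makes column lineage less precise",
)
pipeline.column_graph.add_issue(issue)
```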
## ValidationIssue

Represents a validation issue found during SQL parsing or lineage construction.

### Properties

| Property | Type | Description |
|---|---|---|
| `severity` | `IssueSeverity` | Error, warning, or info |
| `category` | `IssueCategory` | Type of issue |
| `message` | `str` | Human-readable description |
| `query_id` | `Optional[str]` | Which query has the issue |
| `location` | `Optional[str]` | Location in the SQL (e.g., "SELECT clause") |
| `suggestion` | `Optional[str]` | How to fix the issue |
| `context` | `Dict[str, Any]` | Additional context |
### Example

```python
from clgraph.models import ValidationIssue, IssueSeverity, IssueCategory

# Issues are collected automatically during parsing
issues = pipeline.get_all_issues()
print(f"Found {len(issues)} issues")

for issue in issues[:3]:  # Show the first 3
    print(f"[{issue.severity.value}] {issue.category.value}")
    print(f"  Query: {issue.query_id}")
    print(f"  Message: {issue.message}")
```
## IssueSeverity

Severity level for validation issues.

| Value | Description |
|---|---|
| `IssueSeverity.ERROR` | Prevents lineage construction or causes incorrect results |
| `IssueSeverity.WARNING` | May cause issues or indicates a bad practice |
| `IssueSeverity.INFO` | Informational; suggestions for improvement |
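Severity is handy for filtering. For example, to surface only blocking issues:

```python
from clgraph.models import IssueSeverity

# Keep only issues that prevent correct lineage construction
errors = [
    issue for issue in pipeline.get_all_issues()
    if issue.severity == IssueSeverity.ERROR
]
print(f"{len(errors)} blocking issues")
```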
## IssueCategory

Category of validation issue.

Column Reference Issues:

| Category | Description |
|---|---|
| `AMBIGUOUS_COLUMN` | Column name is ambiguous (multiple tables) |
| `UNQUALIFIED_COLUMN` | Column lacks a table qualifier in a JOIN |
| `MISSING_COLUMN` | Referenced column doesn't exist |

Star Notation Issues:

| Category | Description |
|---|---|
| `UNQUALIFIED_STAR_MULTIPLE_TABLES` | `SELECT *` with multiple tables in FROM |
| `STAR_WITHOUT_SCHEMA` | Star expansion without schema information |

Table Reference Issues:

| Category | Description |
|---|---|
| `MISSING_TABLE` | Referenced table doesn't exist |
| `CIRCULAR_DEPENDENCY` | Circular table dependency detected |
| `AMBIGUOUS_TABLE` | Table name is ambiguous |

Expression Issues:

| Category | Description |
|---|---|
| `UNSUPPORTED_SYNTAX` | SQL syntax not supported by the parser |
| `COMPLEX_EXPRESSION` | Expression too complex to trace |
| `PARSE_ERROR` | SQL parsing failed |

Schema Issues:

| Category | Description |
|---|---|
| `MISSING_SCHEMA_INFO` | Schema information not available |
| `SCHEMA_MISMATCH` | Schema doesn't match what was expected |
| `TYPE_MISMATCH` | Column type mismatch |

Best Practices:

| Category | Description |
|---|---|
| `IMPLICIT_JOIN` | Comma-separated tables used instead of an explicit JOIN |
| `SELECT_STAR_DISCOURAGED` | `SELECT *` used instead of explicit columns |
## Advanced: SQLColumnTracer

**Advanced API:** `SQLColumnTracer` is a low-level API for single-query lineage analysis. Most users should use the `Pipeline` class instead, which provides a unified interface for multi-query pipelines and includes all `SQLColumnTracer` functionality.

For single-query lineage analysis without the full pipeline context:

```python
from clgraph import SQLColumnTracer

query = """
WITH cte AS (SELECT id, SUM(amount) AS total FROM orders GROUP BY id)
SELECT id, total FROM cte
"""

tracer = SQLColumnTracer(query, dialect="bigquery")
```
### Methods

#### get_forward_lineage

Impact analysis: find all outputs affected by the given input columns.

Example:

```python
forward = tracer.get_forward_lineage(["orders.amount"])
print(forward["impacted_outputs"])  # ['total']
print(forward["impacted_ctes"])     # ['cte']
```

#### get_backward_lineage

Source tracing: find all inputs required for the given output columns.

Example:
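A minimal sketch by symmetry with `get_forward_lineage`; the structure of the returned mapping is an assumption:

```python
# Trace the output column back to its required inputs
backward = tracer.get_backward_lineage(["total"])
print(backward)  # inspect the returned mapping
```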