Diff Classes
Track changes between pipeline versions for impact analysis and documentation updates.
PipelineDiff
Compare two pipeline versions to identify changes.
Constructor
# Compare through the pipeline method; the use-case examples on this page
# call it as new.diff(old), i.e. receiver = new version, argument = old version.
diff = pipeline.diff(other_pipeline)
# or build the diff directly, naming the new and old sides explicitly
diff = PipelineDiff(new_graph=new_pipeline, old_graph=old_pipeline)
Methods
has_changes
Check if there are any differences.
summary
Get a human-readable summary of changes.
Example:
# Build the older (baseline) version of the pipeline. The preamble's
# `pipeline` is the newer one with additional columns (e.g. avg_order_value).
old_queries = [
    ("01_raw", "CREATE TABLE raw.orders AS SELECT order_id, customer_id, amount, status FROM external.orders"),
    ("02_staging", "CREATE TABLE staging.orders AS SELECT customer_id, SUM(amount) as total_amount FROM raw.orders WHERE status = 'completed' GROUP BY customer_id"),
]
old_pipeline = Pipeline.from_tuples(old_queries, dialect="bigquery")

# Diff the preamble's pipeline (new) against the baseline (old) and print
# the human-readable summary only when something actually changed.
diff = pipeline.diff(old_pipeline)
if diff.has_changes():
    print(diff.summary())
get_sql_changes
Get only changes to SQL expressions.
get_lineage_changes
Get only changes to column lineage.
get_columns_needing_update
Get columns that need description regeneration.
Example:
# Find columns that need new descriptions after SQL changes.
# The returned entries are column keys: the "Documentation Updates" use case
# on this page looks each one up in the new pipeline's `columns` mapping.
columns_to_update = diff.get_columns_needing_update()
print(f"Columns needing update: {columns_to_update}")
Properties
| Property | Type | Description |
|---|---|---|
| `columns_added` | `List[str]` | Columns in new but not old |
| `columns_removed` | `List[str]` | Columns in old but not new |
| `columns_modified` | `List[ColumnDiff]` | Columns with changes |
Example:
# Report how many columns fall into each change category.
print(f"Added: {len(diff.columns_added)} columns")
print(f"Removed: {len(diff.columns_removed)} columns")
print(f"Modified: {len(diff.columns_modified)} columns")

# Each modified entry is a ColumnDiff; show only the first 3 to keep output short.
for change in diff.columns_modified[:3]:
    print(f" {change.full_name}: {change.field_name} changed")
ColumnDiff
Represents a single column change.
Properties
| Property | Type | Description |
|---|---|---|
| `column_name` | `str` | Column name |
| `table_name` | `str` | Table name |
| `full_name` | `str` | Fully qualified name |
| `field_name` | `str` | Field that changed (expression, lineage, etc.) |
| `old_value` | `Any` | Previous value |
| `new_value` | `Any` | New value |
Example
# Print the before/after values for the first 2 modified columns.
for change in diff.columns_modified[:2]:
    print(f"Column: {change.full_name}")
    print(f" Changed: {change.field_name}")
    print(f" From: {change.old_value}")
    print(f" To: {change.new_value}")
Use Cases
CI/CD Integration
def check_pipeline_changes() -> bool:
    """Check for breaking changes in CI.

    Loads the pipeline from the main branch's SQL directory and from the
    pull request's SQL directory, diffs them (PR as new, main as old), and
    treats any removed column as a breaking change.

    Returns:
        False if at least one column was removed (each removed column is
        printed as a warning), True otherwise.
    """
    old = Pipeline.from_sql_files("/main/sql/")
    new = Pipeline.from_sql_files("/pr/sql/")

    diff = new.diff(old)

    if diff.columns_removed:
        print(f"WARNING: {len(diff.columns_removed)} columns removed")
        for col in diff.columns_removed:
            print(f" - {col}")
        # Fail only after every removed column has been listed.
        return False

    return True
Documentation Updates
def update_docs_after_changes() -> None:
    """Regenerate descriptions for changed columns.

    Diffs the current pipeline against the previous one, regenerates the
    description of every column the diff reports as needing an update, and
    then exports the current pipeline (with its metadata) to
    ``metadata.json``.
    """
    old = Pipeline.from_sql_files("/prev/")
    new = Pipeline.from_sql_files("/current/")

    diff = new.diff(old)

    # Only regenerate for columns with actual changes.
    # NOTE(review): `llm` is not defined in this snippet; it must already be
    # in scope (e.g. configured earlier in the docs preamble) — confirm.
    for col_name in diff.get_columns_needing_update():
        col = new.columns[col_name]
        col.generate_description(llm, new)

    # Save pipeline with metadata to JSON. This stays inside the function
    # because it exports the local `new` pipeline.
    from clgraph import JSONExporter

    JSONExporter.export_to_file(new, "metadata.json", include_queries=True)
Impact Analysis
def analyze_schema_change_impact() -> None:
    """Analyze impact before making schema changes.

    Diffs the proposed pipeline against the current one and prints how many
    SQL-expression changes and how many lineage changes the proposal
    introduces.
    """
    current = Pipeline.from_sql_files("/current/")
    proposed = Pipeline.from_sql_files("/proposed/")

    diff = proposed.diff(current)

    # Check what's affected, split by kind of change.
    sql_changes = diff.get_sql_changes()
    lineage_changes = diff.get_lineage_changes()

    print(f"SQL expression changes: {len(sql_changes)}")
    print(f"Lineage changes: {len(lineage_changes)}")