Skip to content

Diff Classes

Track changes between pipeline versions for impact analysis and documentation updates.


PipelineDiff

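Compare two pipeline versions to identify changes. In the examples on this page, the pipeline that calls diff() is the new version and the pipeline passed as the argument is the old version.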

Constructor

diff = pipeline.diff(other_pipeline)
# or
diff = PipelineDiff(new_graph=new_pipeline, old_graph=old_pipeline)

Methods

has_changes

Check if there are any differences.

diff.has_changes() -> bool

summary

Get a human-readable summary of changes.

diff.summary() -> str

Example:

# Create an older version of the pipeline (before avg_order_value was added)
old_queries = [
    ("01_raw", "CREATE TABLE raw.orders AS SELECT order_id, customer_id, amount, status FROM external.orders"),
    ("02_staging", "CREATE TABLE staging.orders AS SELECT customer_id, SUM(amount) as total_amount FROM raw.orders WHERE status = 'completed' GROUP BY customer_id"),
]
old_pipeline = Pipeline.from_tuples(old_queries, dialect="bigquery")

# Compare the current pipeline (defined in the page's setup code, with additional columns) against the old version
diff = pipeline.diff(old_pipeline)

if diff.has_changes():
    print(diff.summary())

get_sql_changes

Get only changes to SQL expressions.

diff.get_sql_changes() -> List[ColumnDiff]

get_lineage_changes

Get only changes to column lineage.

diff.get_lineage_changes() -> List[ColumnDiff]

get_columns_needing_update

Get columns that need description regeneration.

diff.get_columns_needing_update() -> List[str]

Example:

# Find columns that need new descriptions after SQL changes
columns_to_update = diff.get_columns_needing_update()

print(f"Columns needing update: {columns_to_update}")

Properties

| Property | Type | Description |
|----------|------|-------------|
| columns_added | List[str] | Columns in new but not old |
| columns_removed | List[str] | Columns in old but not new |
| columns_modified | List[ColumnDiff] | Columns with changes |

Example:

print(f"Added: {len(diff.columns_added)} columns")
print(f"Removed: {len(diff.columns_removed)} columns")
print(f"Modified: {len(diff.columns_modified)} columns")

for change in diff.columns_modified[:3]:  # Show first 3
    print(f"  {change.full_name}: {change.field_name} changed")

ColumnDiff

Represents a single column change.

Properties

| Property | Type | Description |
|----------|------|-------------|
| column_name | str | Column name |
| table_name | str | Table name |
| full_name | str | Fully qualified name |
| field_name | str | Field that changed (expression, lineage, etc.) |
| old_value | Any | Previous value |
| new_value | Any | New value |

Example:

for change in diff.columns_modified[:2]:  # Show first 2
    print(f"Column: {change.full_name}")
    print(f"  Changed: {change.field_name}")
    print(f"  From: {change.old_value}")
    print(f"  To: {change.new_value}")

Use Cases

CI/CD Integration

def check_pipeline_changes():
    """Report columns removed between the main and PR pipelines.

    Loads one pipeline from "/main/sql/" and one from "/pr/sql/", diffs the
    PR pipeline against the main one, and prints a warning listing every
    removed column.

    Returns:
        False when the diff reports removed columns, True otherwise.
    """
    baseline = Pipeline.from_sql_files("/main/sql/")
    candidate = Pipeline.from_sql_files("/pr/sql/")
    changes = candidate.diff(baseline)

    removed = changes.columns_removed
    if not removed:
        return True

    # Removed columns are treated as the failing condition for the check.
    print(f"WARNING: {len(removed)} columns removed")
    for name in removed:
        print(f"  - {name}")
    return False

Documentation Updates

def update_docs_after_changes():
    """Regenerate descriptions for columns flagged by the diff, then export.

    Loads the previous pipeline from "/prev/" and the current one from
    "/current/", diffs current against previous, regenerates the description
    of each column the diff reports as needing an update, and writes the
    current pipeline to "metadata.json".

    Note:
        ``llm`` is not defined inside this function; it is taken from the
        enclosing scope.
    """
    previous = Pipeline.from_sql_files("/prev/")
    current = Pipeline.from_sql_files("/current/")
    changes = current.diff(previous)

    # Only columns reported by get_columns_needing_update() are regenerated.
    for name in changes.get_columns_needing_update():
        current.columns[name].generate_description(llm, current)

    # Persist the pipeline, including its queries, as JSON.
    from clgraph import JSONExporter
    JSONExporter.export_to_file(current, "metadata.json", include_queries=True)

Impact Analysis

def analyze_schema_change_impact():
    """Print how many SQL-expression and lineage changes a proposal causes.

    Loads the current pipeline from "/current/" and the proposed one from
    "/proposed/", diffs proposed against current, and prints the number of
    entries returned by get_sql_changes() and get_lineage_changes().
    """
    baseline = Pipeline.from_sql_files("/current/")
    candidate = Pipeline.from_sql_files("/proposed/")
    delta = candidate.diff(baseline)

    # Collect both change lists before reporting anything.
    expression_diffs = delta.get_sql_changes()
    lineage_diffs = delta.get_lineage_changes()

    expression_count = len(expression_diffs)
    lineage_count = len(lineage_diffs)

    print(f"SQL expression changes: {expression_count}")
    print(f"Lineage changes: {lineage_count}")