Export Classes

Export pipeline lineage data to various formats.

from clgraph import JSONExporter, CSVExporter

# The preamble provides a sample pipeline
print(f"Pipeline has {len(pipeline.columns)} columns to export")

JSONExporter

Export pipeline to JSON format.

Methods

export

Export to a dictionary.

JSONExporter.export(
    graph: Pipeline,
    include_metadata: bool = True,
    include_queries: bool = True
) -> Dict[str, Any]

Parameters:

  • include_metadata: Include column metadata (descriptions, PII, tags, etc.)
  • include_queries: Include SQL queries and dialect for round-trip serialization

Returns: Dictionary with:

  • columns: List of column data
  • edges: List of lineage edges
  • tables: List of table data
  • queries: List of SQL queries (when include_queries=True)
  • dialect: SQL dialect (when include_queries=True)
  • template_context: Template variables (when include_queries=True)

Example:

from clgraph import JSONExporter

# Basic export
data = JSONExporter.export(pipeline)
print(f"Exported {len(data['columns'])} columns")

# Export for round-trip (default)
data = JSONExporter.export(pipeline, include_queries=True)
print(f"Includes {len(data['queries'])} queries for round-trip")

export_to_file

Export directly to a JSON file.

JSONExporter.export_to_file(
    graph: Pipeline,
    file_path: str,
    include_metadata: bool = True,
    indent: int = 2
)

Example:

JSONExporter.export_to_file(pipeline, "lineage.json")

Output Format

{
  "dialect": "bigquery",
  "template_context": {"env": "staging"},
  "queries": [
    {
      "query_id": "query_0",
      "sql": "CREATE TABLE output AS SELECT SUM(amount) as total FROM raw"
    }
  ],
  "columns": [
    {
      "full_name": "output.total",
      "table_name": "output",
      "column_name": "total",
      "expression": "SUM(amount)",
      "operation": "aggregate",
      "node_type": "output",
      "description": "Total revenue",
      "owner": "analytics-team",
      "pii": false,
      "tags": ["financial"]
    }
  ],
  "edges": [
    {
      "from": "raw.amount",
      "to": "output.total",
      "edge_type": "aggregate",
      "transformation": "SUM",
      "query_id": "query_0"
    }
  ],
  "tables": [
    {
      "table_name": "output",
      "is_source": false,
      "created_by": "query_0"
    }
  ]
}
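As an illustrative sketch (not part of the library API), the exported dictionary can be consumed directly. For example, the edges list can be inverted into an upstream-column lookup; the inline data below is a minimal stand-in matching the documented format:

```python
# Stand-in for the output of JSONExporter.export(pipeline)
data = {
    "edges": [
        {
            "from": "raw.amount",
            "to": "output.total",
            "edge_type": "aggregate",
            "transformation": "SUM",
            "query_id": "query_0",
        }
    ],
}

# Invert the edge list into a mapping: column -> list of upstream columns
upstream = {}
for edge in data["edges"]:
    upstream.setdefault(edge["to"], []).append(edge["from"])

print(upstream["output.total"])  # ['raw.amount']
```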

CSVExporter

Export column and table metadata to CSV files (one-way export only).

When to use CSVExporter

Use this for:

  • Opening metadata in Excel/Google Sheets for review
  • Sharing column inventory with non-technical stakeholders
  • Auditing PII flags and ownership across tables

Limitations

CSV export is a flat data dump: lineage relationships (edges) are not included.

  • For machine-readable export with full lineage → use JSONExporter
  • For round-trip serialization (save/reload pipelines) → use JSONExporter with include_queries=True and Pipeline.from_json()

Methods

export_columns_to_file

Export column metadata to CSV.

CSVExporter.export_columns_to_file(
    graph: Pipeline,
    file_path: str
)

Example:

CSVExporter.export_columns_to_file(pipeline, "columns.csv")

Output columns: full_name, table_name, column_name, expression, operation, node_type, description, owner, pii, tags
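A quick way to audit the exported file, sketched with the standard library. The inline rows stand in for a real columns.csv, and the "True"/"False" string serialization of the pii flag is an assumption about the CSV output, not a documented guarantee:

```python
import csv
import io

# Stand-in for a file produced by CSVExporter.export_columns_to_file
sample_csv = io.StringIO(
    "full_name,table_name,column_name,pii,owner\n"
    "raw.orders.email,raw.orders,email,True,privacy-team\n"
    "raw.orders.amount,raw.orders,amount,False,analytics-team\n"
)

# Collect every column whose pii field is set, as in the PII-audit use case
pii_columns = [
    row["full_name"]
    for row in csv.DictReader(sample_csv)
    if row["pii"] == "True"
]
print(pii_columns)  # ['raw.orders.email']
```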

export_tables_to_file

Export table metadata to CSV.

CSVExporter.export_tables_to_file(
    graph: Pipeline,
    file_path: str
)

Example:

CSVExporter.export_tables_to_file(pipeline, "tables.csv")

Visualization Functions

For visual export, use the visualization functions which return graphviz.Digraph objects.

See the Visualization API for full documentation.

Quick Example:

from clgraph import visualize_pipeline_lineage

# Create visualization using the preamble's pipeline
dot = visualize_pipeline_lineage(pipeline.column_graph)

# Save DOT source to file
with open("lineage.dot", "w") as f:
    f.write(dot.source)

# Or render directly (requires graphviz system package)
# dot.render("lineage", format="png", cleanup=True)
print(f"Visualization created: {len(dot.source)} chars")

Available Functions

Function Purpose
visualize_pipeline_lineage() Multi-query column lineage
visualize_column_lineage() Single-query column lineage
visualize_table_dependencies() Table-level DAG
visualize_table_dependencies_with_levels() Table DAG with execution levels
visualize_lineage_path() Traced lineage path
visualize_column_path() Path to specific column

Rendering

Convert DOT to image using GraphViz CLI:

# PNG
dot -Tpng lineage.dot -o lineage.png

# SVG
dot -Tsvg lineage.dot -o lineage.svg

# PDF
dot -Tpdf lineage.dot -o lineage.pdf

Convenience Method

Pipeline has a built-in JSON export:

# Equivalent to JSONExporter.export(pipeline)
data = pipeline.to_json()
print(f"Exported {len(data['columns'])} columns, {len(data['edges'])} edges")

Round-Trip Serialization

Save pipelines to JSON and reload them later for caching, sharing, or storing analyzed results.

Pipeline.from_json

Create a pipeline from JSON data.

Pipeline.from_json(
    data: Dict[str, Any],
    apply_metadata: bool = True
) -> Pipeline

Parameters:

  • data: JSON dictionary from JSONExporter.export() (must include queries)
  • apply_metadata: Whether to apply metadata (descriptions, PII, etc.) from the JSON

Example:

import json
from clgraph import Pipeline
from clgraph.export import JSONExporter

# Create and export pipeline
pipeline = Pipeline.from_tuples([
    ("staging", "CREATE TABLE staging.orders AS SELECT id, amount FROM raw.orders"),
], dialect="bigquery")

data = JSONExporter.export(pipeline, include_queries=True)

# Save to file
with open("pipeline.json", "w") as f:
    json.dump(data, f)

# Later, reload from JSON
with open("pipeline.json") as f:
    data = json.load(f)

restored = Pipeline.from_json(data)
print(f"Restored {len(restored.columns)} columns")

Pipeline.from_json_file

Convenience method to load directly from a file.

Pipeline.from_json_file(
    file_path: str,
    apply_metadata: bool = True
) -> Pipeline

Example:

from clgraph import Pipeline
from clgraph.export import JSONExporter

# Export to file
JSONExporter.export_to_file(pipeline, "pipeline.json", include_queries=True)

# Later, reload
restored = Pipeline.from_json_file("pipeline.json")

Metadata Preservation

When exporting with include_metadata=True (default) and importing with apply_metadata=True (default), all column metadata is preserved:

  • description: Column description
  • owner: Data owner
  • pii: PII flag
  • tags: Column tags
  • custom_metadata: Any custom metadata

Example:

# Set metadata before export
col = pipeline.get_column("raw.orders", "email")
col.description = "Customer email"
col.pii = True
col.owner = "privacy-team"

# Export and reimport
data = JSONExporter.export(pipeline, include_queries=True, include_metadata=True)
restored = Pipeline.from_json(data, apply_metadata=True)

# Metadata is preserved
restored_col = restored.get_column("raw.orders", "email")
assert restored_col.description == "Customer email"
assert restored_col.pii is True
assert restored_col.owner == "privacy-team"