Lineage Tools

Lineage Tools are the building blocks for interacting with pipeline lineage data. They provide a consistent interface that is shared by the LineageAgent, the MCP Server, and the direct Python API.

Overview

Tools are organized into four categories:

Category     Description                        Requires LLM
Lineage      Trace column sources and impacts   No
Schema       Explore tables and columns         No
Governance   PII, ownership, tags               No
SQL          Generate/explain SQL               Yes

Quick Start

from clgraph.tools import TraceBackwardTool, ListTablesTool

# Create tools with a pipeline
backward_tool = TraceBackwardTool(pipeline)
list_tool = ListTablesTool(pipeline)

# Run tools directly
result = backward_tool.run(table="analytics.revenue", column="total")
print(result.message)  # "Column analytics.revenue.total is derived from: ..."

result = list_tool.run()
print(f"Found {len(result.data['tables'])} tables")

ToolResult

All tools return a ToolResult object:

@dataclass
class ToolResult:
    success: bool       # Whether the tool executed successfully
    message: str        # Human-readable result message
    data: dict | None   # Structured data (tool-specific)
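
Check success before reading data; a failed run may report the problem through the result object (success=False plus an explanatory message, as the LLMTool example at the end of this page does) instead of raising. A minimal sketch, using a hypothetical missing column:

from clgraph.tools import TraceBackwardTool

tool = TraceBackwardTool(pipeline)

# "does_not_exist" is a made-up column name, used only to illustrate failure handling
result = tool.run(table="analytics.revenue", column="does_not_exist")

if result.success:
    for source in result.data["sources"]:
        print(source)
else:
    print(f"Tool failed: {result.message}")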

Lineage Tools

Tools for tracing column-level data flow.

TraceBackwardTool

Find where a column's data comes from (upstream lineage).

from clgraph.tools import TraceBackwardTool

tool = TraceBackwardTool(pipeline)
result = tool.run(table="analytics.customer_metrics", column="total_revenue")

print(result.message)
# "Column analytics.customer_metrics.total_revenue is derived from: raw.orders.amount"

print(result.data)
# {
#     "column": "analytics.customer_metrics.total_revenue",
#     "sources": ["raw.orders.amount"],
#     "path_count": 1
# }

Parameters:

Name     Type   Required   Description
table    str    Yes        Target table name
column   str    Yes        Target column name

TraceForwardTool

Find what columns depend on a source column (downstream impact).

from clgraph.tools import TraceForwardTool

tool = TraceForwardTool(pipeline)
result = tool.run(table="raw.orders", column="amount")

print(result.message)
# "Column raw.orders.amount impacts: analytics.customer_metrics.total_revenue, ..."

print(result.data)
# {
#     "column": "raw.orders.amount",
#     "dependents": ["analytics.customer_metrics.total_revenue", ...],
#     "dependent_count": 5
# }

Parameters:

Name     Type   Required   Description
table    str    Yes        Source table name
column   str    Yes        Source column name

GetLineagePathTool

Find the path between two columns.

from clgraph.tools import GetLineagePathTool

tool = GetLineagePathTool(pipeline)
result = tool.run(
    source_table="raw.orders",
    source_column="amount",
    target_table="analytics.revenue",
    target_column="total"
)

print(result.data)
# {
#     "source": "raw.orders.amount",
#     "target": "analytics.revenue.total",
#     "path": ["raw.orders.amount", "staging.orders.amount", "analytics.revenue.total"],
#     "path_length": 3
# }

Parameters:

Name            Type   Required   Description
source_table    str    Yes        Source table name
source_column   str    Yes        Source column name
target_table    str    Yes        Target table name
target_column   str    Yes        Target column name

GetTableLineageTool

Get lineage summary for a table.

from clgraph.tools import GetTableLineageTool

tool = GetTableLineageTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.data)
# {
#     "table": "analytics.customer_metrics",
#     "column_count": 10,
#     "upstream_tables": ["raw.orders", "raw.customers"],
#     "downstream_tables": ["marts.customer_360"]
# }

Parameters:

Name    Type   Required   Description
table   str    Yes        Table name to analyze

Schema Tools

Tools for exploring pipeline structure.

ListTablesTool

List all tables in the pipeline.

from clgraph.tools import ListTablesTool

tool = ListTablesTool(pipeline)
result = tool.run()

print(result.message)
# "Found 12 tables (3 source, 9 derived)"

for table in result.data["tables"]:
    print(f"  {table['name']}: {table['column_count']} columns")

Parameters: None

GetTableSchemaTool

Get detailed schema for a table.

from clgraph.tools import GetTableSchemaTool

tool = GetTableSchemaTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.message)
# "Table analytics.customer_metrics has 12 columns, derived from staging.orders"

for col in result.data["columns"]:
    print(f"  {col['name']}: {col['description'] or 'No description'}")

Parameters:

Name    Type   Required   Description
table   str    Yes        Table name

SearchColumnsTool

Search for columns by name pattern.

from clgraph.tools import SearchColumnsTool

tool = SearchColumnsTool(pipeline)
result = tool.run(pattern="revenue")

print(result.message)
# "Found 6 columns matching 'revenue'"

for col in result.data["matches"]:
    print(f"  {col['full_name']}: {col['description']}")

Parameters:

Name      Type   Required   Description
pattern   str    Yes        Search pattern (case-insensitive)

GetRelationshipsTool

Get table relationships (joins, dependencies).

from clgraph.tools import GetRelationshipsTool

tool = GetRelationshipsTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.data)
# {
#     "table": "analytics.customer_metrics",
#     "depends_on": ["staging.orders", "staging.customers"],
#     "used_by": ["marts.customer_360"]
# }

Parameters:

Name    Type   Required   Description
table   str    Yes        Table name

GetExecutionOrderTool

Get the topologically sorted execution order.

from clgraph.tools import GetExecutionOrderTool

tool = GetExecutionOrderTool(pipeline)
result = tool.run()

print(result.message)
# "Execution order: 12 queries in 4 levels"

for level in result.data["levels"]:
    print(f"Level {level['level']}: {level['queries']}")

Parameters: None


Governance Tools

Tools for data governance and compliance.

FindPIIColumnsTool

Find columns marked as PII.

from clgraph.tools import FindPIIColumnsTool

tool = FindPIIColumnsTool(pipeline)
result = tool.run()

# Or filter by table
result = tool.run(table="raw.customers")

print(result.message)
# "Found 5 PII columns"

for col in result.data["pii_columns"]:
    print(f"  {col['full_name']}: {col['tags']}")

Parameters:

Name    Type   Required   Description
table   str    No         Filter by table name

GetOwnersTool

Get ownership information.

from clgraph.tools import GetOwnersTool

tool = GetOwnersTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.data)
# {
#     "table": "analytics.customer_metrics",
#     "owner": "analytics-team",
#     "columns_with_owners": [...]
# }

Parameters:

Name    Type   Required   Description
table   str    Yes        Table name

GetColumnsByTagTool

Find columns by tag.

from clgraph.tools import GetColumnsByTagTool

tool = GetColumnsByTagTool(pipeline)
result = tool.run(tag="sensitive")

print(result.message)
# "Found 8 columns with tag 'sensitive'"

Parameters:

Name   Type   Required   Description
tag    str    Yes        Tag to search for

ListTagsTool

List all tags in the pipeline.

from clgraph.tools import ListTagsTool

tool = ListTagsTool(pipeline)
result = tool.run()

print(result.data)
# {
#     "tags": ["pii", "sensitive", "financial", "metrics"],
#     "tag_count": 4
# }

Parameters: None

CheckDataQualityTool

Check for data quality issues.

from clgraph.tools import CheckDataQualityTool

tool = CheckDataQualityTool(pipeline)
result = tool.run()

print(result.data)
# {
#     "issues": [...],
#     "warnings": [...],
#     "info": [...]
# }

Parameters: None


SQL Tools (LLM Required)

Tools that require an LLM for natural language processing.

GenerateSQLTool

Generate SQL from natural language questions.

from clgraph.tools import GenerateSQLTool
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
tool = GenerateSQLTool(pipeline, llm=llm)

result = tool.run(question="Show top 10 customers by lifetime value")

print(result.data["sql"])
# SELECT customer_id, lifetime_value
# FROM analytics.customer_metrics
# ORDER BY lifetime_value DESC
# LIMIT 10

Parameters:

Name       Type   Required   Description
question   str    Yes        Natural language question

ExplainQueryTool

Explain what a SQL query does.

from clgraph.tools import ExplainQueryTool
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
tool = ExplainQueryTool(pipeline, llm=llm)

sql = """
SELECT user_email, SUM(amount) as total
FROM raw.orders
GROUP BY user_email
HAVING SUM(amount) > 1000
"""
result = tool.run(sql=sql)

print(result.message)
# "This query calculates total order amounts per customer,
#  filtering for customers who spent over $1000..."

Parameters:

Name   Type   Required   Description
sql    str    Yes        SQL query to explain

Tool Registry

For programmatic access to all tools, use ToolRegistry:

from clgraph.tools import create_tool_registry

# Create registry with all tools
registry = create_tool_registry(pipeline, llm=my_llm)

# List available tools
print(registry.list_tools())
# ['trace_backward', 'trace_forward', 'list_tables', ...]

# Run a tool by name
result = registry.run("trace_backward", table="X", column="Y")

# Get tool metadata
info = registry.get_tool_info("trace_backward")
print(info["description"])
print(info["parameters"])

Available Tool Names

Tool Name             Class
trace_backward        TraceBackwardTool
trace_forward         TraceForwardTool
get_lineage_path      GetLineagePathTool
get_table_lineage     GetTableLineageTool
list_tables           ListTablesTool
get_table_schema      GetTableSchemaTool
get_relationships     GetRelationshipsTool
search_columns        SearchColumnsTool
get_execution_order   GetExecutionOrderTool
find_pii_columns      FindPIIColumnsTool
get_owners            GetOwnersTool
get_columns_by_tag    GetColumnsByTagTool
list_tags             ListTagsTool
check_data_quality    CheckDataQualityTool
generate_sql          GenerateSQLTool
explain_query         ExplainQueryTool
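
Every name in the left column is the key accepted by registry.run(). The registry's metadata can also be used to enumerate what is available; the sketch below reuses the registry from the example above and relies only on the list_tools() and get_tool_info() calls shown there:

# Reuses the registry created in the Tool Registry example above
for name in registry.list_tools():
    info = registry.get_tool_info(name)
    print(f"{name}: {info['description']}")
    print(f"  parameters: {info['parameters']}")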

Creating Custom Tools

Extend BaseTool to create custom tools:

from clgraph.tools import BaseTool, ToolResult, ParameterSpec, ParameterType

class MyCustomTool(BaseTool):
    name = "my_custom_tool"
    description = "Does something custom"
    parameters = [
        ParameterSpec(
            name="input",
            type=ParameterType.STRING,
            description="Input value",
            required=True
        )
    ]

    def execute(self, **kwargs) -> ToolResult:
        input_val = kwargs.get("input")
        # Do something with self.pipeline and input_val
        return ToolResult(
            success=True,
            message=f"Processed: {input_val}",
            data={"result": "..."}
        )
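
Custom tools follow the same calling convention as the built-in ones: construct with a pipeline and invoke run() with keyword arguments matching the declared parameters (run() is assumed to dispatch to execute(), as with the built-in tools). For example:

tool = MyCustomTool(pipeline)

# The keyword argument reaches execute() as kwargs["input"]
result = tool.run(input="some value")
print(result.message)  # "Processed: some value"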

For LLM-powered tools, extend LLMTool:

from clgraph.tools import LLMTool, ToolResult

class MyLLMTool(LLMTool):
    name = "my_llm_tool"
    description = "Uses LLM for something"

    def execute(self, **kwargs) -> ToolResult:
        if not self.llm:
            return ToolResult(
                success=False,
                message="LLM required for this tool"
            )

        response = self.llm.invoke("prompt here")
        return ToolResult(
            success=True,
            message=response.content,
            data={"response": response.content}
        )
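
Like the built-in SQL tools, an LLM-powered custom tool takes the LLM at construction time. A usage sketch, assuming LLMTool accepts the same pipeline and llm arguments as GenerateSQLTool:

from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
tool = MyLLMTool(pipeline, llm=llm)

# Without an llm, execute() returns success=False as written above
result = tool.run()
print(result.message)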