# Lineage Tools

Lineage tools are the building blocks for interacting with pipeline lineage data. They provide a consistent interface used by the LineageAgent, the MCP Server, and the direct Python API.
## Overview

Tools are organized into four categories:

| Category | Description | Requires LLM |
|---|---|---|
| Lineage | Trace column sources and impacts | No |
| Schema | Explore tables and columns | No |
| Governance | PII, ownership, tags | No |
| SQL | Generate/explain SQL | Yes |
## Quick Start

```python
from clgraph.tools import TraceBackwardTool, ListTablesTool

# Create tools with a pipeline
backward_tool = TraceBackwardTool(pipeline)
list_tool = ListTablesTool(pipeline)

# Run tools directly
result = backward_tool.run(table="analytics.revenue", column="total")
print(result.message)  # "Column analytics.revenue.total is derived from: ..."

result = list_tool.run()
print(f"Found {len(result.data['tables'])} tables")
```
## ToolResult

All tools return a ToolResult object:

```python
@dataclass
class ToolResult:
    success: bool      # Whether the tool executed successfully
    message: str       # Human-readable result message
    data: dict | None  # Structured data (tool-specific)
```
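
Tools report failures through `success` rather than raising (the LLMTool example at the end of this page returns `success=False` when no LLM is configured), so check it before reading `data`. A minimal usage sketch with the backward-trace tool from the Quick Start:

```python
result = backward_tool.run(table="analytics.revenue", column="total")

if result.success:
    # `data` is tool-specific; trace_backward includes a "sources" list
    for source in result.data["sources"]:
        print(source)
else:
    # On failure, `message` explains what went wrong
    print(f"Trace failed: {result.message}")
```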
## Lineage Tools

Tools for tracing column-level data flow.
### TraceBackwardTool

Find where a column's data comes from (upstream lineage).

```python
from clgraph.tools import TraceBackwardTool

tool = TraceBackwardTool(pipeline)
result = tool.run(table="analytics.customer_metrics", column="total_revenue")

print(result.message)
# "Column analytics.customer_metrics.total_revenue is derived from: raw.orders.amount"

print(result.data)
# {
#   "column": "analytics.customer_metrics.total_revenue",
#   "sources": ["raw.orders.amount"],
#   "path_count": 1
# }
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Target table name |
| column | str | Yes | Target column name |
### TraceForwardTool

Find what columns depend on a source column (downstream impact).

```python
from clgraph.tools import TraceForwardTool

tool = TraceForwardTool(pipeline)
result = tool.run(table="raw.orders", column="amount")

print(result.message)
# "Column raw.orders.amount impacts: analytics.customer_metrics.total_revenue, ..."

print(result.data)
# {
#   "column": "raw.orders.amount",
#   "dependents": ["analytics.customer_metrics.total_revenue", ...],
#   "dependent_count": 5
# }
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Source table name |
| column | str | Yes | Source column name |
### GetLineagePathTool

Find the path between two columns.

```python
from clgraph.tools import GetLineagePathTool

tool = GetLineagePathTool(pipeline)
result = tool.run(
    source_table="raw.orders",
    source_column="amount",
    target_table="analytics.revenue",
    target_column="total"
)

print(result.data)
# {
#   "source": "raw.orders.amount",
#   "target": "analytics.revenue.total",
#   "path": ["raw.orders.amount", "staging.orders.amount", "analytics.revenue.total"],
#   "path_length": 3
# }
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| source_table | str | Yes | Source table name |
| source_column | str | Yes | Source column name |
| target_table | str | Yes | Target table name |
| target_column | str | Yes | Target column name |
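
The behavior when no path exists is not documented above, so treat it defensively: it is assumed to surface either as `success=False` or as an empty path. A hedged handling sketch:

```python
result = tool.run(
    source_table="raw.orders",
    source_column="amount",
    target_table="analytics.revenue",
    target_column="total"
)

# Assumption: a missing path shows up as success=False or an empty "path"
if result.success and result.data and result.data.get("path"):
    print(" -> ".join(result.data["path"]))
else:
    print(f"No lineage path found: {result.message}")
```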
### GetTableLineageTool

Get lineage summary for a table.

```python
from clgraph.tools import GetTableLineageTool

tool = GetTableLineageTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.data)
# {
#   "table": "analytics.customer_metrics",
#   "column_count": 10,
#   "upstream_tables": ["raw.orders", "raw.customers"],
#   "downstream_tables": ["marts.customer_360"]
# }
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Table name to analyze |
## Schema Tools

Tools for exploring pipeline structure.
### ListTablesTool

List all tables in the pipeline.

```python
from clgraph.tools import ListTablesTool

tool = ListTablesTool(pipeline)
result = tool.run()

print(result.message)
# "Found 12 tables (3 source, 9 derived)"

for table in result.data["tables"]:
    print(f"  {table['name']}: {table['column_count']} columns")
```
Parameters: None
### GetTableSchemaTool

Get detailed schema for a table.

```python
from clgraph.tools import GetTableSchemaTool

tool = GetTableSchemaTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.message)
# "Table analytics.customer_metrics has 12 columns, derived from staging.orders"

for col in result.data["columns"]:
    print(f"  {col['name']}: {col['description'] or 'No description'}")
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Table name |
### SearchColumnsTool

Search for columns by name pattern.

```python
from clgraph.tools import SearchColumnsTool

tool = SearchColumnsTool(pipeline)
result = tool.run(pattern="revenue")

print(result.message)
# "Found 6 columns matching 'revenue'"

for col in result.data["matches"]:
    print(f"  {col['full_name']}: {col['description']}")
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| pattern | str | Yes | Search pattern (case-insensitive) |
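
Beyond printing matches, the structured data lends itself to quick audits. A sketch that flags matching columns with no description, using only the `full_name` and `description` fields shown above:

```python
result = tool.run(pattern="revenue")

# Collect matches that still lack documentation
undocumented = [
    col["full_name"]
    for col in result.data["matches"]
    if not col["description"]
]
print(f"{len(undocumented)} matching columns have no description")
```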
### GetRelationshipsTool

Get table relationships (joins, dependencies).

```python
from clgraph.tools import GetRelationshipsTool

tool = GetRelationshipsTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.data)
# {
#   "table": "analytics.customer_metrics",
#   "depends_on": ["staging.orders", "staging.customers"],
#   "used_by": ["marts.customer_360"]
# }
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Table name |
### GetExecutionOrderTool

Get the topologically sorted execution order.

```python
from clgraph.tools import GetExecutionOrderTool

tool = GetExecutionOrderTool(pipeline)
result = tool.run()

print(result.message)
# "Execution order: 12 queries in 4 levels"

for level in result.data["levels"]:
    print(f"Level {level['level']}: {level['queries']}")
```
Parameters: None
## Governance Tools

Tools for data governance and compliance.
### FindPIIColumnsTool

Find columns marked as PII.

```python
from clgraph.tools import FindPIIColumnsTool

tool = FindPIIColumnsTool(pipeline)
result = tool.run()

# Or filter by table
result = tool.run(table="raw.customers")

print(result.message)
# "Found 5 PII columns"

for col in result.data["pii_columns"]:
    print(f"  {col['full_name']}: {col['tags']}")
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | No | Filter by table name |
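
A common follow-up question is where PII propagates downstream. A sketch that pairs this tool with TraceForwardTool; splitting `full_name` into table and column on the last dot is an assumption about its format:

```python
from clgraph.tools import FindPIIColumnsTool, TraceForwardTool

pii_tool = FindPIIColumnsTool(pipeline)
forward_tool = TraceForwardTool(pipeline)

for col in pii_tool.run().data["pii_columns"]:
    # Assumes full_name looks like "<schema.table>.<column>"
    table, column = col["full_name"].rsplit(".", 1)
    impact = forward_tool.run(table=table, column=column)
    print(f"{col['full_name']} -> {impact.data['dependent_count']} downstream columns")
```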
### GetOwnersTool

Get ownership information.

```python
from clgraph.tools import GetOwnersTool

tool = GetOwnersTool(pipeline)
result = tool.run(table="analytics.customer_metrics")

print(result.data)
# {
#   "table": "analytics.customer_metrics",
#   "owner": "analytics-team",
#   "columns_with_owners": [...]
# }
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| table | str | Yes | Table name |
### GetColumnsByTagTool

Find columns by tag.

```python
from clgraph.tools import GetColumnsByTagTool

tool = GetColumnsByTagTool(pipeline)
result = tool.run(tag="sensitive")

print(result.message)
# "Found 8 columns with tag 'sensitive'"
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| tag | str | Yes | Tag to search for |
### ListTagsTool

List all tags in the pipeline.

```python
from clgraph.tools import ListTagsTool

tool = ListTagsTool(pipeline)
result = tool.run()

print(result.data)
# {
#   "tags": ["pii", "sensitive", "financial", "metrics"],
#   "tag_count": 4
# }
```
Parameters: None
### CheckDataQualityTool

Check for data quality issues.

```python
from clgraph.tools import CheckDataQualityTool

tool = CheckDataQualityTool(pipeline)
result = tool.run()

print(result.data)
# {
#   "issues": [...],
#   "warnings": [...],
#   "info": [...]
# }
```
Parameters: None
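
The three buckets are convenient to gate on in CI. A sketch that fails the job when any hard issue is reported; the shape of individual entries is not documented above, so they are printed as-is:

```python
import sys

result = tool.run()

for issue in result.data["issues"]:
    print(f"ISSUE: {issue}")
for warning in result.data["warnings"]:
    print(f"WARNING: {warning}")

# Treat any hard issue as a build failure
if result.data["issues"]:
    sys.exit(1)
```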
## SQL Tools (LLM Required)

Tools that require an LLM for natural language processing.
### GenerateSQLTool

Generate SQL from natural language questions.

```python
from clgraph.tools import GenerateSQLTool
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
tool = GenerateSQLTool(pipeline, llm=llm)

result = tool.run(question="Show top 10 customers by lifetime value")
print(result.data["sql"])
# SELECT customer_id, lifetime_value
# FROM analytics.customer_metrics
# ORDER BY lifetime_value DESC
# LIMIT 10
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| question | str | Yes | Natural language question |
### ExplainQueryTool

Explain what a SQL query does.

```python
from clgraph.tools import ExplainQueryTool
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
tool = ExplainQueryTool(pipeline, llm=llm)

sql = """
SELECT user_email, SUM(amount) as total
FROM raw.orders
GROUP BY user_email
HAVING SUM(amount) > 1000
"""

result = tool.run(sql=sql)
print(result.message)
# "This query calculates total order amounts per customer,
#  filtering for customers who spent over $1000..."
```
Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| sql | str | Yes | SQL query to explain |
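
The two SQL tools compose naturally: generate a query from a question, then ask for a plain-language explanation of the result. A sketch reusing one LLM instance for both:

```python
from clgraph.tools import GenerateSQLTool, ExplainQueryTool
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
generate = GenerateSQLTool(pipeline, llm=llm)
explain = ExplainQueryTool(pipeline, llm=llm)

generated = generate.run(question="Show top 10 customers by lifetime value")
if generated.success:
    explanation = explain.run(sql=generated.data["sql"])
    print(explanation.message)
```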
## Tool Registry

For programmatic access to all tools, use ToolRegistry:

```python
from clgraph.tools import create_tool_registry

# Create registry with all tools
registry = create_tool_registry(pipeline, llm=my_llm)

# List available tools
print(registry.list_tools())
# ['trace_backward', 'trace_forward', 'list_tables', ...]

# Run a tool by name
result = registry.run("trace_backward", table="X", column="Y")

# Get tool metadata
info = registry.get_tool_info("trace_backward")
print(info["description"])
print(info["parameters"])
```
### Available Tool Names

| Tool Name | Class |
|---|---|
| trace_backward | TraceBackwardTool |
| trace_forward | TraceForwardTool |
| get_lineage_path | GetLineagePathTool |
| get_table_lineage | GetTableLineageTool |
| list_tables | ListTablesTool |
| get_table_schema | GetTableSchemaTool |
| get_relationships | GetRelationshipsTool |
| search_columns | SearchColumnsTool |
| get_execution_order | GetExecutionOrderTool |
| find_pii_columns | FindPIIColumnsTool |
| get_owners | GetOwnersTool |
| get_columns_by_tag | GetColumnsByTagTool |
| list_tags | ListTagsTool |
| check_data_quality | CheckDataQualityTool |
| generate_sql | GenerateSQLTool |
| explain_query | ExplainQueryTool |
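
Because every tool exposes its metadata through the registry, you can enumerate them to build a catalog or to feed tool specifications to an agent framework. A sketch using only the registry calls shown above:

```python
registry = create_tool_registry(pipeline, llm=my_llm)

# Print a one-line summary of every registered tool
for name in registry.list_tools():
    info = registry.get_tool_info(name)
    print(f"{name}: {info['description']}")
```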
## Creating Custom Tools

Extend BaseTool to create custom tools:

```python
from clgraph.tools import BaseTool, ToolResult, ParameterSpec, ParameterType

class MyCustomTool(BaseTool):
    name = "my_custom_tool"
    description = "Does something custom"
    parameters = [
        ParameterSpec(
            name="input",
            type=ParameterType.STRING,
            description="Input value",
            required=True
        )
    ]

    def execute(self, **kwargs) -> ToolResult:
        input_val = kwargs.get("input")
        # Do something with self.pipeline and input_val
        return ToolResult(
            success=True,
            message=f"Processed: {input_val}",
            data={"result": "..."}
        )
```
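
Custom tools follow the same calling convention as the built-ins, assuming BaseTool stores the pipeline on `self.pipeline` and routes `run()` to your `execute()`, as the examples above imply. A usage sketch:

```python
tool = MyCustomTool(pipeline)
result = tool.run(input="hello")

print(result.success)  # True
print(result.message)  # "Processed: hello"
```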
For LLM-powered tools, extend LLMTool:

```python
from clgraph.tools import LLMTool, ToolResult

class MyLLMTool(LLMTool):
    name = "my_llm_tool"
    description = "Uses LLM for something"

    def execute(self, **kwargs) -> ToolResult:
        if not self.llm:
            return ToolResult(
                success=False,
                message="LLM required for this tool"
            )
        response = self.llm.invoke("prompt here")
        return ToolResult(
            success=True,
            message=response.content,
            data={"response": response.content}
        )
```
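
Constructing an LLM-backed custom tool mirrors GenerateSQLTool; the assumption here is that LLMTool accepts the model via the same `llm=` keyword used by the built-in SQL tools:

```python
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
tool = MyLLMTool(pipeline, llm=llm)

result = tool.run()
print(result.message)
```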