Skip to content

Lineage Agent

The LineageAgent provides a natural language interface to pipeline lineage. Ask questions in plain English and get structured answers.

Overview

from clgraph.agent import LineageAgent

agent = LineageAgent(pipeline)

# Ask questions naturally
result = agent.query("Where does the revenue column come from?")
print(result.answer)  # "Column revenue is derived from: raw.orders.amount"

result = agent.query("What tables are available?")
print(result.answer)  # "Found 12 tables (3 source, 9 derived)"

Quick Start

Without LLM (Pattern Matching)

The agent works without an LLM by using pattern matching to route questions:

from clgraph.agent import LineageAgent

agent = LineageAgent(pipeline)

# Lineage questions
result = agent.query("Where does analytics.revenue.total come from?")
print(result.tool_used)  # "trace_backward"
print(result.answer)     # "Column is derived from: ..."

# Schema questions
result = agent.query("What columns does the orders table have?")
print(result.tool_used)  # "get_table_schema"

# Search questions
result = agent.query("Find columns containing 'customer'")
print(result.tool_used)  # "search_columns"

With LLM (Enhanced Capabilities)

Add an LLM for SQL generation and general questions:

from clgraph.agent import LineageAgent
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3.2", temperature=0.3)
agent = LineageAgent(pipeline, llm=llm)

# SQL generation (requires LLM)
result = agent.query("Write SQL to get top customers by revenue")
print(result.data["sql"])

# Query explanation (requires LLM)
result = agent.query("Explain this query: SELECT ...")
print(result.answer)

Question Types

The agent automatically classifies questions and routes them to the appropriate tool:

Question Type Example Questions Tool Used
Lineage Backward "Where does X come from?", "Source of Y" trace_backward
Lineage Forward "What depends on X?", "Impact of Y" trace_forward
Schema Tables "What tables exist?", "List all tables" list_tables
Schema Columns "What columns does X have?", "Schema of Y" get_table_schema
Schema Search "Find columns named X", "Search for Y" search_columns
Governance PII "Which columns are PII?", "Sensitive data" find_pii_columns
Governance Owner "Who owns X?", "Ownership of Y" get_owners
SQL Generate "Write SQL to...", "Query to get..." generate_sql
SQL Explain "What does this query do?", "Explain SQL" explain_query

AgentResult

All queries return an AgentResult object:

@dataclass
class AgentResult:
    answer: str                    # Natural language answer
    question_type: QuestionType    # Detected question type
    tool_used: str | None          # Name of tool that was used
    tool_result: ToolResult | None # Raw tool result
    data: dict | None              # Structured data
    error: str | None              # Error message if failed

Accessing Results

result = agent.query("Where does revenue come from?")

# Human-readable answer
print(result.answer)
# "Column analytics.revenue.total is derived from: raw.orders.amount"

# What type of question was detected
print(result.question_type)
# QuestionType.LINEAGE_BACKWARD

# Which tool handled it
print(result.tool_used)
# "trace_backward"

# Structured data for programmatic use
print(result.data)
# {"column": "analytics.revenue.total", "sources": ["raw.orders.amount"]}

QuestionType Enum

from clgraph.agent import QuestionType

class QuestionType(Enum):
    LINEAGE_BACKWARD = "lineage_backward"   # Where does X come from?
    LINEAGE_FORWARD = "lineage_forward"     # What depends on X?
    LINEAGE_PATH = "lineage_path"           # How does X relate to Y?
    SCHEMA_TABLES = "schema_tables"         # What tables exist?
    SCHEMA_COLUMNS = "schema_columns"       # What columns does X have?
    SCHEMA_SEARCH = "schema_search"         # Find columns matching X
    GOVERNANCE_PII = "governance_pii"       # Which columns are PII?
    GOVERNANCE_OWNER = "governance_owner"   # Who owns X?
    SQL_GENERATE = "sql_generate"           # Write SQL to...
    SQL_EXPLAIN = "sql_explain"             # What does this query do?
    GENERAL = "general"                     # General question

Example Queries

Lineage Questions

# Backward tracing
result = agent.query("Where does analytics.customer_metrics.lifetime_value come from?")
print(result.answer)
# "Column analytics.customer_metrics.lifetime_value is derived from: raw.orders.total_amount"

# Forward impact
result = agent.query("What depends on raw.orders.customer_id?")
print(result.answer)
# "Column raw.orders.customer_id impacts: analytics.customer_metrics.customer_id, ..."

# Using natural variations
agent.query("Source of the revenue column")
agent.query("Trace back customer_id")
agent.query("What uses the email column?")
agent.query("If I change order_date, what's affected?")

Schema Questions

# List tables
result = agent.query("What tables are available?")
print(result.data["tables"])

# Table schema
result = agent.query("What columns does marts.customer_360 have?")
for col in result.data["columns"]:
    print(f"  {col['name']}: {col['description']}")

# Search
result = agent.query("Find columns containing 'revenue'")
print(f"Found {len(result.data['matches'])} matches")

Governance Questions

# PII detection
result = agent.query("Which columns contain PII?")
for col in result.data["pii_columns"]:
    print(f"  {col['full_name']}: {col['tags']}")

# Ownership
result = agent.query("Who owns the customer_360 table?")
print(result.data["owner"])

SQL Questions (Requires LLM)

# Generate SQL
result = agent.query("Write SQL to get top 10 customers by lifetime value")
print(result.data["sql"])
# SELECT customer_id, lifetime_value
# FROM analytics.customer_metrics
# ORDER BY lifetime_value DESC
# LIMIT 10

# Explain SQL
sql = "SELECT user_id, SUM(amount) FROM orders GROUP BY 1"
result = agent.query(f"What does this query do? {sql}")
print(result.answer)

Available Tools

The agent has access to 16 tools:

# Tool Name Description
1 trace_backward Trace column to ultimate sources
2 trace_forward Find downstream dependencies
3 get_lineage_path Find path between two columns
4 get_table_lineage Get lineage summary for a table
5 list_tables List all tables in pipeline
6 get_table_schema Get table columns and types
7 get_relationships Get table dependencies
8 search_columns Search columns by pattern
9 get_execution_order Get query execution order
10 find_pii_columns Find PII-flagged columns
11 get_owners Get ownership information
12 get_columns_by_tag Find columns by tag
13 list_tags List all tags
14 check_data_quality Check for quality issues
15 generate_sql Generate SQL from question (LLM)
16 explain_query Explain SQL query (LLM)

Using with LangGraph

For more complex agent workflows, combine with LangGraph:

from langgraph.graph import StateGraph
from clgraph.agent import LineageAgent

# Create lineage agent
lineage_agent = LineageAgent(pipeline, llm=llm)

# Define state
class State(TypedDict):
    question: str
    lineage_result: AgentResult
    final_answer: str

# Define nodes
def query_lineage(state: State) -> State:
    result = lineage_agent.query(state["question"])
    return {"lineage_result": result}

def format_answer(state: State) -> State:
    result = state["lineage_result"]
    return {"final_answer": f"Based on lineage: {result.answer}"}

# Build graph
workflow = StateGraph(State)
workflow.add_node("query", query_lineage)
workflow.add_node("format", format_answer)
workflow.add_edge("query", "format")
workflow.set_entry_point("query")

app = workflow.compile()

Error Handling

result = agent.query("Invalid question about nonexistent.column")

if result.error:
    print(f"Error: {result.error}")
else:
    print(result.answer)

# Check success via tool_result
if result.tool_result and not result.tool_result.success:
    print(f"Tool failed: {result.tool_result.message}")

Best Practices

  1. Be Specific: Include table and column names when asking about specific data
# Good
agent.query("Where does analytics.revenue.total come from?")

# Less specific
agent.query("Where does revenue come from?")
  1. Use Natural Language: The agent understands various phrasings
# All equivalent
agent.query("Source of customer_id")
agent.query("Where does customer_id come from?")
agent.query("What feeds into customer_id?")
agent.query("Trace back customer_id")
  1. Check Question Type: Verify the agent understood your intent
result = agent.query("...")
if result.question_type == QuestionType.GENERAL:
    # Agent didn't recognize the pattern - rephrase
    pass
  1. Use Structured Data: For programmatic use, access result.data instead of parsing result.answer
result = agent.query("What tables exist?")
tables = result.data["tables"]  # List of table dicts