AI & LLM Integration

Use clgraph's AI capabilities to generate documentation, build text-to-SQL systems, and create conversational data agents.

clgraph provides first-class LLM integration with schema-grounded context, ensuring AI tools have accurate information about your data pipeline.


The Challenge

AI applications on data pipelines often fail due to:

  • LLM hallucinations: Models invent table and column names that don't exist
  • Missing context: AI can't understand data relationships without lineage
  • Manual documentation: Writing column descriptions is tedious, and they quickly go stale
  • Complex queries: Users need SQL but don't know the schema
  • Siloed AI tools: Each AI application requires custom integration

The Solution: Schema-Grounded AI

clgraph solves these problems by:

  1. Lineage context: AI sees how data flows through transformations
  2. Schema awareness: Tools automatically include real table/column names
  3. Metadata enrichment: PII flags, ownership, and descriptions inform AI responses
  4. Unified tools: Same tools work across agents, MCP, and direct API

Use Case 1: LLM-Powered Description Generation

Scenario: Auto-generate descriptions for columns that don't have them.

Setup with Ollama (Local, Free)

from clgraph import Pipeline
from langchain_ollama import ChatOllama

# Load your pipeline
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Connect to local Ollama
llm = ChatOllama(
    model="llama3:latest",  # or llama3.2, qwen3-coder:30b
    temperature=0.3,
)

pipeline.llm = llm

Setup with OpenAI

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
)

pipeline.llm = llm

Generate Descriptions

# Generate descriptions for all columns without them
pipeline.generate_all_descriptions()

# Or generate for specific columns
from clgraph.column import generate_description

for col in pipeline.columns.values():
    if not col.description and col.table_name.startswith("mart_"):
        generate_description(col, pipeline.llm, pipeline)
        print(f"{col.full_name}: {col.description}")

Why LLM-generated descriptions work well:

The LLM sees lineage context including:

  • Source columns and their metadata
  • Transformations (SUM, JOIN, CASE)
  • Filter conditions (WHERE clauses)
  • Aggregation logic (GROUP BY)

This produces descriptions that understand the data flow, not just the column name.
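As an illustration of what that grounding looks like, the lineage context for a single column might resemble the sketch below. The field names here are illustrative, not clgraph's actual internal format:

```python
# Illustrative sketch of the lineage context an LLM might receive for one
# column. Field names are made up for illustration; clgraph's actual
# context format may differ.
lineage_context = {
    "column": "mart_customer_ltv.lifetime_value",
    "sources": ["orders.total_amount", "customers.customer_id"],
    "transformation": "SUM(orders.total_amount)",
    "filters": ["orders.status = 'completed'"],
    "group_by": ["customers.customer_id"],
}

# An LLM prompted with this context can describe the column in terms of
# its actual derivation rather than guessing from the name alone.
prompt = (
    "Describe the column below based on its lineage:\n"
    f"{lineage_context}"
)
print(prompt)
```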


Use Case 2: Text-to-SQL with Schema Grounding

Scenario: Generate SQL from natural language without hallucinations.

The Problem with Traditional Text-to-SQL

LLMs often hallucinate table and column names:

User: "Show me total revenue by customer"
LLM: SELECT customer, SUM(revenue) FROM sales...

Problem: There's no "sales" table or "revenue" column!

The Solution: GenerateSQLTool

clgraph provides a GenerateSQLTool that automatically includes your real schema:

from clgraph import Pipeline
from clgraph.tools import GenerateSQLTool
from langchain_openai import ChatOpenAI

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
llm = ChatOpenAI(model="gpt-4", temperature=0)

# Create the SQL generation tool
sql_tool = GenerateSQLTool(pipeline, llm)

# Generate SQL from natural language
result = sql_tool.run(question="What is total revenue by customer?")

if result.success:
    print(result.data["sql"])
    print(result.data["explanation"])

Two-Stage Generation

For large schemas, use the two-stage strategy to select relevant tables first:

result = sql_tool.run(
    question="What is total revenue by customer?",
    strategy="two_stage",  # Select tables first, then generate
    include_explanation=True,
)

Manual Context Building

For custom integrations, build the context yourself:

import json

# Export the graph as context for the LLM
context = {
    "tables": list(pipeline.table_graph.tables.keys()),
    "columns_by_table": {},
}

for table_name in pipeline.table_graph.tables:
    context["columns_by_table"][table_name] = [
        col.column_name
        for col in pipeline.columns.values()
        if col.table_name == table_name
    ]

# The context can be included in an LLM prompt
print(f"Found {len(context['tables'])} tables")
print(f"Sample table columns: {list(context['columns_by_table'].keys())[:3]}")
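As a sketch, the exported context can then be serialized into a schema-grounded prompt. The sample data and prompt wording below are illustrative choices, not clgraph APIs:

```python
import json

# A small sample context in the same shape as the export above.
context = {
    "tables": ["raw_orders", "mart_customer_ltv"],
    "columns_by_table": {
        "raw_orders": ["order_id", "customer_id", "total_amount"],
        "mart_customer_ltv": ["customer_id", "lifetime_value"],
    },
}

question = "What is total revenue by customer?"

# Ground the LLM by listing only real tables and columns in the prompt.
prompt = (
    "You may use ONLY the tables and columns listed below.\n"
    f"Schema:\n{json.dumps(context['columns_by_table'], indent=2)}\n\n"
    f"Write SQL to answer: {question}"
)
print(prompt)
```

Because the schema in the prompt comes directly from the parsed pipeline, the model cannot "see" tables or columns that don't exist.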

Use Case 3: LineageAgent - Conversational Interface

Scenario: Ask questions about your data in natural language.

Basic Usage

from clgraph import Pipeline
from clgraph.agent import LineageAgent
from langchain_openai import ChatOpenAI

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
llm = ChatOpenAI(model="gpt-4", temperature=0)

# Create the agent
agent = LineageAgent(pipeline, llm=llm)

# Ask lineage questions
result = agent.query("Where does mart_customer_ltv.revenue come from?")
print(result.answer)

# Ask schema questions
result = agent.query("What tables are available?")
print(result.answer)

# Ask governance questions
result = agent.query("Which columns contain PII?")
print(result.answer)

# Generate SQL
result = agent.query("Write SQL to get monthly revenue by region")
print(result.data["sql"])

Question Types Supported

The LineageAgent automatically classifies and routes questions:

| Question Type | Example | Tool Used |
| --- | --- | --- |
| Lineage (backward) | "Where does revenue come from?" | trace_backward |
| Lineage (forward) | "What depends on orders.amount?" | trace_forward |
| Schema (tables) | "What tables exist?" | list_tables |
| Schema (columns) | "What columns does orders have?" | get_table_schema |
| Schema (search) | "Find columns named 'revenue'" | search_columns |
| Governance (PII) | "Which columns are PII?" | find_pii_columns |
| Governance (owners) | "Who owns the orders table?" | get_owners |
| SQL (generate) | "Write SQL to..." | generate_sql |
| SQL (explain) | "Explain this query..." | explain_query |

Direct Tool Access

You can also run tools directly:

# Run a specific tool
result = agent.run_tool("trace_backward", table="mart_customer_ltv", column="revenue")

# List available tools
tools = agent.list_tools()
print(tools)

Use Case 4: SQL Explanation

Scenario: Understand what complex SQL queries do.

from clgraph.tools import ExplainQueryTool

explain_tool = ExplainQueryTool(pipeline, llm)

sql = """
SELECT
    c.customer_id,
    c.name,
    SUM(o.total_amount) as lifetime_value
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.status = 'completed'
GROUP BY c.customer_id, c.name
HAVING SUM(o.total_amount) > 1000
"""

result = explain_tool.run(sql=sql, detail_level="detailed")
print(result.data["explanation"])

Detail Levels:

  • brief: One-sentence summary
  • normal: 2-3 sentence explanation
  • detailed: Full breakdown of purpose, tables, joins, filters, and output

Use Case 5: MCP Server for External AI Agents

Scenario: Expose your pipeline to Claude, Cursor, or other MCP-compatible AI tools.

What is MCP?

The Model Context Protocol (MCP) is a standard for exposing tools and data to AI assistants. clgraph provides an MCP server that exposes lineage tools to any MCP-compatible client.

Running the MCP Server

# From command line
python -m clgraph.mcp --pipeline queries/ --dialect bigquery

Or programmatically:

from clgraph import Pipeline
from clgraph.mcp import run_mcp_server

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
run_mcp_server(pipeline)  # Blocks until terminated

Available MCP Tools

The MCP server exposes all lineage tools:

| Tool | Description |
| --- | --- |
| trace_backward | Find source columns for a given column |
| trace_forward | Find downstream impacts of a column |
| list_tables | List all tables in the pipeline |
| get_table_schema | Get columns for a table |
| search_columns | Search for columns by name |
| find_pii_columns | Find PII-flagged columns |
| get_owners | Get ownership information |
| get_execution_order | Get query execution order |

MCP Resources

The server also exposes resources for direct data access:

  • pipeline://schema - Full pipeline schema
  • pipeline://tables - List of all tables
  • pipeline://tables/{name} - Specific table details

Claude Desktop Configuration

Add to your Claude Desktop config:

{
  "mcpServers": {
    "clgraph": {
      "command": "python",
      "args": ["-m", "clgraph.mcp", "--pipeline", "/path/to/queries", "--dialect", "bigquery"]
    }
  }
}

Use Case 6: Building Custom AI Tools

Scenario: Create your own AI-powered data tools using the clgraph tools framework.

Tool Registry

The tool registry provides a unified interface for all tools:

from clgraph.tools import create_tool_registry

# Create registry (LLM is optional - only needed for SQL tools)
registry = create_tool_registry(pipeline, llm=None)

# Run tools by name - basic tools work without LLM
result = registry.run("list_tables")
print(f"Found {len(result.data)} tables")  # result.data is a list of tables
print(f"Message: {result.message}")

# List available tools
print(f"Available tools: {registry.tool_names()[:5]}...")

Available Tool Categories

Lineage Tools (no LLM required):

  • TraceBackwardTool - Find source columns
  • TraceForwardTool - Find downstream impacts
  • GetLineagePathTool - Find path between columns
  • GetTableLineageTool - Get table-level dependencies

Schema Tools (no LLM required):

  • ListTablesTool - List all tables
  • GetTableSchemaTool - Get table columns
  • SearchColumnsTool - Search columns by pattern
  • GetRelationshipsTool - Get table relationships
  • GetExecutionOrderTool - Get topological execution order

Governance Tools (no LLM required):

  • FindPIIColumnsTool - Find PII columns
  • GetOwnersTool - Get ownership info
  • GetColumnsByTagTool - Filter by tags
  • ListTagsTool - List all tags
  • CheckDataQualityTool - Audit metadata quality

SQL Tools (require LLM):

  • GenerateSQLTool - Natural language to SQL
  • ExplainQueryTool - Explain SQL queries

Creating Custom Tools

Extend BaseTool or LLMTool:

from clgraph.tools import BaseTool, ParameterSpec, ParameterType, ToolResult

class MyCustomTool(BaseTool):
    name = "my_tool"
    description = "My custom lineage tool"

    @property
    def parameters(self):
        return {
            "table": ParameterSpec(
                name="table",
                type=ParameterType.STRING,
                description="Table to analyze",
                required=True,
            ),
        }

    def run(self, table: str) -> ToolResult:
        # Access pipeline via self.pipeline
        columns = list(self.pipeline.get_columns_by_table(table))
        return ToolResult.success_result(
            data={"column_count": len(columns)},
            message=f"Table {table} has {len(columns)} columns",
        )

# Test the custom tool
tool = MyCustomTool(pipeline)
print(f"Tool name: {tool.name}")
print(f"Tool description: {tool.description}")

Best Practices

1. Use Metadata for Better AI Results

The more metadata you provide, the better the AI responses:

# Add descriptions before using AI
for col in pipeline.columns.values():
    if col.table_name == "raw_customers":
        col.description = "Customer data from CRM"
        col.owner = "data-team"

# Now AI has context for better responses
result = agent.query("Where does customer data come from?")

2. Propagate Metadata First

Always propagate metadata before AI operations:

# Ensure metadata flows through lineage
pipeline.propagate_all_metadata()

# Now PII tracking is complete for AI queries
result = agent.query("Which columns contain sensitive data?")

3. Use Two-Stage for Large Schemas

For pipelines with many tables, use two-stage SQL generation:

result = sql_tool.run(
    question="Get revenue trends",
    strategy="two_stage",  # Selects relevant tables first
)

4. Validate AI-Generated SQL

Always review generated SQL before execution:

result = sql_tool.run(question="Delete all old orders")

# Check the generated SQL
print(result.data["sql"])

# NEVER execute destructive SQL without review!
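A minimal guard might reject any statement whose leading keyword is not SELECT (or WITH, for CTEs). This is a simple heuristic sketch, not a clgraph feature; a production guard should parse the SQL with a proper SQL parser and should never replace human review:

```python
# Minimal sketch: accept generated SQL only if its leading keyword is
# SELECT or WITH. A heuristic, not a substitute for review or parsing.
SAFE_LEADING_KEYWORDS = {"select", "with"}

def is_read_only(sql: str) -> bool:
    first_word = sql.lstrip().split(None, 1)[0].lower().rstrip("(")
    return first_word in SAFE_LEADING_KEYWORDS

print(is_read_only("SELECT * FROM orders"))   # True
print(is_read_only("DELETE FROM orders"))     # False
```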

Key Benefits

| Traditional Approach | clgraph AI Integration |
| --- | --- |
| Manual documentation | Auto-generated with lineage context |
| Hallucinated table names | Schema-grounded SQL generation |
| Custom integrations per tool | Unified tool framework |
| Siloed AI applications | MCP server for any AI client |
| No lineage awareness | AI understands data flow |

Next Steps