Beyond SQL Parsing: Extracting Lineage from Python ETL by Splitting the Job Across Three LLM Agents

Written by anshulpathak | Published 2026/01/08

TL;DR: As data stacks become increasingly heterogeneous, relying solely on static SQL parsing for data lineage is a failing strategy.

Visibility Gap in Modern Data Stacks

In my data platform, there are pipelines I cannot trace beyond the SQL layer. When an analyst or data engineer asks where a metric came from, we spend hours analyzing Python jobs in Airflow and Spark. We also need field-level lineage for personal data so we can respond to access and deletion requests. Most lineage tools do fine when transformations stay in SQL because the AST is explicit.


But modern data platforms are not pure SQL. Lineage breaks when code constructs paths dynamically, hides I/O behind helper functions, or writes via libraries that do not expose structured metadata. Static analyzers miss reads and writes when the path is built from env vars and config, so the lineage graph has missing edges.
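To make the gap concrete, here is a minimal sketch of what a purely static pass sees, using Python's built-in ast module. The sample source and the helper name literal_read_paths are illustrative assumptions, not part of our production tooling. The analyzer finds the read_csv call, but the path argument is a variable, so there is no literal dataset name to record.

```python
import ast

# Illustrative input: the path is assembled at runtime from config and a date.
SOURCE = '''
import pandas as pd

def run(conf, day):
    path = f"s3://{conf['raw_bucket']}/sales/{day}/log.csv"
    return pd.read_csv(path)
'''

def literal_read_paths(source: str) -> list:
    """Return (line, path) for each read_csv call; path is None unless it is a string literal."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if (isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr == "read_csv"
                and node.args):
            arg = node.args[0]
            is_literal = isinstance(arg, ast.Constant) and isinstance(arg.value, str)
            found.append((node.lineno, arg.value if is_literal else None))
    return found

# The call site is detected, but its path comes back as None.
print(literal_read_paths(SOURCE))
```

A static tool could go further with constant propagation, but values that come from environment variables or external config are simply not in the source to propagate.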


In the catalog, the lineage stops at the Airflow task boundary. Everything inside the Python job shows up as 'unknown'.

Multi-Expert Collaboration

While evaluating GenAI for lineage extraction, we found that a single prompt containing the code context missed sources when I/O was wrapped in helper functions. Breaking the extraction into separate roles reduced the number of missed I/O calls.

Therefore, I split the whole process into three steps:

  1. Locate the code that touches external data
  2. Extract sources and sinks
  3. Format them into our lineage model, which is the OpenLineage format

You can run each step with a dedicated agent.


Approach/Vision

I designed a lineage extractor service integrated into Airflow Jobs. When a Python ETL script is modified or executed, it is passed through a chain of specialized LLM agents. In the prototype, we used LangChain to orchestrate prompts.

Agent Workflow

Agent 1: Code Contextualizer

Role: Preprocessing. It takes raw code and a Git repo with file details. It resolves imports, identifies configuration files referenced in the code, and chunks the code into logical functional blocks.

Goal: Reduce noise for the next agent.
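The chunking part of this step does not need an LLM at all. As a rough sketch (the function name chunk_by_function and the "<module>" key are hypothetical, and our actual contextualizer also resolves imports and config files), the standard ast module can split a script into one chunk per top-level function or class, plus a shared chunk of module-level statements:

```python
import ast

def chunk_by_function(source: str) -> dict:
    """Split a script into one chunk per top-level def/class plus a '<module>' chunk."""
    tree = ast.parse(source)
    chunks = {}
    module_level = []
    for node in tree.body:
        # get_source_segment returns the exact source text of the node (Python 3.8+).
        segment = ast.get_source_segment(source, node)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            chunks[node.name] = segment
        else:
            module_level.append(segment)
    # Imports and constants are kept together so they can accompany every chunk.
    chunks["<module>"] = "\n".join(module_level)
    return chunks

example = "import os\nREGION = 'us-west-2'\n\ndef load(day):\n    return REGION + day\n"
for name, text in chunk_by_function(example).items():
    print(f"[{name}]\n{text}\n")
```

Sending the "<module>" chunk along with each function chunk gives the next agent the imports and constants (such as REGION in the example script later in this article) it needs to resolve variable names. Note that this sketch drops comments that sit between top-level statements.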

Agent 2: I/O Extractor

Role: The core intelligence. It analyzes code chunks specifically looking for data I/O operations. It is prompted to act as a senior Python data engineer. We look for common I/O APIs, plus internal wrappers (for example, internal_utils.read_*). When paths are constructed dynamically, we capture the template and variables.

Goal: Identify raw source(s) and target(s).
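"Capturing the template and variables" can be illustrated deterministically for the simple case of an f-string. The sketch below is an assumption about how one might represent that output, not the agent's actual implementation: it keeps the literal parts of the path and replaces each interpolated expression with a named placeholder. It relies on ast.unparse, which requires Python 3.9 or later.

```python
import ast

def fstring_template(expr_source: str):
    """Return (template, variables) for the source text of an f-string expression."""
    node = ast.parse(expr_source, mode="eval").body
    if not isinstance(node, ast.JoinedStr):
        raise ValueError("expected an f-string expression")
    parts, variables = [], []
    for value in node.values:
        if isinstance(value, ast.Constant):
            # Literal text between the placeholders.
            parts.append(str(value.value))
        elif isinstance(value, ast.FormattedValue):
            # Interpolated expression: keep its source text as the placeholder name.
            name = ast.unparse(value.value)
            parts.append("{" + name + "}")
            variables.append(name)
    return "".join(parts), variables

template, variables = fstring_template(
    'f"s3://{source_bucket}/{REGION}/sales_data/{execution_date}/transaction_log.csv"'
)
print(template)
print(variables)
```

The LLM earns its place on the cases this cannot handle, such as paths built across several helper functions or wrappers like internal_utils.read_*.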

Agent 3: Standardizer

Role: Compliance and formatting. The output from Agent 2 might be unstructured text. Agent 3 takes that output and formats the results into an OpenLineage RunEvent with inputs and outputs.

Goal: Produce machine-readable, standards-compliant lineage events.
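For orientation, this is roughly the envelope the Standardizer's inputs and outputs end up in. The sketch builds a plain dictionary with the top-level fields of an OpenLineage RunEvent. The function name build_run_event and the producer URL are hypothetical, and the schemaURL version string is an assumption that should be checked against the spec version your backend expects. The job namespace matches the one used in the prompt later in this article.

```python
import json
import uuid
from datetime import datetime, timezone

def build_run_event(job_name, inputs, outputs,
                    job_namespace="apple.data.platform",
                    producer="https://example.com/lineage-extractor"):  # hypothetical producer URI
    """Wrap extracted datasets in a minimal OpenLineage-style RunEvent dictionary."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": inputs,    # list of {"namespace": ..., "name": ...}
        "outputs": outputs,  # list of {"namespace": ..., "name": ...}
        "producer": producer,
        # Assumed spec version; verify before sending to a real backend.
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
    }

event = build_run_event(
    "complex_etl_script.run_pipeline",
    inputs=[{"namespace": "s3://raw_bucket", "name": "sales_data/transaction_log.csv"}],
    outputs=[{"namespace": "s3://processed_bucket", "name": "finance/cleaned_transactions"}],
)
print(json.dumps(event, indent=2))
```

Keeping the envelope in ordinary code and asking the LLM only for the inputs and outputs lists narrows what the model can get wrong.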

Implementation Example (Python & LangChain)

Let's look at a piece of "problematic" Python code and how the agents handle it.

Code to Parse (Python/Pandas)

This is difficult for static parsers because the paths are constructed dynamically using f-strings and variables defined elsewhere.

# complex_etl_script.py
import pandas as pd
import os
from internal_utils import get_config

# Config is loaded externally - static parser can't see this
CONF = get_config(os.environ['ENV'])
REGION = "us-west-2"

def run_pipeline(execution_date):
    # Dynamic source path construction
    source_bucket = CONF['raw_bucket']
    input_path = f"s3://{source_bucket}/{REGION}/sales_data/{execution_date}/transaction_log.csv"
    print(f"Reading from {input_path}")
    df_raw = pd.read_csv(input_path)
    # Transformation
    df_filtered = df_raw[df_raw['amount'] > 0]
    # Dynamic target path construction
    target_bucket = CONF['processed_bucket']
    output_path = f"s3://{target_bucket}/finance/cleaned_transactions/"
    print(f"Writing to {output_path}")
    # Pandas write operation
    df_filtered.to_parquet(output_path, partition_cols=['date'])

# Pretend this is called by Airflow
# run_pipeline('2025-10-27')


The Multi-Agent Implementation Snippet

Below is a simplified LangChain implementation demonstrating the hand-off between the "Extractor" and the "Standardizer."

(Note: To run this you need an API key for your LLM provider. The snippet uses ChatOpenAI, which reads OPENAI_API_KEY from the environment.)


import json

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Initialize the LLMs. Model names are examples; substitute whatever your provider offers.
llm_engineer = ChatOpenAI(model="gpt-4-turbo", temperature=0)  # Stronger model for extraction
llm_governance = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # Cheaper model for formatting

# --- Agent 2: The Extractor (Engineer Persona) ---
extractor_prompt = ChatPromptTemplate.from_template(
    """
    You are a Senior Data Engineer specialized in Python and Pandas.
    Analyze the following Python code snippet. Your sole job is to identify data sources (reads) and data targets (writes).
    Rules:
    1. Look for Pandas I/O methods (read_csv, to_parquet, etc.) and database access (connect, read_sql, etc.).
    2. Try to resolve f-strings or variables if context exists in the snippet. If not, report the variable name representing the path.
    3. IGNORE intermediate dataframes created during memory transformations. Focus only on Inputs and Outputs external to the script.
    Code Snippet:
    {code_snippet}
    Output format: A simple list identifying 'Source: [path/description]' and 'Target: [path/description]'.
    """
)


extractor_chain = extractor_prompt | llm_engineer | StrOutputParser()

# --- Agent 3: The Standardizer (Governance Persona) ---
standardizer_prompt = ChatPromptTemplate.from_template(
    """
    You are a Data Governance Officer responsible for metadata compliance.
    Take the unstructured extraction results below and format them into a strict JSON structure mimicking an OpenLineage dataset event.
    Assume the job namespace is 'apple.data.platform'.
    Format the names as URNs if they look like S3 paths.

    Extraction Results:
    {extraction_output}
    Required JSON Output Format:
    {{
      "inputs": [ {{"namespace": "...", "name": "..."}} ],
      "outputs": [ {{"namespace": "...", "name": "..."}} ]
    }}
    Return ONLY valid JSON.
    """
)

standardizer_chain = standardizer_prompt | llm_governance | StrOutputParser()

# --- The Full Pipeline ---
# We chain the output of the extractor into the input of the standardizer
full_lineage_chain = (
    {"extraction_output": extractor_chain} 
    | standardizer_chain
)

# --- Execution ---
# Load the messy code snippet defined above
with open("complex_etl_script.py", "r") as f:
    messy_code = f.read()

# Run the chain
result_json = full_lineage_chain.invoke({"code_snippet": messy_code})

print("--- Final Governance-Ready Lineage JSON ---")
print(json.dumps(json.loads(result_json), indent=2))
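One practical caveat with the last line: json.loads raises if the model wraps its answer in a Markdown code fence or leaves out a key, even when told to return only JSON. A defensive parser along the following lines can sit between the chain and the catalog. The name parse_lineage_json and the exact checks are assumptions for illustration, not part of LangChain.

```python
import json

FENCE = "`" * 3  # Markdown code-fence marker, built programmatically

def parse_lineage_json(raw: str) -> dict:
    """Parse model output into the inputs/outputs structure, tolerating a code fence."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (which may carry a language tag) and the closing fence.
        text = text[text.find("\n") + 1:].rstrip()
        if text.endswith(FENCE):
            text = text[:-len(FENCE)]
    data = json.loads(text)
    if not isinstance(data, dict):
        raise ValueError("lineage output must be a JSON object")
    for key in ("inputs", "outputs"):
        if not isinstance(data.get(key), list):
            raise ValueError(f"missing or non-list '{key}' in lineage output")
        for dataset in data[key]:
            if not isinstance(dataset, dict) or not {"namespace", "name"} <= set(dataset):
                raise ValueError(f"each entry in '{key}' needs 'namespace' and 'name'")
    return data

sample = FENCE + 'json\n{"inputs": [{"namespace": "s3://raw_bucket", "name": "x.csv"}], "outputs": []}\n' + FENCE
print(parse_lineage_json(sample))
```

In the pipeline above, this would replace the bare json.loads(result_json) call, so a malformed response fails loudly instead of reaching the catalog.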


Output

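Even though the Python script used complex f-strings for paths like f"s3://{source_bucket}/{REGION}/...", the "Extractor" agent is able to infer the semantic intent of the path structure, and the "Standardizer" agent formats it for our catalog. Note that the namespaces carry the config keys (raw_bucket, processed_bucket) rather than real bucket names, because those values come from get_config at runtime and are not visible in the snippet.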


--- Final Governance-Ready Lineage JSON ---
{
  "inputs": [
    {
      "namespace": "s3://raw_bucket",
      "name": "/us-west-2/sales_data/{execution_date}/transaction_log.csv"
    }
  ],
  "outputs": [
    {
      "namespace": "s3://processed_bucket",
      "name": "/finance/cleaned_transactions/"
    }
  ]
}


Benefits for Platform and Governance Teams

  1. Closing the Coverage Gap: We move from 60-70% lineage coverage (SQL only) to near 100% by successfully parsing Python/Pandas black boxes.
  2. Resilience over Fragility: Traditional regex parsers break the moment a developer changes indentation or switches from pd.read_csv to pandas.read_csv. The LLM approach understands the semantic intent of reading a file, making it far more resilient to code style variations.
  3. Standardization at Scale: By using a dedicated "standardizer" agent, we ensure that the output isn't just free text but structured data that can immediately be ingested by governance platforms like DataHub, Marquez, or internal compliance tools.
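The fragility described in point 2 is easy to reproduce. In this small sketch the regex pattern and the sample lines are illustrative assumptions, not taken from any specific lineage tool. A pattern written against the pd alias silently misses the same read once the import style changes or the function is bound to another name.

```python
import re

# A typical hand-written pattern that assumes the conventional `pd` alias.
NAIVE_READ = re.compile(r"pd\.read_csv\(")

SNIPPETS = [
    "df = pd.read_csv(path)",                  # matches
    "df = pandas.read_csv(path)",              # same read, different import style
    "reader = pd.read_csv\ndf = reader(path)", # same read, called through another name
]

# One boolean per snippet: did the regex detect the read?
results = [bool(NAIVE_READ.search(snippet)) for snippet in SNIPPETS]
print(results)
```

Only the first snippet is detected, although all three read the same file.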




Conclusion

As data stacks become increasingly heterogeneous, relying solely on static SQL parsing for data lineage is a failing strategy. By adopting a multi-expert LLM approach, platform teams can provide governance teams with the complete, end-to-end visibility required to operate responsibly at enterprise scale. The move is from syntactic analysis to semantic understanding of our data pipelines.



Written by anshulpathak | Lead System Architect at Apple, Data Governance and Responsible AI Leader
Published by HackerNoon on 2026/01/08