Automating Data Lineage Tracking from Code to Lake with ESLint, Neo4j, and Apache Hudi


The incident that finally broke our manual processes began with a one-line change to a seemingly innocuous data transformation script. A business analyst updated a currency conversion function. Three days later, our core financial reporting dashboard was producing nonsensical figures. The subsequent investigation consumed nearly 40 engineering hours, involving a frantic trace through hundreds of interdependent scripts, undocumented dependencies, and ambiguous commit messages. The root cause was a subtle floating-point precision change that cascaded through five downstream Apache Hudi tables. We had no automated way to determine the blast radius of a code change. This was our technical breaking point.

Our initial concept was to bridge the gap between the static world of our data transformation codebase and the dynamic world of our data lake. We needed a system that could answer two critical questions instantly:

  1. Given a specific line of code, what datasets will it ultimately affect?
  2. Given a specific dataset, what code is responsible for its current state?

The answer, we hypothesized, lay in treating our entire data ecosystem as a single, interconnected graph.
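
To make the graph idea concrete before any tooling is involved, here is a minimal in-memory sketch in Python. The file and table names are hypothetical, and the two functions are illustrative stand-ins for the two questions above, not part of our system.

```python
from collections import deque

# Hypothetical lineage facts: file -> (tables it reads, tables it writes).
LINEAGE = {
    "convert_currency.ts": ({"raw_fx"}, {"fx_rates"}),
    "build_transactions.ts": ({"fx_rates", "raw_orders"}, {"fact_transactions"}),
    "build_report.ts": ({"fact_transactions"}, {"finance_report"}),
}


def downstream_tables(start_file, lineage=LINEAGE):
    """Tables reachable from a file by following write -> read -> write hops."""
    affected = set()
    queue = deque(lineage[start_file][1])  # tables the file writes directly
    while queue:
        table = queue.popleft()
        if table in affected:
            continue
        affected.add(table)
        # Any file that reads this table passes the change on to what it writes.
        for reads, writes in lineage.values():
            if table in reads:
                queue.extend(writes - affected)
    return affected


def upstream_files(target_table, lineage=LINEAGE):
    """Files that directly or indirectly contribute to a table."""
    producers, seen_tables = set(), set()
    queue = deque([target_table])
    while queue:
        table = queue.popleft()
        if table in seen_tables:
            continue
        seen_tables.add(table)
        for name, (reads, writes) in lineage.items():
            if table in writes:
                producers.add(name)
                # Whatever this producer reads is upstream as well.
                queue.extend(reads - seen_tables)
    return producers


print(sorted(downstream_tables("convert_currency.ts")))
print(sorted(upstream_files("fact_transactions")))
```

Both traversals alternate between files and tables, which is the property the graph database has to reproduce at scale.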

Technology Selection Rationale

In a real-world project, technology choices are driven by constraints and leverage. We weren’t starting from a blank slate.

  • Apache Hudi: Our data lake was already built on Hudi. Its key feature for this project was not just its transactional capabilities but its detailed commit timeline metadata stored in the .hoodie directory. Each commit or deltacommit contains information about the files changed and a slot for custom metadata. This was our hook into the data layer. We could stamp our data pipeline commits with the Git hash of the code that ran them.

  • ESLint: This choice was unconventional. Most teams see ESLint as a linter for code style and simple errors. We saw what sits underneath: a parser that turns each file into an Abstract Syntax Tree (AST), plus a rule engine that walks it. Our transformation logic was primarily written in TypeScript. Instead of writing a complex, brittle parser from scratch, we could build a custom ESLint plugin. This plugin wouldn’t enforce style; it would traverse the AST of each transformation file to extract data lineage metadata, such as source tables and target tables. ESLint was already integrated into our CI, providing a natural entry point for metadata extraction.

  • Neo4j: Data lineage is fundamentally a graph problem. Nodes represent entities like code files, functions, and data tables. Relationships represent actions like READS_FROM, WRITES_TO, and DEPENDS_ON. Attempting to model this in a relational database would result in a nightmare of recursive queries and JOINs. Neo4j was built for this. Its Cypher query language is expressive for traversing variable-length paths, making impact analysis queries trivial.
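
The "slot for custom metadata" mentioned in the Hudi bullet can be pictured with a short Python sketch. It assumes a commit file serialized as JSON with an extraMetadata map, which is how Hudi 0.x releases store it; the sample document is abbreviated and hypothetical, and the key name follows the custom_git_commit_hash convention used later in this article.

```python
import json

# Abbreviated, hypothetical commit metadata; real files carry many more fields.
SAMPLE_COMMIT_JSON = """
{
  "partitionToWriteStats": {},
  "extraMetadata": {
    "custom_git_commit_hash": "3f2a9c1"
  },
  "operationType": "UPSERT"
}
"""


def git_hash_from_commit(raw_json, key="custom_git_commit_hash"):
    """Return the stamped Git hash, or None if the commit was not stamped."""
    metadata = json.loads(raw_json)
    # extraMetadata may be missing or null on commits written without stamping.
    extra = metadata.get("extraMetadata") or {}
    return extra.get(key)


print(git_hash_from_commit(SAMPLE_COMMIT_JSON))
```

Commits written before stamping was introduced simply return None, so a scanner can skip them without failing.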

Phase 1: Building the ESLint Metadata Extractor

The first challenge was to programmatically understand our code. Our convention was to use wrapper functions like spark.read.hudi('s3://bucket/path/to/table') and dataframe.write.hudi('s3://bucket/path/to/target'). Our ESLint rule needed to identify these calls and extract their arguments.

Here is the core of our custom ESLint rule. It is not a lint check in the usual sense; it’s a dedicated metadata extractor.

File: ./eslint-rules/extract-lineage.js

/**
 * @fileoverview Extracts Hudi read/write operations for data lineage.
 * This is not a linter rule in the traditional sense. Its primary purpose
 * is to parse the AST and output structured metadata via a custom reporter.
 */
'use strict';

// A map to store the lineage data found per file.
// In a real implementation, this would write to a file or stream to a service.
const lineageData = new Map();

module.exports = {
  meta: {
    type: 'problem',
    docs: {
      description: 'Extracts data lineage from Hudi read/write calls',
      category: 'Data Engineering',
      recommended: false,
    },
    fixable: null,
    schema: [],
  },

  create: function (context) {
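    // context.filename is the current property; getFilename() is the older, deprecated accessor.
    const filename = context.filename ?? context.getFilename();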
    if (!lineageData.has(filename)) {
      lineageData.set(filename, { sources: new Set(), sinks: new Set() });
    }

    // This is the core logic: the AST visitor.
    // We are looking for specific call patterns.
    return {
      // Visitor for nodes of type 'CallExpression', e.g., myFunction(arg1)
      CallExpression(node) {
        const callee = node.callee;

        // Pattern 1: Look for `...write.hudi(...)`
        if (
          callee.type === 'MemberExpression' &&
          callee.property.type === 'Identifier' &&
          callee.property.name === 'hudi' &&
          callee.object.type === 'MemberExpression' &&
          callee.object.property.type === 'Identifier' &&
          callee.object.property.name === 'write'
        ) {
          if (node.arguments.length > 0 && node.arguments[0].type === 'Literal') {
            const tablePath = node.arguments[0].value;
            lineageData.get(filename).sinks.add(tablePath);
            // We could report a "finding" here, but we are using this silently.
            // context.report({ node, message: `Found Hudi sink: ${tablePath}` });
          }
        }

        // Pattern 2: Look for `...read.hudi(...)`
        if (
          callee.type === 'MemberExpression' &&
          callee.property.type === 'Identifier' &&
          callee.property.name === 'hudi' &&
          callee.object.type === 'MemberExpression' &&
          callee.object.property.type === 'Identifier' &&
          callee.object.property.name === 'read'
        ) {
          if (node.arguments.length > 0 && node.arguments[0].type === 'Literal') {
            const tablePath = node.arguments[0].value;
            lineageData.get(filename).sources.add(tablePath);
          }
        }
      },

      // The 'Program:exit' event fires after the AST traversal for a file is complete.
      // This is the perfect place to process the collected data.
      'Program:exit': function () {
        // In a real CI/CD pipeline, this is where we'd serialize and output the data.
        // For this example, we log it. A better approach is a custom reporter.
        const fileLineage = lineageData.get(filename);
        if (fileLineage && (fileLineage.sources.size > 0 || fileLineage.sinks.size > 0)) {
           const output = {
              file: filename,
              sources: Array.from(fileLineage.sources),
              sinks: Array.from(fileLineage.sinks),
           };
           // This is a hack for demonstration. Production use requires a custom reporter
           // to write this to a file properly without polluting stdout.
           console.log(`LINEAGE_METADATA:${JSON.stringify(output)}`);
        }
      }
    };
  },
  // Export the lineage data for potential testing or external use
  _getLineageData: () => lineageData,
};

To run this, we pointed ESLint at our local rule directory with --rulesdir, enabled the rule from the command line (bypassing the project’s regular ESLint configuration), and filtered the output.

Execution Script: extract_lineage.sh

#!/bin/bash

# Ensure we start with a clean state
rm -f lineage_output.json raw_lineage.log

# Run ESLint, targeting our data transformation directories.
# Rules loaded through --rulesdir are referenced by their bare file name,
# without a plugin prefix.
# grep keeps only our custom log lines and sed strips the marker prefix.
# The --no-color flag is important for clean parsing.
# The --format=compact is used to minimize noise, though our main output is the console.log.
npx eslint 'src/transforms/**/*.ts' \
  --rule 'extract-lineage: error' \
  --rulesdir './eslint-rules' \
  --no-eslintrc \
  --parser '@typescript-eslint/parser' \
  --no-color \
  --format=compact | grep "LINEAGE_METADATA:" | sed 's/LINEAGE_METADATA://' > raw_lineage.log

# Post-process the output into a valid JSON array
echo "[" > lineage_output.json
paste -sd, raw_lineage.log >> lineage_output.json
echo "]" >> lineage_output.json

# Error handling: validate the JSON before reporting success
if ! jq '.' lineage_output.json > /dev/null 2>&1; then
    echo "Error: Failed to generate valid JSON output." >&2
    exit 1
fi

echo "Lineage extraction complete. Output at lineage_output.json"

Running this script produced a lineage_output.json file, the first piece of our puzzle. A pitfall here is that this extractor is inherently fragile: it only recognizes table paths passed as string literals. A developer who constructs the path dynamically (const path = 's3://' + bucket + '/table'; spark.read.hudi(path)) would not cause a failure; the call would simply be skipped, leaving a silent gap in the lineage graph. For our V1, this was an acceptable trade-off; we enforced the convention via code review.
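
The filtering and JSON assembly done by the grep, sed, and paste steps of the script can also be sketched in Python. This is an illustrative alternative, not what we ran; the function name and the sample lines are hypothetical. Unlike grep, it only accepts lines that start with the marker.

```python
import json

PREFIX = "LINEAGE_METADATA:"


def collect_lineage(lines):
    """Keep only marker-prefixed lines and decode the JSON payload after the marker."""
    entries = []
    for line in lines:
        line = line.strip()
        if line.startswith(PREFIX):
            entries.append(json.loads(line[len(PREFIX):]))
    return entries


# Hypothetical ESLint stdout: one lineage line mixed with unrelated output.
sample_stdout = [
    'LINEAGE_METADATA:{"file":"src/transforms/a.ts",'
    '"sources":["s3://bucket/raw"],"sinks":["s3://bucket/clean"]}',
    "src/transforms/b.ts: line 3, col 1, Warning - unrelated message",
]

print(json.dumps(collect_lineage(sample_stdout), indent=2))
```

Because each payload is parsed individually, a malformed line fails at the line that caused it rather than at the final validation step.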

Phase 2: Modeling and Ingesting into Neo4j

With the code-level dependencies mapped, we designed our graph schema. Simplicity was key.

graph TD
    subgraph Neo4j Graph Model
        CF1((:CodeFile))
        CF2((:CodeFile))
        T1((:HudiTable))
        T2((:HudiTable))
        T3((:HudiTable))
    end

    CF1 -- WRITES_TO --> T2
    CF1 -- READS_FROM --> T1
    CF2 -- WRITES_TO --> T3
    CF2 -- READS_FROM --> T2

We chose three primary node labels:

  • CodeFile: Represents a source file in our repository, identified by its path. The commit it was analyzed at is recorded through the MODIFIED_IN relationship described below.
  • HudiTable: Represents a physical Hudi table. Properties include path and name. We extract the name from the path.
  • GitCommit: Represents a specific commit in Git. Properties include hash and timestamp.

And three relationship types:

  • READS_FROM: A CodeFile reads data from a HudiTable.
  • WRITES_TO: A CodeFile writes data to a HudiTable.
  • MODIFIED_IN: A CodeFile was changed in a GitCommit.

We wrote a Python script using the neo4j driver to parse lineage_output.json and populate the graph. The key was using Cypher’s MERGE statement, which acts as an upsert. This makes the ingestion script idempotent—we can run it repeatedly on the same data without creating duplicate nodes or relationships.

Python Ingestion Script: ingest_to_neo4j.py

import json
import os
import re
from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable

class LineageIngestor:
    def __init__(self, uri, user, password):
        try:
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
            self.driver.verify_connectivity()
            print("Successfully connected to Neo4j.")
        except ServiceUnavailable as e:
            print(f"Error: Could not connect to Neo4j at {uri}. Please check the connection details and database status.", e)
            raise

    def close(self):
        if self.driver:
            self.driver.close()

    @staticmethod
    def _extract_table_name(path):
        """Derive a table name from the last segment of its S3 path."""
        match = re.search(r'/([^/]+)/?$', path)
        return match.group(1) if match else "unknown_table"

    def run_ingestion(self, json_path, git_commit_hash):
        """Ingests lineage data from the ESLint output JSON."""
        if not git_commit_hash:
            raise ValueError("git_commit_hash must be provided.")

        with self.driver.session() as session:
            # Create constraints for uniqueness, crucial for performance and data integrity
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:CodeFile) REQUIRE c.path IS UNIQUE")
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (t:HudiTable) REQUIRE t.path IS UNIQUE")
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (g:GitCommit) REQUIRE g.hash IS UNIQUE")

            with open(json_path, 'r') as f:
                data = json.load(f)

            # Ingest the Git commit node first
            session.run("MERGE (g:GitCommit {hash: $hash})", hash=git_commit_hash)

            for entry in data:
                file_path = entry['file']

                # Transactionally process each file's lineage
                tx_result = session.execute_write(
                    self._create_lineage_transaction,
                    file_path,
                    git_commit_hash,
                    entry['sources'],
                    entry['sinks']
                )
                print(f"Processed {file_path}: {tx_result}")


    @staticmethod
    def _create_lineage_transaction(tx, file_path, git_commit_hash, sources, sinks):
        """
        A single ACID transaction to create nodes and relationships for one file.
        This function is passed to `session.execute_write` for robust transaction management.
        """
        # 1. Merge the CodeFile and link it to the GitCommit
        query = """
        MERGE (cf:CodeFile {path: $file_path})
        WITH cf
        MATCH (g:GitCommit {hash: $git_commit_hash})
        MERGE (cf)-[:MODIFIED_IN]->(g)
        RETURN cf
        """
        tx.run(query, file_path=file_path, git_commit_hash=git_commit_hash)

        # 2. Process all source tables
        for source_path in sources:
            # _extract_table_name is a staticmethod, so no instance is needed here.
            table_name = LineageIngestor._extract_table_name(source_path)
            source_query = """
            MATCH (cf:CodeFile {path: $file_path})
            MERGE (ht:HudiTable {path: $source_path})
            ON CREATE SET ht.name = $table_name
            MERGE (cf)-[:READS_FROM]->(ht)
            """
            tx.run(source_query, file_path=file_path, source_path=source_path, table_name=table_name)

        # 3. Process all sink tables
        for sink_path in sinks:
            table_name = LineageIngestor._extract_table_name(sink_path)
            sink_query = """
            MATCH (cf:CodeFile {path: $file_path})
            MERGE (ht:HudiTable {path: $sink_path})
            ON CREATE SET ht.name = $table_name
            MERGE (cf)-[:WRITES_TO]->(ht)
            """
            tx.run(sink_query, file_path=file_path, sink_path=sink_path, table_name=table_name)

        return {"sources": len(sources), "sinks": len(sinks)}


if __name__ == '__main__':
    # Configuration from environment variables for production-readiness
    NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
    NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
    NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
    GIT_COMMIT = os.getenv("GIT_COMMIT_HASH")
    LINEAGE_FILE = "lineage_output.json"

    if not GIT_COMMIT:
        # SystemExit with a message prints it to stderr and exits with status 1.
        raise SystemExit("Error: GIT_COMMIT_HASH environment variable not set.")

    ingestor = LineageIngestor(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
    try:
        ingestor.run_ingestion(LINEAGE_FILE, GIT_COMMIT)
    finally:
        ingestor.close()

This script, run as part of our CI pipeline after the extraction step, populated Neo4j with a static view of the code’s data dependencies for a given Git commit.
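
One detail of the ingestion worth illustrating is how table names are derived. The sketch below reuses the regex from _extract_table_name; the find_name_collisions helper and the bucket paths are hypothetical, added only to show that uniqueness is enforced on path while name is a convenience label that two tables can share.

```python
import re
from collections import defaultdict


def extract_table_name(path):
    """Same regex as the ingestion script: the last non-empty path segment."""
    match = re.search(r'/([^/]+)/?$', path)
    return match.group(1) if match else "unknown_table"


def find_name_collisions(paths):
    """Group paths by derived name and keep names shared by several paths."""
    by_name = defaultdict(set)
    for path in paths:
        by_name[extract_table_name(path)].add(path)
    return {name: sorted(group) for name, group in by_name.items() if len(group) > 1}


# Hypothetical paths: two buckets holding a table with the same final segment.
paths = [
    "s3://prod-bucket/tables/fact_sales/",
    "s3://staging-bucket/tables/fact_sales",
    "s3://prod-bucket/tables/dim_currency",
]

print(find_name_collisions(paths))
```

A query that matches on name alone would treat both fact_sales tables as candidates, so matching on path is the safer choice when the same table name exists in more than one location.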

Phase 3: Linking Code to Live Data with Hudi Metadata

The graph was still missing a crucial link: the connection between a code version and an actual data commit in the lake. We modified our Spark jobs to inject the GIT_COMMIT_HASH into the Hudi write options.

Example Spark/Hudi Write Operation (Scala):

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

val gitCommitHash = sys.env.getOrElse("GIT_COMMIT_HASH", "unknown")

dataframe.write
  .format("hudi")
  .option(HoodieWriteConfig.TBL_NAME.key(), "my_hudi_table")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uuid")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "updated_at")
  // The critical link: inject the Git hash into the commit metadata.
  // Write options whose keys start with this prefix are copied into the commit's extra metadata.
  .option("hoodie.datasource.write.commitmeta.key.prefix", "custom_")
  .option("custom_git_commit_hash", gitCommitHash)
  .mode(SaveMode.Append)
  .save("/path/to/my_hudi_table")

Now, every commit in our Hudi tables was stamped with the hash of the code that produced it. The final step was to periodically scan the Hudi commit timeline and update our Neo4j graph. We wrote another utility script for this.

Hudi Metadata Scanner: scan_hudi_timeline.py

import os
from neo4j import GraphDatabase
from pyspark.sql import SparkSession

# This script would typically run on a schedule (e.g., hourly cron)

class HudiTimelineScanner:
    # ... (Neo4j connection setup as before) ...

    def scan_and_update(self, table_path):
        spark = SparkSession.builder.appName("HudiTimelineScanner").getOrCreate()
        
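        # Hudi's commit metadata is not exposed as a DataFrame consistently across versions,
        # so we read the commit files from the file system directly.
        # This assumes instant files sit directly under .hoodie; adjust for other timeline layouts.
        timeline_path = os.path.join(table_path, ".hoodie")
        # Resolve the FileSystem from the path itself so that s3:// (or hdfs://) tables
        # are not looked up on the cluster's default file system.
        fs = spark._jvm.org.apache.hadoop.fs.Path(timeline_path).getFileSystem(spark._jsc.hadoopConfiguration())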
        
        try:
            # List all commit files in the timeline
            commit_files = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(timeline_path))
            
            commits_to_process = []
            for file_status in commit_files:
                filename = file_status.getPath().getName()
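                # Completed instants only: a substring test would also match
                # pending files such as .commit.requested or .deltacommit.inflight.
                if filename.endswith(".commit") or filename.endswith(".deltacommit"):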
                    # In a real system, you'd track the last processed timestamp
                    # to avoid rescanning the entire timeline every time.
                    instant_time = filename.split('.')[0]
                    
                    # Read commit metadata
                    commit_meta = spark._jvm.org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(
                        fs.open(file_status.getPath()).readAllBytes(),
                        spark._jvm.org.apache.hudi.common.model.HoodieCommitMetadata
                    )
                    
                    extra_meta = commit_meta.getExtraMetadata()
                    if "custom_git_commit_hash" in extra_meta:
                        git_hash = extra_meta.get("custom_git_commit_hash")
                        commits_to_process.append({
                            "hudi_commit_time": instant_time,
                            "git_commit_hash": git_hash
                        })
            
            if commits_to_process:
                self._update_neo4j(table_path, commits_to_process)
                print(f"Updated Neo4j with {len(commits_to_process)} commits for table {table_path}")

        except Exception as e:
            # Hadoop's FileSystem API can throw Java exceptions
            print(f"Error scanning Hudi timeline for {table_path}: {e}")
            
    def _update_neo4j(self, table_path, commits):
        with self.driver.session() as session:
            session.execute_write(self._create_commit_links_transaction, table_path, commits)

    @staticmethod
    def _create_commit_links_transaction(tx, table_path, commits):
        # This transaction links a HudiTable to a GitCommit, creating the final bridge
        query = """
        UNWIND $commits as commit_data
        MATCH (ht:HudiTable {path: $table_path})
        MATCH (gc:GitCommit {hash: commit_data.git_commit_hash})
        // Using a new node for Hudi commits for more detailed modeling
        MERGE (hc:HudiCommit {id: ht.path + '_' + commit_data.hudi_commit_time})
        ON CREATE SET hc.timestamp = commit_data.hudi_commit_time
        MERGE (ht)-[:HAS_COMMIT]->(hc)
        MERGE (hc)-[:PRODUCED_BY]->(gc)
        """
        tx.run(query, table_path=table_path, commits=commits)

# Example usage
# scanner = HudiTimelineScanner(...)
# scanner.scan_and_update("s3://my-bucket/data/tables/fact_sales")
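
The file-name filtering inside the scanner is easy to get subtly wrong, so here is a standalone Python sketch of it. It assumes the timeline naming of Hudi 0.x releases, where a file is named <instant>.<action> once completed and carries a .requested or .inflight suffix while pending; the function names and sample file names are hypothetical.

```python
WRITE_ACTIONS = {"commit", "deltacommit"}
PENDING_STATES = {"requested", "inflight"}


def parse_instant_filename(filename):
    """Split a timeline file name into instant time, action, and state, or return None."""
    parts = filename.split(".")
    if len(parts) == 2 and parts[1] in PENDING_STATES:
        # Assumed layout: some pending files omit the action segment entirely.
        instant_time, action, state = parts[0], None, parts[1]
    elif len(parts) == 2:
        instant_time, action, state = parts[0], parts[1], "completed"
    elif len(parts) == 3:
        instant_time, action, state = parts
    else:
        return None
    if not instant_time.isdigit():
        return None  # not an instant file, e.g. hoodie.properties
    return {"instant_time": instant_time, "action": action, "state": state}


def completed_write_instants(filenames):
    """Return the sorted instant times of completed commit/deltacommit files."""
    instants = []
    for name in filenames:
        parsed = parse_instant_filename(name)
        if parsed and parsed["state"] == "completed" and parsed["action"] in WRITE_ACTIONS:
            instants.append(parsed["instant_time"])
    return sorted(instants)


# Hypothetical directory listing of a .hoodie folder.
listing = [
    "20240101120000.commit",
    "20240101130000.commit.requested",
    "20240101130000.inflight",
    "20240102090000.deltacommit",
    "20240102100000.deltacommit.inflight",
    "20240103000000.clean",
    "hoodie.properties",
]

print(completed_write_instants(listing))
```

Because instant times are fixed-width timestamps, the sorted list also gives a natural checkpoint: remembering the last processed value avoids rescanning the whole timeline on every run.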

The Result: Actionable Lineage Queries

With the graph fully populated and continuously updated, we could finally answer our critical questions.

Query 1: Upstream Impact Analysis (What code produces fact_transactions?)

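// Find all code files that directly or indirectly feed a specific table.
// Both relationship types point from CodeFile to HudiTable, so walking upstream means
// alternating direction: table <-WRITES_TO- file -READS_FROM-> table, and so on.
// The quantified path pattern repeats that hop 0 to 10 times (requires Neo4j 5.9 or later).
MATCH (target:HudiTable {name: "fact_transactions"})
      ((:HudiTable)<-[:WRITES_TO]-(:CodeFile)-[:READS_FROM]->(:HudiTable)){0,10}
      (:HudiTable)<-[:WRITES_TO]-(source_file:CodeFile)
RETURN DISTINCT source_file.path AS code_path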

Query 2: Downstream Impact Analysis (What tables are affected by changing currency_converter.ts?)

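// Find all Hudi tables downstream from a specific code file.
// Downstream hops alternate the other way: file -WRITES_TO-> table <-READS_FROM- file, and so on.
// The quantified path pattern repeats that hop 0 to 10 times (requires Neo4j 5.9 or later).
MATCH (source_file:CodeFile)
WHERE source_file.path ENDS WITH 'transforms/utils/currency_converter.ts'
MATCH (source_file)-[:WRITES_TO]->(:HudiTable)
      ((:HudiTable)<-[:READS_FROM]-(:CodeFile)-[:WRITES_TO]->(:HudiTable)){0,10}
      (downstream_table:HudiTable)
RETURN DISTINCT downstream_table.name AS affected_table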

These queries, integrated into a simple CLI tool for developers, provided instant visibility. Before merging a pull request, a developer could run the downstream analysis to see the full blast radius of their change, turning a 40-hour forensic investigation into a 4-second query.
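
As a rough idea of what such a CLI wrapper can look like, here is a minimal Python sketch. The command name, argument names, and the build_request function are hypothetical, and the embedded Cypher is deliberately reduced to single-hop placeholders; a real tool would substitute the full impact-analysis queries and hand the result to the Neo4j driver.

```python
import argparse

# Simplified single-hop placeholder queries for this sketch only.
QUERIES = {
    "upstream": (
        "MATCH (f:CodeFile)-[:WRITES_TO]->(t:HudiTable {name: $value}) "
        "RETURN DISTINCT f.path AS result"
    ),
    "downstream": (
        "MATCH (f:CodeFile)-[:WRITES_TO]->(t:HudiTable) "
        "WHERE f.path ENDS WITH $value "
        "RETURN DISTINCT t.name AS result"
    ),
}


def build_request(argv):
    """Turn CLI arguments into a (cypher, parameters) pair."""
    parser = argparse.ArgumentParser(prog="lineage")
    parser.add_argument("direction", choices=sorted(QUERIES))
    parser.add_argument(
        "value",
        help="table name (upstream) or file path suffix (downstream)",
    )
    args = parser.parse_args(argv)
    # Parameters are passed separately so user input is never spliced into Cypher.
    return QUERIES[args.direction], {"value": args.value}


# Example invocation with explicit arguments instead of sys.argv.
cypher, params = build_request(
    ["downstream", "transforms/utils/currency_converter.ts"]
)
# A real tool would now call session.run(cypher, params) and print each record.
print(cypher)
print(params)
```

Keeping query construction separate from execution makes the wrapper testable without a running database.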

This system is not without its limitations. The ESLint-based parsing is convention-dependent and cannot understand complex, dynamic code logic or SQL embedded within strings. A more advanced implementation would require a more sophisticated language parser, potentially leveraging compiler APIs or tools like OpenLineage for a standardized metadata format. Furthermore, the batch-based ingestion into Neo4j can have latency. For a more real-time system, a streaming architecture using Kafka to pipe metadata events from the CI pipeline and Hudi commit hooks directly into the graph would be a necessary evolution. The current implementation, however, solved our most immediate and costly problem, shifting our data engineering culture from reactive firefighting to proactive impact assessment.

