The incident that finally broke our manual processes began with a one-line change to a seemingly innocuous data transformation script. A business analyst updated a currency conversion function. Three days later, our core financial reporting dashboard was producing nonsensical figures. The subsequent investigation consumed nearly 40 engineering hours, involving a frantic trace through hundreds of interdependent scripts, undocumented dependencies, and ambiguous commit messages. The root cause was a subtle floating-point precision change that cascaded through five downstream Apache Hudi tables. We had no automated way to determine the blast radius of a code change. This was our technical breaking point.
Our initial concept was to bridge the gap between the static world of our data transformation codebase and the dynamic world of our data lake. We needed a system that could answer two critical questions instantly:
- Given a specific line of code, what datasets will it ultimately affect?
- Given a specific dataset, what code is responsible for its current state?
The answer, we hypothesized, lay in treating our entire data ecosystem as a single, interconnected graph.
Technology Selection Rationale
In a real-world project, technology choices are driven by constraints and leverage. We weren’t starting from a blank slate.
- Apache Hudi: Our data lake was already built on Hudi. Its key feature for this project was not just its transactional capabilities but the detailed commit timeline metadata stored in the .hoodie directory. Each commit or deltacommit records the files changed and provides a slot for custom metadata. This was our hook into the data layer: we could stamp our data pipeline commits with the Git hash of the code that ran them.
- ESLint: This choice was unconventional. Most teams see ESLint as a linter for code style and simple errors. We saw its core engine: an Abstract Syntax Tree (AST) parser. Our transformation logic was primarily written in TypeScript, so instead of writing a complex, brittle parser from scratch, we could build a custom ESLint plugin. This plugin wouldn't enforce style; it would traverse the AST of each transformation file to extract data lineage metadata, such as source tables and target tables. ESLint was already integrated into our CI, providing a natural entry point for metadata extraction.
- Neo4j: Data lineage is fundamentally a graph problem. Nodes represent entities like code files, functions, and data tables; relationships represent actions like READS_FROM, WRITES_TO, and DEPENDS_ON. Attempting to model this in a relational database would result in a nightmare of recursive queries and JOINs. Neo4j was built for this. Its Cypher query language is expressive for traversing variable-length paths, making impact analysis queries trivial.
Phase 1: Building the ESLint Metadata Extractor
The first challenge was to programmatically understand our code. Our convention was to use wrapper functions like spark.read.hudi('s3://bucket/path/to/table') and dataframe.write.hudi('s3://bucket/path/to/target'). Our ESLint rule needed to identify these calls and extract their arguments.
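To make the target concrete, here is a hypothetical transformation file in that style; the table paths and the wrapper import are illustrative rather than taken from our codebase.
Example Transform (illustrative): src/transforms/enrich_transactions.ts
// A hypothetical transform following the wrapper-function convention the extractor relies on.
import { spark } from '../lib/spark'; // assumed internal wrapper exposing read/write helpers

export function enrichTransactions(): void {
  // Sources: raw transactions and currency reference data
  const transactions = spark.read.hudi('s3://data-lake/raw/transactions');
  const rates = spark.read.hudi('s3://data-lake/reference/currency_rates');

  // Business logic elided; the extractor only cares about the read/write call shapes.
  const enriched = transactions.join(rates, 'currency_code');

  // Sink: the curated fact table
  enriched.write.hudi('s3://data-lake/curated/fact_transactions');
}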
Here is the core structure of our custom ESLint plugin. This is not a simple rule; it’s a dedicated metadata extractor.
File: ./eslint-rules/extract-lineage.js
/**
* @fileoverview Extracts Hudi read/write operations for data lineage.
* This is not a linter rule in the traditional sense. Its primary purpose
* is to parse the AST and output structured metadata via a custom reporter.
*/
'use strict';
// A map to store the lineage data found per file.
// In a real implementation, this would write to a file or stream to a service.
const lineageData = new Map();
module.exports = {
meta: {
type: 'problem',
docs: {
description: 'Extracts data lineage from Hudi read/write calls',
category: 'Data Engineering',
recommended: false,
},
fixable: null,
schema: [],
},
create: function (context) {
const filename = context.getFilename();
if (!lineageData.has(filename)) {
lineageData.set(filename, { sources: new Set(), sinks: new Set() });
}
// This is the core logic: the AST visitor.
// We are looking for specific call patterns.
return {
// Visitor for nodes of type 'CallExpression', e.g., myFunction(arg1)
CallExpression(node) {
const callee = node.callee;
// Pattern 1: Look for `...write.hudi(...)`
if (
callee.type === 'MemberExpression' &&
callee.property.type === 'Identifier' &&
callee.property.name === 'hudi' &&
callee.object.type === 'MemberExpression' &&
callee.object.property.type === 'Identifier' &&
callee.object.property.name === 'write'
) {
if (node.arguments.length > 0 && node.arguments[0].type === 'Literal') {
const tablePath = node.arguments[0].value;
lineageData.get(filename).sinks.add(tablePath);
// We could report a "finding" here, but we are using this silently.
// context.report({ node, message: `Found Hudi sink: ${tablePath}` });
}
}
// Pattern 2: Look for `...read.hudi(...)`
if (
callee.type === 'MemberExpression' &&
callee.property.type === 'Identifier' &&
callee.property.name === 'hudi' &&
callee.object.type === 'MemberExpression' &&
callee.object.property.type === 'Identifier' &&
callee.object.property.name === 'read'
) {
if (node.arguments.length > 0 && node.arguments[0].type === 'Literal') {
const tablePath = node.arguments[0].value;
lineageData.get(filename).sources.add(tablePath);
}
}
},
// The 'Program:exit' event fires after the AST traversal for a file is complete.
// This is the perfect place to process the collected data.
'Program:exit': function () {
// In a real CI/CD pipeline, this is where we'd serialize and output the data.
// For this example, we log it. A better approach is a custom reporter.
const fileLineage = lineageData.get(filename);
if (fileLineage && (fileLineage.sources.size > 0 || fileLineage.sinks.size > 0)) {
const output = {
file: filename,
sources: Array.from(fileLineage.sources),
sinks: Array.from(fileLineage.sinks),
};
// This is a hack for demonstration. Production use requires a custom reporter
// to write this to a file properly without polluting stdout.
console.log(`LINEAGE_METADATA:${JSON.stringify(output)}`);
}
}
};
},
// Export the lineage data for potential testing or external use
_getLineageData: () => lineageData,
};
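Before wiring this into CI, it helps to sanity-check the extractor against an in-memory snippet. The sketch below is one way to do that, assuming ESLint 8's programmatic Linter API (defineRule and defineParser were removed in ESLint 9's flat config); it is illustrative, not the harness we shipped.
Sanity Check (illustrative): check_extractor.ts
import { Linter } from 'eslint';

const tsParser = require('@typescript-eslint/parser');
const lineageRule = require('./eslint-rules/extract-lineage');

const linter = new Linter();
linter.defineParser('@typescript-eslint/parser', tsParser);
linter.defineRule('extract-lineage', lineageRule);

const sample = `
  const txns = spark.read.hudi('s3://data-lake/raw/transactions');
  txns.write.hudi('s3://data-lake/curated/fact_transactions');
`;

// Run the rule over the snippet under a fake filename.
linter.verify(
  sample,
  {
    parser: '@typescript-eslint/parser',
    parserOptions: { ecmaVersion: 2020, sourceType: 'module' },
    rules: { 'extract-lineage': 'error' },
  },
  'src/transforms/example.ts'
);

// The rule exposes its collected state through the _getLineageData escape hatch.
console.log(lineageRule._getLineageData().get('src/transforms/example.ts'));
// Expected shape: { sources: Set { 's3://data-lake/raw/transactions' },
//                   sinks: Set { 's3://data-lake/curated/fact_transactions' } }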
To run this, we invoked ESLint over our transformation codebase with an isolated configuration (the --no-eslintrc and --rulesdir flags below load only our local rule) and redirected the output.
Execution Script: extract_lineage.sh
#!/bin/bash
# Ensure we start with a clean state
rm -f lineage_output.json
# Run ESLint, targeting our data transformation directories.
# With --rulesdir, the custom rule is referenced by its filename (no plugin prefix).
# We grep for our custom log output; --no-color keeps the stream clean for parsing,
# and --format=compact minimizes ESLint's own output (the metadata comes from console.log).
npx eslint 'src/transforms/**/*.ts' \
  --rule 'extract-lineage: error' \
--rulesdir './eslint-rules' \
--no-eslintrc \
--parser '@typescript-eslint/parser' \
--no-color \
--format=compact | grep "LINEAGE_METADATA:" | sed 's/LINEAGE_METADATA://' > raw_lineage.log
# Post-process the output into a valid JSON array
echo "[" > lineage_output.json
paste -sd, raw_lineage.log >> lineage_output.json
echo "]" >> lineage_output.json
echo "Lineage extraction complete. Output at lineage_output.json"
# Error handling: check if the final JSON is valid
if ! jq '.' lineage_output.json > /dev/null 2>&1; then
echo "Error: Failed to generate valid JSON output."
exit 1
fi
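Each line captured by the grep is one JSON object per transformation file, so the post-processed file is a JSON array. A single entry (the paths here are hypothetical) looks like this:
[
  {
    "file": "src/transforms/enrich_transactions.ts",
    "sources": [
      "s3://data-lake/raw/transactions",
      "s3://data-lake/reference/currency_rates"
    ],
    "sinks": ["s3://data-lake/curated/fact_transactions"]
  }
]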
Running this script produced a lineage_output.json file, the first piece of our puzzle. A pitfall here is that this parser is inherently fragile. It relies on developers using string literals for table paths. A developer who constructs the path dynamically (const path = 's3://' + bucket + '/table'; spark.read.hudi(path)) would break our extraction. For our V1, this was an acceptable trade-off; we enforced the convention via code review.
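A natural hardening step, sketched here rather than something we shipped, is to have the same rule report non-literal arguments so that CI fails the build instead of relying on reviewers to catch dynamic paths:
// Inside the CallExpression visitor, after matching a read.hudi(...) or write.hudi(...) call:
if (node.arguments.length === 0 || node.arguments[0].type !== 'Literal') {
  // Dynamic paths defeat static extraction, so surface them as lint errors.
  context.report({
    node,
    message: 'Hudi table paths must be string literals so lineage can be extracted statically.',
  });
}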
Phase 2: Modeling and Ingesting into Neo4j
With the code-level dependencies mapped, we designed our graph schema. Simplicity was key.
graph TD
  subgraph "Neo4j Graph Model"
    CF1((:CodeFile))
    CF2((:CodeFile))
    T1((:HudiTable))
    T2((:HudiTable))
    T3((:HudiTable))
  end
  CF1 -- WRITES_TO --> T2
  CF1 -- READS_FROM --> T1
  CF2 -- WRITES_TO --> T3
  CF2 -- READS_FROM --> T2
We chose three primary node labels:
- CodeFile: Represents a source file in our repository. Properties include path and gitCommitHash.
- HudiTable: Represents a physical Hudi table. Properties include path and name. We extract the name from the path.
- GitCommit: Represents a specific commit in Git. Properties include hash and timestamp.
And three relationship types:
- READS_FROM: A CodeFile reads data from a HudiTable.
- WRITES_TO: A CodeFile writes data to a HudiTable.
- MODIFIED_IN: A CodeFile was changed in a GitCommit.
We wrote a Python script using the neo4j driver to parse lineage_output.json and populate the graph. The key was using Cypher's MERGE statement, which acts as an upsert. This makes the ingestion script idempotent: we can run it repeatedly on the same data without creating duplicate nodes or relationships.
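To make that idempotency concrete, here is a minimal, hypothetical check using the official neo4j-driver for Node.js (connection details are placeholders): running the same MERGE twice still leaves exactly one node.
import neo4j from 'neo4j-driver';

// Minimal sketch: MERGE is an upsert, so repeated runs do not create duplicates.
async function mergeIsIdempotent(): Promise<void> {
  const driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'password'));
  const session = driver.session();
  try {
    const path = 's3://data-lake/curated/fact_transactions';
    await session.run('MERGE (t:HudiTable {path: $path})', { path });
    await session.run('MERGE (t:HudiTable {path: $path})', { path });
    const result = await session.run('MATCH (t:HudiTable {path: $path}) RETURN count(t) AS n', { path });
    console.log(result.records[0].get('n').toNumber()); // 1, not 2
  } finally {
    await session.close();
    await driver.close();
  }
}

mergeIsIdempotent();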
Python Ingestion Script: ingest_to_neo4j.py
import json
import os
import re
from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable
class LineageIngestor:
def __init__(self, uri, user, password):
try:
self.driver = GraphDatabase.driver(uri, auth=(user, password))
self.driver.verify_connectivity()
print("Successfully connected to Neo4j.")
except ServiceUnavailable as e:
print(f"Error: Could not connect to Neo4j at {uri}. Please check the connection details and database status.", e)
raise
def close(self):
if self.driver:
self.driver.close()
    @staticmethod
    def _extract_table_name(path):
        """A simple utility to derive a table name from its S3 path."""
        match = re.search(r'/([^/]+)/?$', path)
        return match.group(1) if match else "unknown_table"
def run_ingestion(self, json_path, git_commit_hash):
"""Ingests lineage data from the ESLint output JSON."""
if not git_commit_hash:
raise ValueError("git_commit_hash must be provided.")
with self.driver.session() as session:
# Create constraints for uniqueness, crucial for performance and data integrity
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:CodeFile) REQUIRE c.path IS UNIQUE")
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (t:HudiTable) REQUIRE t.path IS UNIQUE")
session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (g:GitCommit) REQUIRE g.hash IS UNIQUE")
with open(json_path, 'r') as f:
data = json.load(f)
# Ingest the Git commit node first
session.run("MERGE (g:GitCommit {hash: $hash})", hash=git_commit_hash)
for entry in data:
file_path = entry['file']
# Transactionally process each file's lineage
tx_result = session.execute_write(
self._create_lineage_transaction,
file_path,
git_commit_hash,
entry['sources'],
entry['sinks']
)
print(f"Processed {file_path}: {tx_result}")
@staticmethod
def _create_lineage_transaction(tx, file_path, git_commit_hash, sources, sinks):
"""
A single ACID transaction to create nodes and relationships for one file.
This function is passed to `session.execute_write` for robust transaction management.
"""
# 1. Merge the CodeFile and link it to the GitCommit
query = """
MERGE (cf:CodeFile {path: $file_path})
WITH cf
MATCH (g:GitCommit {hash: $git_commit_hash})
MERGE (cf)-[:MODIFIED_IN]->(g)
RETURN cf
"""
tx.run(query, file_path=file_path, git_commit_hash=git_commit_hash)
# 2. Process all source tables
for source_path in sources:
            table_name = LineageIngestor._extract_table_name(source_path)
source_query = """
MATCH (cf:CodeFile {path: $file_path})
MERGE (ht:HudiTable {path: $source_path})
ON CREATE SET ht.name = $table_name
MERGE (cf)-[:READS_FROM]->(ht)
"""
tx.run(source_query, file_path=file_path, source_path=source_path, table_name=table_name)
# 3. Process all sink tables
for sink_path in sinks:
            table_name = LineageIngestor._extract_table_name(sink_path)
sink_query = """
MATCH (cf:CodeFile {path: $file_path})
MERGE (ht:HudiTable {path: $sink_path})
ON CREATE SET ht.name = $table_name
MERGE (cf)-[:WRITES_TO]->(ht)
"""
tx.run(sink_query, file_path=file_path, sink_path=sink_path, table_name=table_name)
return {"sources": len(sources), "sinks": len(sinks)}
if __name__ == '__main__':
# Configuration from environment variables for production-readiness
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
GIT_COMMIT = os.getenv("GIT_COMMIT_HASH")
LINEAGE_FILE = "lineage_output.json"
if not GIT_COMMIT:
print("Error: GIT_COMMIT_HASH environment variable not set.")
exit(1)
ingestor = LineageIngestor(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
try:
ingestor.run_ingestion(LINEAGE_FILE, GIT_COMMIT)
finally:
ingestor.close()
This script, run as part of our CI pipeline after the extraction step, populated Neo4j with a static view of the code’s data dependencies for a given Git commit.
Phase 3: Linking Code to Live Data with Hudi Metadata
The graph was still missing a crucial link: the connection between a code version and an actual data commit in the lake. We modified our Spark jobs to inject the GIT_COMMIT_HASH
into the Hudi write options.
Example Spark/Hudi Write Operation (Scala):
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

val gitCommitHash = sys.env.getOrElse("GIT_COMMIT_HASH", "unknown")
dataframe.write
.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key(), "my_hudi_table")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uuid")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "updated_at")
// The critical link: inject the Git hash into the commit metadata
.option("hoodie.commit.metadata.key.prefix", "custom_")
.option("custom_git_commit_hash", gitCommitHash)
.mode(SaveMode.Append)
.save("/path/to/my_hudi_table")
Now, every commit in our Hudi tables was stamped with the hash of the code that produced it. The final step was to periodically scan the Hudi commit timeline and update our Neo4j graph. We wrote another utility script for this.
Hudi Metadata Scanner: scan_hudi_timeline.py
import os
from neo4j import GraphDatabase
from pyspark.sql import SparkSession
# This script would typically run on a schedule (e.g., hourly cron)
class HudiTimelineScanner:
# ... (Neo4j connection setup as before) ...
def scan_and_update(self, table_path):
spark = SparkSession.builder.appName("HudiTimelineScanner").getOrCreate()
        # Hudi's commit metadata is not easily queryable as a DataFrame across all versions,
        # so we read the commit files from the filesystem directly, which is a common approach.
timeline_path = os.path.join(table_path, ".hoodie")
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
try:
# List all commit files in the timeline
commit_files = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(timeline_path))
commits_to_process = []
for file_status in commit_files:
filename = file_status.getPath().getName()
if ".commit" in filename or ".deltacommit" in filename:
# In a real system, you'd track the last processed timestamp
# to avoid rescanning the entire timeline every time.
instant_time = filename.split('.')[0]
# Read commit metadata
commit_meta = spark._jvm.org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(
fs.open(file_status.getPath()).readAllBytes(),
spark._jvm.org.apache.hudi.common.model.HoodieCommitMetadata
)
extra_meta = commit_meta.getExtraMetadata()
if "custom_git_commit_hash" in extra_meta:
git_hash = extra_meta.get("custom_git_commit_hash")
commits_to_process.append({
"hudi_commit_time": instant_time,
"git_commit_hash": git_hash
})
if commits_to_process:
self._update_neo4j(table_path, commits_to_process)
print(f"Updated Neo4j with {len(commits_to_process)} commits for table {table_path}")
except Exception as e:
# Hadoop's FileSystem API can throw Java exceptions
print(f"Error scanning Hudi timeline for {table_path}: {e}")
def _update_neo4j(self, table_path, commits):
with self.driver.session() as session:
session.execute_write(self._create_commit_links_transaction, table_path, commits)
@staticmethod
def _create_commit_links_transaction(tx, table_path, commits):
# This transaction links a HudiTable to a GitCommit, creating the final bridge
query = """
UNWIND $commits as commit_data
MATCH (ht:HudiTable {path: $table_path})
MATCH (gc:GitCommit {hash: commit_data.git_commit_hash})
// Using a new node for Hudi commits for more detailed modeling
MERGE (hc:HudiCommit {id: ht.path + '_' + commit_data.hudi_commit_time})
ON CREATE SET hc.timestamp = commit_data.hudi_commit_time
MERGE (ht)-[:HAS_COMMIT]->(hc)
MERGE (hc)-[:PRODUCED_BY]->(gc)
"""
tx.run(query, table_path=table_path, commits=commits)
# Example usage
# scanner = HudiTimelineScanner(...)
# scanner.scan_and_update("s3://my-bucket/data/tables/fact_sales")
The Result: Actionable Lineage Queries
With the graph fully populated and continuously updated, we could finally answer our critical questions.
Query 1: Upstream Impact Analysis (What code produces fact_transactions?)
// Find every code file that directly or indirectly produces data in a specific table.
// The quantified path pattern walks the alternating write -> read chain, up to 10 intermediate hops.
// (Quantified path patterns require Neo4j 5.9+. A plain directed *1..10 traversal cannot
// alternate READS_FROM and WRITES_TO directions, so it would only find direct writers.)
MATCH (source_file:CodeFile)-[:WRITES_TO]->(:HudiTable)
      (()<-[:READS_FROM]-(:CodeFile)-[:WRITES_TO]->()){0,10}
      (target:HudiTable {name: "fact_transactions"})
RETURN DISTINCT source_file.path AS code_path
Query 2: Downstream Impact Analysis (What tables are affected by changing currency_converter.ts?)
// Find all Hudi tables downstream of a specific code file by following its writes
// through any code that reads those tables and writes further tables, up to 10 intermediate hops.
MATCH (source_file:CodeFile)
WHERE source_file.path ENDS WITH 'transforms/utils/currency_converter.ts'
MATCH (source_file)-[:WRITES_TO]->(:HudiTable)
      (()<-[:READS_FROM]-(:CodeFile)-[:WRITES_TO]->()){0,10}
      (downstream_table:HudiTable)
RETURN DISTINCT downstream_table.name AS affected_table
These queries, integrated into a simple CLI tool for developers, provided instant visibility. Before merging a pull request, a developer could run the downstream analysis to see the full blast radius of their change, turning a 40-hour forensic investigation into a 4-second query.
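The CLI itself was thin. Here is a rough sketch of what such a tool can look like, using the official neo4j-driver for Node.js and the downstream query above; the file name, environment variables, and structure are illustrative, not our exact tool.
CLI Sketch (illustrative): blast-radius.ts
import neo4j from 'neo4j-driver';

// Downstream query from above, parameterized on the changed file's path suffix.
const DOWNSTREAM_QUERY = `
  MATCH (source_file:CodeFile)
  WHERE source_file.path ENDS WITH $fileSuffix
  MATCH (source_file)-[:WRITES_TO]->(:HudiTable)
        (()<-[:READS_FROM]-(:CodeFile)-[:WRITES_TO]->()){0,10}
        (downstream_table:HudiTable)
  RETURN DISTINCT downstream_table.name AS affected_table
`;

async function main(): Promise<void> {
  const fileSuffix = process.argv[2];
  if (!fileSuffix) {
    console.error('Usage: blast-radius <path-suffix-of-changed-file>');
    process.exit(1);
  }
  const driver = neo4j.driver(
    process.env.NEO4J_URI ?? 'bolt://localhost:7687',
    neo4j.auth.basic(process.env.NEO4J_USER ?? 'neo4j', process.env.NEO4J_PASSWORD ?? 'password')
  );
  const session = driver.session();
  try {
    const result = await session.run(DOWNSTREAM_QUERY, { fileSuffix });
    const tables = result.records.map((r) => r.get('affected_table'));
    console.log(tables.length ? tables.join('\n') : 'No downstream tables found.');
  } finally {
    await session.close();
    await driver.close();
  }
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});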
This system is not without its limitations. The ESLint-based parsing is convention-dependent and cannot understand complex, dynamic code logic or SQL embedded within strings. A more advanced implementation would require a more sophisticated language parser, potentially leveraging compiler APIs or tools like OpenLineage for a standardized metadata format. Furthermore, the batch-based ingestion into Neo4j can have latency. For a more real-time system, a streaming architecture using Kafka to pipe metadata events from the CI pipeline and Hudi commit hooks directly into the graph would be a necessary evolution. The current implementation, however, solved our most immediate and costly problem, shifting our data engineering culture from reactive firefighting to proactive impact assessment.