Constructing an Auditable MLOps Lineage System with a Neo4j Graph and an Asynchronous RabbitMQ Pipeline


The operational visibility into our machine learning pipelines had degraded into a critical liability. When a production model’s performance drifted, the subsequent investigation was a painful, manual process of archaeology through Git logs, disparate CI/CD job histories, and S3 bucket timestamps. Answering a simple question like “What exact dataset version and feature engineering code produced this deployed model?” could consume an entire day. This wasn’t just inefficient; it was a compliance and reliability risk we could no longer tolerate. The initial mandate was clear: build a centralized, automated system to track the lineage of every ML asset from data ingestion to production deployment.

Our first proof-of-concept was a simple REST API on a central service. Services in the pipeline would make synchronous calls to this “lineage logger” at critical checkpoints. This approach failed spectacularly within a week. The tight coupling introduced unacceptable latency into our training and deployment jobs. A failure in the lineage service would halt the entire pipeline. It was a classic architectural mistake, and it forced us back to the drawing board with a new set of non-negotiable requirements: asynchronicity, durability, and a data model built for complex, multi-hop relationship queries.

This led to the selection of RabbitMQ as the event bus and Neo4j as the graph store. RabbitMQ would decouple our ML pipeline services from the lineage tracker, allowing them to fire-and-forget events without waiting for a response. Its durable queues would ensure that even if the lineage ingestion system went down, events would be preserved and processed later. Neo4j was chosen because lineage is, fundamentally, a graph. Datasets, code commits, training jobs, model artifacts, and deployments are nodes; the relationships between them (USED, PRODUCED, VERSION_OF) are edges. Attempting to model this in a relational database would have inevitably led to a painful series of recursive CTEs or slow, multi-level JOIN operations. A graph database was the only sane choice for the query patterns we anticipated.

The core of this new system would be a standalone Node.js microservice: the “Lineage Ingestor”. Its sole responsibility is to consume events from a RabbitMQ queue and translate them into Cypher queries that build our lineage graph in Neo4j. Given its critical role as the single writer to our audit log, this service had to be built to a production-grade standard. This is where a strict development toolchain became essential. We mandated the use of Rome for uncompromising code formatting and linting, and Jest for a comprehensive testing suite covering everything from simple utility functions to full integration tests against live RabbitMQ and Neo4j instances.

Defining the Event-Driven Architecture

The entire system hinges on a well-defined set of events. Each event represents a state change or an artifact creation within the MLOps lifecycle. They are immutable facts. We settled on a simple JSON structure with a header for metadata and a payload for the specific event details.

Here are a few key event schemas:

dataset.created event:

{
  "eventId": "evt_2a8b9c0d-e1f2-4a5b-8c6d-7e8f9a0b1c2d",
  "eventType": "dataset.created",
  "timestamp": "2023-10-27T10:30:00Z",
  "source": "data-ingestion-service",
  "version": "1.0",
  "payload": {
    "datasetId": "dts_weather_hourly_raw_v2",
    "name": "weather_hourly_raw",
    "version": 2,
    "storageLocation": "s3://ml-datasets/raw/weather_hourly_v2.parquet",
    "checksum": "sha256:f2ca1bb6c7e907d06dafe4687e579fce76b37e4e93b7605022da52e6ccc26fd2"
  }
}

training.job.started event:

{
  "eventId": "evt_3b9c0d1e-f2a3-5b6c-9d7e-8f9a0b1c2d3e",
  "eventType": "training.job.started",
  "timestamp": "2023-10-27T11:00:00Z",
  "source": "training-orchestrator",
  "version": "1.0",
  "payload": {
    "jobId": "job_train_forecaster_1698387600",
    "inputDatasetIds": ["dts_weather_hourly_raw_v2"],
    "gitCommitHash": "a1b2c3d4e5f67890a1b2c3d4e5f67890a1b2c3d4",
    "parameters": {
      "learning_rate": 0.001,
      "epochs": 50,
      "model_type": "LSTM"
    }
  }
}

model.packaged event:

{
  "eventId": "evt_4c0d1e2f-3a4b-6c7d-0e8f-9a0b1c2d3e4f",
  "eventType": "model.packaged",
  "timestamp": "2023-10-27T13:45:00Z",
  "source": "training-job-runner",
  "version": "1.0",
  "payload": {
    "modelId": "mdl_weather_forecaster_v4",
    "modelName": "weather_forecaster",
    "modelVersion": 4,
    "producingJobId": "job_train_forecaster_1698387600",
    "storageLocation": "s3://ml-models/weather_forecaster/v4/model.tar.gz",
    "metrics": {
      "mae": 0.5,
      "rmse": 0.75
    }
  }
}

These events are published by the various microservices in our ML pipeline to a single RabbitMQ topic exchange named ml_lineage_exchange. The routing key follows a simple convention: lineage.<event_type>, for example, lineage.dataset.created.

Our Lineage Ingestor service subscribes to a durable queue, lineage_ingestor_queue, which is bound to the exchange with the routing key lineage.#. This ensures it receives all lineage-related events.

graph TD
    subgraph ML Pipeline Services
        A[Data Ingestion] --> R
        B[Training Orchestrator] --> R
        C[Model Registry] --> R
    end

    subgraph Message Bus
        R(RabbitMQ Exchange: ml_lineage_exchange) -- routing_key: lineage.# --> Q[Durable Queue: lineage_ingestor_queue]
    end

    subgraph Lineage Tracking System
        Q --> LI[Lineage Ingestor Service]
        LI -- Cypher Queries --> N[(Neo4j Database)]
    end
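
On the producer side, publishing an event is a single non-blocking call against the topic exchange. The pipeline services themselves are outside the scope of this article, but a minimal publishing helper might look like the sketch below (the publishLineageEvent name is illustrative; it assumes amqplib, the routing-key convention described above, and that the exchange has already been asserted by the ingestor):

import amqplib from 'amqplib';

// Illustrative producer-side helper: fire a lineage event and move on.
export async function publishLineageEvent(amqpUri, event) {
    const connection = await amqplib.connect(amqpUri);
    const channel = await connection.createChannel();
    try {
        // Routing key convention: lineage.<event_type>, e.g. lineage.dataset.created
        const routingKey = `lineage.${event.eventType}`;
        // persistent: true asks RabbitMQ to write the message to disk in the durable queue
        channel.publish(
            'ml_lineage_exchange',
            routingKey,
            Buffer.from(JSON.stringify(event)),
            { persistent: true, contentType: 'application/json' },
        );
    } finally {
        await channel.close();
        await connection.close();
    }
}

In practice a producer holds a long-lived connection and channel rather than reconnecting per event; the sketch only illustrates the routing-key convention and the persistent delivery mode.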

The Lineage Ingestor Service: Implementation

The service is built with Node.js. Its core logic is deceptively simple: connect, consume, process, acknowledge. The complexity lies in doing this reliably and idempotently.

Project Setup and Configuration

The package.json reflects our technology choices:

{
  "name": "lineage-ingestor",
  "version": "1.0.0",
  "main": "src/index.js",
  "scripts": {
    "start": "node src/index.js",
    "test": "jest",
    "check": "rome check .",
    "format": "rome format . --write"
  },
  "dependencies": {
    "amqplib": "^0.10.3",
    "dotenv": "^16.3.1",
    "neo4j-driver": "^5.14.0",
    "winston": "^3.11.0"
  },
  "devDependencies": {
    "jest": "^29.7.0",
    "rome": "^12.1.3"
  }
}

Configuration is handled via environment variables, loaded by dotenv for local development. This is critical for production environments where configuration is injected by the orchestration layer (e.g., Kubernetes ConfigMaps).

.env.example

# RabbitMQ Connection
RABBITMQ_URI=amqp://guest:guest@localhost:5672
RABBITMQ_EXCHANGE=ml_lineage_exchange
RABBITMQ_QUEUE=lineage_ingestor_queue

# Neo4j Connection
NEO4J_URI=neo4j://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=password

# Application Settings
LOG_LEVEL=info

A dedicated config module ensures that the application fails fast if critical configuration is missing.

src/config.js

import dotenv from 'dotenv';
dotenv.config();

const requiredVars = [
    'RABBITMQ_URI', 'RABBITMQ_EXCHANGE', 'RABBITMQ_QUEUE',
    'NEO4J_URI', 'NEO4J_USER', 'NEO4J_PASSWORD'
];

for (const v of requiredVars) {
    if (!process.env[v]) {
        throw new Error(`FATAL: Missing required environment variable: ${v}`);
    }
}

export const config = {
    rabbitmq: {
        uri: process.env.RABBITMQ_URI,
        exchange: process.env.RABBITMQ_EXCHANGE,
        queue: process.env.RABBITMQ_QUEUE,
    },
    neo4j: {
        uri: process.env.NEO4J_URI,
        user: process.env.NEO4J_USER,
        password: process.env.NEO4J_PASSWORD,
    },
    logLevel: process.env.LOG_LEVEL || 'info',
};

Neo4j Data Modeling and Idempotency

Before writing any consumer code, we had to define our graph model and ensure our write operations were idempotent. A common pitfall in event-driven systems is processing the same message twice (e.g., due to a consumer crash after processing but before acknowledging). This must not result in duplicate nodes or relationships in our graph.

We established unique constraints on the business identifiers for each node type.

Cypher DDL for Constraints:

CREATE CONSTRAINT dataset_id_unique IF NOT EXISTS FOR (d:Dataset) REQUIRE d.datasetId IS UNIQUE;
CREATE CONSTRAINT job_id_unique IF NOT EXISTS FOR (j:TrainingJob) REQUIRE j.jobId IS UNIQUE;
CREATE CONSTRAINT model_id_unique IF NOT EXISTS FOR (m:Model) REQUIRE m.modelId IS UNIQUE;
CREATE CONSTRAINT git_commit_hash_unique IF NOT EXISTS FOR (c:GitCommit) REQUIRE c.hash IS UNIQUE;
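
These statements are applied once against the database, but because a missing constraint silently undermines idempotency, it is also worth asserting them from the ingestor at startup. A minimal sketch of such a bootstrap helper, assuming it is called from main() before consumption begins (it is not part of the service as shown):

// Illustrative bootstrap helper: assert the uniqueness constraints at service startup.
// CREATE CONSTRAINT ... IF NOT EXISTS is a no-op when the constraint already exists.
const CONSTRAINT_STATEMENTS = [
    'CREATE CONSTRAINT dataset_id_unique IF NOT EXISTS FOR (d:Dataset) REQUIRE d.datasetId IS UNIQUE',
    'CREATE CONSTRAINT job_id_unique IF NOT EXISTS FOR (j:TrainingJob) REQUIRE j.jobId IS UNIQUE',
    'CREATE CONSTRAINT model_id_unique IF NOT EXISTS FOR (m:Model) REQUIRE m.modelId IS UNIQUE',
    'CREATE CONSTRAINT git_commit_hash_unique IF NOT EXISTS FOR (c:GitCommit) REQUIRE c.hash IS UNIQUE',
];

export async function ensureConstraints(driver) {
    const session = driver.session();
    try {
        for (const statement of CONSTRAINT_STATEMENTS) {
            await session.run(statement);
        }
    } finally {
        await session.close();
    }
}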

All write operations use the MERGE clause. MERGE acts like an “upsert”: it finds a pattern or creates it if it doesn’t exist. This is the key to idempotency.

Here is the Cypher query template for handling a training.job.started event. It’s complex, but each part has a specific purpose.

// 1. Merge the TrainingJob node itself
MERGE (job:TrainingJob {jobId: $jobId})
ON CREATE SET
  job.createdAt = $timestamp,
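  // $parameters arrives as a JSON string, since Neo4j properties cannot hold nested maps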
  job.parameters = $parameters
ON MATCH SET
  // We generally don't update on match for lineage, as events are facts.
  // This could be used for enrichment if needed.
  job.updatedAt = $timestamp

// 2. Merge the GitCommit node
MERGE (commit:GitCommit {hash: $gitCommitHash})

// 3. Merge the relationship between the job and the commit
MERGE (job)-[:SOURCED_FROM]->(commit)

// 4. For each input dataset, merge the node and the relationship.
// UNWIND iterates over inputDatasetIds inside Cypher; the handler shown later achieves the same with a per-dataset loop.
WITH job
UNWIND $inputDatasetIds AS datasetId
MERGE (dataset:Dataset {datasetId: datasetId})
MERGE (job)-[:USED]->(dataset)

Core Consumer and Event Handling Logic

The main application logic orchestrates the connections and the message consumption loop.

src/index.js

import { createLogger } from './logging.js';
import { createNeo4jDriver } from './neo4j.js';
import { createRabbitMQChannel } from './rabbitmq.js';
import { createEventHandler } from './eventHandler.js';
import { config } from './config.js';

const logger = createLogger(config.logLevel);

async function main() {
    let neo4jDriver;
    let rabbitmqChannel;

    try {
        logger.info('Starting Lineage Ingestor service...');

        // Establish connections
        neo4jDriver = createNeo4jDriver(config.neo4j.uri, config.neo4j.user, config.neo4j.password);
        await neo4jDriver.verifyConnectivity();
        logger.info('Successfully connected to Neo4j.');

        const { channel, connection } = await createRabbitMQChannel(config.rabbitmq.uri);
        rabbitmqChannel = channel;
        logger.info('Successfully connected to RabbitMQ.');

        // Assert exchange and queue to ensure they exist
        await rabbitmqChannel.assertExchange(config.rabbitmq.exchange, 'topic', { durable: true });
        await rabbitmqChannel.assertQueue(config.rabbitmq.queue, { durable: true });
        await rabbitmqChannel.bindQueue(config.rabbitmq.queue, config.rabbitmq.exchange, 'lineage.#');
        
        const eventHandler = createEventHandler(neo4jDriver, logger);

        logger.info(`Waiting for messages in queue: ${config.rabbitmq.queue}. To exit press CTRL+C`);
        
        rabbitmqChannel.consume(config.rabbitmq.queue, async (msg) => {
            if (msg !== null) {
                try {
                    const messageBody = msg.content.toString();
                    const event = JSON.parse(messageBody);
                    logger.debug(`Received event [${event.eventId}] of type [${event.eventType}]`);
                    
                    await eventHandler.handle(event);

                    // Acknowledge the message if processing was successful
                    rabbitmqChannel.ack(msg);
                    logger.debug(`Successfully processed and ACKed event [${event.eventId}]`);

                } catch (error) {
                    logger.error({
                        message: `Failed to process event. Message will be NACKed.`,
                        error: error.message,
                        stack: error.stack,
                        content: msg.content.toString(),
                    });
                    // Negative acknowledgement. The 'false' argument tells RabbitMQ not to requeue it.
                    // In a real-world project, this would go to a dead-letter queue for manual inspection.
                    rabbitmqChannel.nack(msg, false, false);
                }
            }
        });

        // Graceful shutdown handling
        process.on('SIGINT', async () => {
            logger.info('SIGINT received. Shutting down gracefully.');
            if (rabbitmqChannel) await rabbitmqChannel.close();
            if (connection) await connection.close();
            if (neo4jDriver) await neo4jDriver.close();
            process.exit(0);
        });

    } catch (error) {
        logger.error(`Fatal application error: ${error.message}`, { stack: error.stack });
        if (neo4jDriver) await neo4jDriver.close();
        // The RabbitMQ connection handling in amqplib closes the channel with the connection
        process.exit(1);
    }
}

main();
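
The logging.js, neo4j.js, and rabbitmq.js modules imported at the top are thin wrappers around the respective drivers. Their exact contents are not reproduced here; plausible minimal versions look like this sketch:

// src/neo4j.js - a thin wrapper around the official driver
import neo4j from 'neo4j-driver';

export function createNeo4jDriver(uri, user, password) {
    return neo4j.driver(uri, neo4j.auth.basic(user, password));
}

// src/rabbitmq.js - returns both the channel and the connection so shutdown can close each
import amqplib from 'amqplib';

export async function createRabbitMQChannel(uri) {
    const connection = await amqplib.connect(uri);
    const channel = await connection.createChannel();
    return { channel, connection };
}

// src/logging.js - structured JSON logs via winston
import winston from 'winston';

export function createLogger(level) {
    return winston.createLogger({
        level,
        format: winston.format.json(),
        transports: [new winston.transports.Console()],
    });
}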

The eventHandler module is a router that calls the appropriate function based on eventType. This keeps the main consumption loop clean.

src/eventHandler.js

// A map of event types to their handler functions
const handlers = {
    'dataset.created': handleDatasetCreated,
    'training.job.started': handleTrainingJobStarted,
    'model.packaged': handleModelPackaged,
};

export function createEventHandler(driver, logger) {
    async function handle(event) {
        const handler = handlers[event.eventType];
        if (!handler) {
            logger.warn(`No handler found for event type: ${event.eventType}. Skipping.`);
            return;
        }
        await handler(driver, event.payload, event.timestamp);
    }
    return { handle };
}

// Example handler for 'dataset.created'
async function handleDatasetCreated(driver, payload, timestamp) {
    const session = driver.session();
    try {
        const query = `
            MERGE (d:Dataset {datasetId: $datasetId})
            ON CREATE SET
                d.name = $name,
                d.version = $version,
                d.storageLocation = $storageLocation,
                d.checksum = $checksum,
                d.createdAt = $timestamp
        `;
        await session.run(query, { ...payload, timestamp });
    } finally {
        await session.close();
    }
}

// Handler for 'model.packaged'
async function handleModelPackaged(driver, payload, timestamp) {
    const session = driver.session();
    try {
        const query = `
            // Find the producing job
            MATCH (job:TrainingJob {jobId: $producingJobId})
            
            // Create the model
            MERGE (model:Model {modelId: $modelId})
            ON CREATE SET
                model.name = $modelName,
                model.version = $modelVersion,
                model.storageLocation = $storageLocation,
                model.metrics = $metrics,
                model.createdAt = $timestamp
            
            // Create the relationship
            MERGE (job)-[:PRODUCED]->(model)
        `;
        // Neo4j properties cannot hold nested maps, so the metrics object is stored as a JSON string
        await session.run(query, { ...payload, metrics: JSON.stringify(payload.metrics), timestamp });
    } finally {
        await session.close();
    }
}

// ... other handlers like handleTrainingJobStarted
async function handleTrainingJobStarted(driver, payload, timestamp) {
    const session = driver.session();
    // Use an explicit transaction so the multi-step graph modification is applied atomically
    const tx = session.beginTransaction();
    try {
        const { jobId, gitCommitHash, parameters, inputDatasetIds } = payload;

        await tx.run(`
            MERGE (job:TrainingJob {jobId: $jobId})
            ON CREATE SET
              job.createdAt = $timestamp,
              job.parameters = $parameters
        `, {
            jobId,
            timestamp,
            // Neo4j properties cannot hold nested maps, so hyperparameters are stored as a JSON string
            parameters: JSON.stringify(parameters),
        });
        
        await tx.run(`
            MERGE (commit:GitCommit {hash: $gitCommitHash})
            WITH commit
            MATCH (job:TrainingJob {jobId: $jobId})
            MERGE (job)-[:SOURCED_FROM]->(commit)
        `, { gitCommitHash, jobId });

        for (const datasetId of inputDatasetIds) {
            await tx.run(`
                MERGE (dataset:Dataset {datasetId: $datasetId})
                WITH dataset
                MATCH (job:TrainingJob {jobId: $jobId})
                MERGE (job)-[:USED]->(dataset)
            `, { datasetId, jobId });
        }
        
        await tx.commit();
    } catch (error) {
        // Roll back explicitly so a partially applied event never reaches the graph,
        // then rethrow so the consumer NACKs the message.
        if (tx.isOpen()) {
            await tx.rollback();
        }
        throw error;
    } finally {
        await session.close();
    }
}

Production-Grade Quality Gates: Rome and Jest

A service this critical cannot rely on manual code review alone. We enforced strict quality gates in our CI pipeline.

Rome Configuration (rome.json):
We opted for Rome’s recommended settings, which are strict by default. This eliminated entire categories of bugs and endless debates about code style.

{
  "linter": {
    "enabled": true,
    "rules": {
      "recommended": true
    }
  },
  "formatter": {
    "enabled": true,
    "indentStyle": "space",
    "indentSize": 4
  }
}

Running npm run check and npm run format became mandatory pre-commit steps, enforced locally through Git hooks and again in CI.

Jest Testing Strategy:
Testing was divided into two parts: unit tests with mocks and full integration tests.

Unit Testing (eventHandler.test.js):
For unit tests, we mock the Neo4j driver to ensure the correct Cypher query and parameters are generated for a given event, without needing a database connection.

// Under Jest's native ESM mode the `jest` object is not injected as a global, so import it explicitly
import { jest } from '@jest/globals';
import { createEventHandler } from './eventHandler';

describe('Event Handler', () => {
    let mockDriver;
    let mockSession;
    let mockRun;
    let logger;

    beforeEach(() => {
        // Deeply mock the neo4j-driver
        mockRun = jest.fn().mockResolvedValue({});
        mockSession = {
            run: mockRun,
            close: jest.fn().mockResolvedValue({}),
        };
        mockDriver = {
            session: jest.fn().mockReturnValue(mockSession),
        };
        logger = { warn: jest.fn(), debug: jest.fn() };
    });

    it('should call handleDatasetCreated with correct query and params', async () => {
        const handler = createEventHandler(mockDriver, logger);
        const event = {
            eventType: 'dataset.created',
            timestamp: '2023-10-27T10:30:00Z',
            payload: {
                datasetId: 'test-dataset',
                name: 'test',
                version: 1,
            },
        };

        await handler.handle(event);

        expect(mockDriver.session).toHaveBeenCalled();
        expect(mockRun).toHaveBeenCalledTimes(1);
        
        // Check that the Cypher query is what we expect. Using a regex for flexibility.
        const calledQuery = mockRun.mock.calls[0][0];
        expect(calledQuery).toMatch(/MERGE \(d:Dataset \{datasetId: \$datasetId\}\)/);

        // Check that the parameters are passed correctly.
        const calledParams = mockRun.mock.calls[0][1];
        expect(calledParams).toEqual({
            ...event.payload,
            timestamp: event.timestamp,
        });

        expect(mockSession.close).toHaveBeenCalled();
    });

    it('should warn and skip for an unknown event type', async () => {
        const handler = createEventHandler(mockDriver, logger);
        const event = { eventType: 'unknown.event', payload: {} };
        
        await handler.handle(event);

        expect(logger.warn).toHaveBeenCalledWith('No handler found for event type: unknown.event. Skipping.');
        expect(mockDriver.session).not.toHaveBeenCalled();
    });
});

Integration Testing (lineage.integration.test.js):
This is where the real confidence comes from. These tests require live RabbitMQ and Neo4j instances, typically spun up via Docker Compose for CI environments. The test publishes a real event to RabbitMQ, waits for the consumer (running in the test process) to process it, and then directly queries Neo4j to verify the graph was updated correctly.

A real-world project would use a library like testcontainers to manage this, but for clarity, the logic is shown assuming the services are running.

import amqplib from 'amqplib';
import neo4j from 'neo4j-driver';
import { config } from '../src/config'; // Use the same config as the app
import { createEventHandler } from '../src/eventHandler';

describe('Lineage Ingestor Integration Test', () => {
    let rabbitChannel, rabbitConnection;
    let neo4jDriver;

    // Connect to services before all tests
    beforeAll(async () => {
        rabbitConnection = await amqplib.connect(config.rabbitmq.uri);
        rabbitChannel = await rabbitConnection.createChannel();
        neo4jDriver = neo4j.driver(config.neo4j.uri, neo4j.auth.basic(config.neo4j.user, config.neo4j.password));

        // Make the test self-contained: assert the exchange/queue topology it relies on and drain stale messages
        await rabbitChannel.assertExchange(config.rabbitmq.exchange, 'topic', { durable: true });
        await rabbitChannel.assertQueue(config.rabbitmq.queue, { durable: true });
        await rabbitChannel.bindQueue(config.rabbitmq.queue, config.rabbitmq.exchange, 'lineage.#');
        await rabbitChannel.purgeQueue(config.rabbitmq.queue);
        // Process one message at a time so the three events are applied in publish order
        await rabbitChannel.prefetch(1);

        // Clean the database before the test suite
        const session = neo4jDriver.session();
        await session.run('MATCH (n) DETACH DELETE n');
        await session.close();
    });

    // Disconnect after all tests
    afterAll(async () => {
        await rabbitChannel.close();
        await rabbitConnection.close();
        await neo4jDriver.close();
    });

    it('should create dataset, job, and model nodes and relationships from a sequence of events', async () => {
        // A simplified consumer for the test
        const consumePromise = new Promise((resolve) => {
            const eventHandler = createEventHandler(neo4jDriver, { warn: () => {}, debug: () => {} });
            let messagesToProcess = 3;

            rabbitChannel.consume(config.rabbitmq.queue, async (msg) => {
                if (msg) {
                    await eventHandler.handle(JSON.parse(msg.content.toString()));
                    rabbitChannel.ack(msg);
                    messagesToProcess--;
                    if (messagesToProcess === 0) resolve();
                }
            });
        });

        // 1. Publish a dataset event (full payload and timestamp, so every Cypher parameter the handler references is present)
        const datasetEvent = {
            eventType: 'dataset.created',
            timestamp: '2023-10-27T10:30:00Z',
            payload: {
                datasetId: 'integ-test-ds-1',
                name: 'integ-test-dataset',
                version: 1,
                storageLocation: 's3://integ-test/ds-1.parquet',
                checksum: 'sha256:integ-test',
            },
        };
        rabbitChannel.publish(config.rabbitmq.exchange, 'lineage.dataset.created', Buffer.from(JSON.stringify(datasetEvent)));

        // 2. Publish a training job event
        const jobEvent = {
            eventType: 'training.job.started',
            timestamp: '2023-10-27T11:00:00Z',
            payload: {
                jobId: 'integ-test-job-1',
                inputDatasetIds: ['integ-test-ds-1'],
                gitCommitHash: 'integ-test-commit',
                parameters: { epochs: 1 },
            },
        };
        rabbitChannel.publish(config.rabbitmq.exchange, 'lineage.training.job.started', Buffer.from(JSON.stringify(jobEvent)));

        // 3. Publish a model packaged event
        const modelEvent = {
            eventType: 'model.packaged',
            timestamp: '2023-10-27T13:45:00Z',
            payload: {
                modelId: 'integ-test-model-1',
                modelName: 'integ-test-model',
                modelVersion: 1,
                producingJobId: 'integ-test-job-1',
                storageLocation: 's3://integ-test/model.tar.gz',
                metrics: { mae: 0.1 },
            },
        };
        rabbitChannel.publish(config.rabbitmq.exchange, 'lineage.model.packaged', Buffer.from(JSON.stringify(modelEvent)));

        // Wait for the consumer to process all 3 messages
        await consumePromise;

        // 4. Verify the state in Neo4j
        const session = neo4jDriver.session();
        const result = await session.run(`
            MATCH (d:Dataset {datasetId: 'integ-test-ds-1'})<-[:USED]-(j:TrainingJob {jobId: 'integ-test-job-1'})-[:PRODUCED]->(m:Model {modelId: 'integ-test-model-1'})
            RETURN count(*) AS count
        `);
        await session.close();

        expect(result.records[0].get('count').low).toBe(1);
    }, 30000); // Increase timeout for integration tests
});

Querying the Final Lineage Graph

With the system running, we could finally answer our critical questions with a simple Cypher query. To find the entire history of a deployed model:

MATCH (m:Model {modelId: 'mdl_weather_forecaster_v4'})<-[:PRODUCED]-(j:TrainingJob)-[:SOURCED_FROM]->(c:GitCommit)
MATCH (j)-[:USED]->(d:Dataset)
RETURN m.name, m.version, j.jobId, c.hash, collect(d.datasetId) AS datasets_used

This query, which would be a nightmare in SQL, is expressive and fast in Cypher. It traverses the graph backwards from a specific model node to find the job that produced it, the code commit for that job, and all datasets the job consumed. This is the auditability we were seeking.
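
The same graph answers the reverse, impact-analysis question just as directly: given a dataset, which training jobs consumed it and which models did they produce. A small illustrative audit helper (not part of the ingestor) might run that traversal through the driver:

// Illustrative audit helper: list every model whose training job consumed a given dataset.
async function modelsImpactedByDataset(driver, datasetId) {
    const session = driver.session();
    try {
        const result = await session.run(
            `MATCH (d:Dataset {datasetId: $datasetId})<-[:USED]-(j:TrainingJob)-[:PRODUCED]->(m:Model)
             RETURN m.modelId AS modelId, m.version AS version, j.jobId AS jobId`,
            { datasetId },
        );
        return result.records.map((record) => ({
            modelId: record.get('modelId'),
            version: record.get('version'),
            jobId: record.get('jobId'),
        }));
    } finally {
        await session.close();
    }
}

This is the kind of query that becomes useful when a dataset turns out to be bad and its downstream models need to be identified for retraining.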

The current implementation provides a solid foundation, but it is not without its limitations. The event schemas are managed by convention; a schema registry like Avro would be a robust improvement to enforce contracts between producers and the consumer, preventing malformed data from entering the pipeline. The ingestor is also a single point of consumption. While RabbitMQ’s durability protects against data loss during downtime, for higher throughput scenarios, we would need to scale out the consumer to multiple instances. This would require even more stringent guarantees around idempotency to handle potential message redeliveries across different consumer instances concurrently. Finally, as the graph grows, query performance and Neo4j cluster management will become their own specialized domains requiring dedicated monitoring and tuning.

