Implementing a Transactional Data Ingestion Layer for CI/CD Performance Metrics Using Jenkins and InfluxDB


The reliability of our CI analytics pipeline was degrading. We were collecting performance metrics from our Vitest test suites, shipping them to InfluxDB, and tracking trends. But under heavy load, with dozens of concurrent Jenkins builds, metrics were being dropped or misattributed. The core issue was a lack of atomicity. A build could successfully report its test durations but fail to register its deployment event, leaving orphaned time-series data. Conversely, a network hiccup could cause the metric push to fail silently after the build was already marked as a success. We were making decisions based on incomplete, and therefore incorrect, data. The initial “fire-and-forget” curl command in our Jenkinsfile was no longer tenable.

Our first thought was to introduce a message queue like Kafka, but that felt like architectural overkill for the problem at hand, since it would introduce a whole new piece of infrastructure to maintain. The real requirement wasn’t full-blown event sourcing; it was a simple, transactional guarantee for each build’s data payload: either the deployment marker and all associated Vitest metrics are committed as a single unit, or nothing is. This led us to design a lightweight, ACID-like transactional staging mechanism directly within our Jenkins Shared Library, which stages the payload on the build agent’s disk before shipping it to InfluxDB in a separate, retryable step.

This approach isolates the transactional boundary to the agent itself, avoiding contention on the Jenkins controller and providing resilience against transient network failures to the InfluxDB endpoint. It’s a pragmatic compromise, providing the data integrity we need without the operational overhead of a distributed transaction coordinator or a message bus.

Let’s walk through the implementation. The system consists of four main components:

  1. Vitest Configuration: A custom reporter to output performance data in a structured JSON format.
  2. Jenkins Shared Library: The heart of the solution, containing the transactional logic written in Groovy.
  3. The Jenkinsfile: The pipeline definition that orchestrates the build, test, and metric collection steps.
  4. InfluxDB Instance: The time-series database for storage and analysis.

The Foundation: Reliable Metric Generation with a Custom Vitest Reporter

Before any data can be ingested, it must be generated reliably. Vitest’s built-in reporters are excellent for human consumption, but for automated processing, a stable machine-readable format is non-negotiable. We created a simple custom reporter that hooks into Vitest’s lifecycle and produces a clean JSON file containing only the essential data points.

Here is the Vitest configuration (vitest.config.ts) that enables this custom reporter:

// vitest.config.ts
import { defineConfig } from 'vitest/config';
import { resolve } from 'path';

export default defineConfig({
  test: {
    // The custom reporter is specified here. The path is relative to the project root.
    reporters: ['./ci/reporters/json-performance-reporter.js'],
    // Path of the generated JSON report, relative to the project root.
    outputFile: './vitest-output/performance-report.json',
  },
});

The reporter itself (ci/reporters/json-performance-reporter.js) is a JavaScript module that implements Vitest’s Reporter API. Its sole job is to distill the complex test results into a simple structure containing file paths and test durations.

// ci/reporters/json-performance-reporter.js
import { mkdirSync, writeFileSync } from 'node:fs';
import { dirname, resolve } from 'node:path';

// This reporter is designed for simplicity and machine-readability.
// It avoids the verbosity of the default JSON reporter.
// Note: it relies on the onInit/onFinished reporter hooks. Newer Vitest
// releases favour onTestRunEnd, so check the Reporter API of your version.
class JsonPerformanceReporter {
  constructor(options = {}) {
    this.options = options;
    this.results = {
      // We capture metadata to ensure the report is self-contained.
      startTime: 0,
      endTime: 0,
      totalDuration: 0,
      files: [],
    };
  }

  // Called once when Vitest starts; ctx exposes the resolved configuration.
  onInit(ctx) {
    this.ctx = ctx;
    this.results.startTime = Date.now();
  }

  // Called once when the run ends, with one entry per test file.
  onFinished(files = []) {
    this.results.endTime = Date.now();
    this.results.totalDuration = this.results.endTime - this.results.startTime;

    for (const file of files) {
      const duration = file.result?.duration;
      // Skip files that never ran; a legitimate duration of 0 ms is kept.
      if (typeof duration !== 'number') continue;

      // We only care about file-level performance for this use case.
      // Granular test-case level data would bloat the payload significantly.
      this.results.files.push({
        path: file.name,
        duration, // Duration in milliseconds
        passed: file.result.state === 'pass',
      });
    }

    // A custom reporter has to write its own output file.
    // We honour a string 'outputFile' from the config and fall back to a default path.
    const root = this.ctx?.config?.root ?? process.cwd();
    const configured = this.ctx?.config?.outputFile;
    const target = resolve(
      root,
      typeof configured === 'string' ? configured : './vitest-output/performance-report.json',
    );
    mkdirSync(dirname(target), { recursive: true });
    writeFileSync(target, JSON.stringify(this.results, null, 2));
  }
}

// Vitest requires the reporter to be exported as a default.
export default JsonPerformanceReporter;

With this setup, every npm test run generates a predictable performance-report.json file. This file is the primary artifact our Jenkins pipeline will consume.
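To make the report's shape concrete, here is a small sketch of what such a file might contain, together with a sanity check that could run before anything downstream consumes it. The sample values and the `isValidReport` helper are illustrative assumptions, not part of the pipeline described here.

```javascript
// Hypothetical sample of performance-report.json (all values are made up).
const sampleReport = {
  startTime: 1700000000000,
  endTime: 1700000004200,
  totalDuration: 4200,
  files: [
    { path: 'src/utils/math.test.ts', duration: 812.4, passed: true },
    { path: 'src/components/ComplexComponent.test.tsx', duration: 3105.9, passed: false },
  ],
};

// Returns true when the object has the shape the ingestion step expects:
// three numeric timing fields and a list of { path, duration, passed } entries.
function isValidReport(report) {
  if (report === null || typeof report !== 'object') return false;
  const timingOk = ['startTime', 'endTime', 'totalDuration']
    .every((key) => Number.isFinite(report[key]));
  if (!timingOk || !Array.isArray(report.files)) return false;
  return report.files.every((file) =>
    file !== null &&
    typeof file === 'object' &&
    typeof file.path === 'string' && file.path.length > 0 &&
    Number.isFinite(file.duration) && file.duration >= 0 &&
    typeof file.passed === 'boolean');
}

console.log(isValidReport(sampleReport)); // true
console.log(isValidReport({ files: [] })); // false
```

Checking the shape early keeps a malformed report from turning into a confusing failure later, when the data is converted for InfluxDB.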

The Core Logic: A Jenkins Shared Library for Transactional Ingestion

This is where the ACID-like properties are enforced. We created a Jenkins Shared Library to encapsulate the entire process. This makes the logic reusable across hundreds of Jenkins jobs and centralizes maintenance.

The library has the following structure:

.
├── src
│   └── org
│       └── mycompany
│           └── ci
│               └── InfluxDBPusher.groovy  // Handles HTTP communication
└── vars
    └── ingestVitestMetrics.groovy       // The main pipeline function with transactional logic

ingestVitestMetrics.groovy: The Transactional Workflow

This script defines the global variable ingestVitestMetrics that can be called from any Jenkinsfile. It simulates a transaction using the agent’s local filesystem as a staging area.

The workflow is as follows:

  1. BEGIN: Create a unique, temporary staging directory on the agent. Create a .lock file to prevent concurrent access issues if multiple jobs were to run in the same workspace (a bad practice, but this adds a layer of safety).
  2. PROCESS: Read and parse the Vitest JSON report. Transform the data into InfluxDB’s Line Protocol format, which is highly efficient for ingestion.
  3. COMMIT: Write the formatted Line Protocol data to a file (metrics.lp) within the staging directory. If and only if this write is successful, delete the .lock file and signal that the transaction is ready for push.
  4. PUSH: Once the local commit succeeds, the HTTP push to InfluxDB is performed by a separate helper class within the same pipeline step. This step is designed to be retryable.
  5. ROLLBACK: If any error occurs during the PROCESS or COMMIT stages, the entire staging directory is deleted, and the pipeline fails loudly.
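
The workflow above can be sketched outside Jenkins as a single function. This is a minimal illustration in JavaScript rather than the Groovy used by the library; the name `runStagedTransaction` is hypothetical, and the write-then-rename commit is an extra refinement assumed here, not something the library is stated to do.

```javascript
const fs = require('node:fs');
const os = require('node:os');
const path = require('node:path');

// Stages `lines` in a fresh directory, hands the payload file to `push`,
// and removes the staging directory whether the push succeeds or fails.
function runStagedTransaction(baseDir, lines, push) {
  // BEGIN: unique staging directory plus an in-progress marker.
  const stagingDir = fs.mkdtempSync(path.join(baseDir, 'ci_ingestion_staging_'));
  const lockFile = path.join(stagingDir, '.lock');
  const payloadFile = path.join(stagingDir, 'metrics.lp');
  try {
    fs.writeFileSync(lockFile, '');
    // COMMIT: write under a temporary name, then rename, so a reader
    // never observes a partially written payload.
    const tmpFile = `${payloadFile}.tmp`;
    fs.writeFileSync(tmpFile, lines.join('\n'));
    fs.renameSync(tmpFile, payloadFile);
    fs.rmSync(lockFile);
    // PUSH: only runs once the payload is fully staged.
    return push(payloadFile);
  } finally {
    // ROLLBACK or cleanup: the staging directory never outlives the call.
    fs.rmSync(stagingDir, { recursive: true, force: true });
  }
}

// Usage: the "push" here just reads the staged file back.
const pushed = runStagedTransaction(os.tmpdir(), ['m,a=b v=1i 1'], (file) =>
  fs.readFileSync(file, 'utf8'));
console.log(pushed);
```

Because cleanup lives in `finally`, a failure in any phase leaves nothing behind on disk, which is the property the ROLLBACK step is meant to guarantee.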

Here is the full implementation of vars/ingestVitestMetrics.groovy:

// vars/ingestVitestMetrics.groovy
import groovy.json.JsonSlurperClassic
import org.mycompany.ci.InfluxDBPusher

/**
 * Ingests Vitest performance metrics into InfluxDB with transactional guarantees.
 *
 * @param config A map containing configuration details:
 *               - vitestReportPath (String): Path to the Vitest JSON report.
 *               - influxDbUrl (String): The URL of the InfluxDB instance.
 *               - influxDbCredentialsId (String): Jenkins credentials ID for the InfluxDB token.
 *               - influxDbOrg (String): The InfluxDB organization.
 *               - influxDbBucket (String): The InfluxDB bucket.
 *               - measurementName (String): The InfluxDB measurement name (e.g., 'vitest_performance').
 *               - buildTags (Map): A map of tags to add to every data point (e.g., [jobName: 'my-app', gitCommit: 'abc1234']).
 */
def call(Map config) {
    // A real-world project must validate the config map for required keys.
    // For brevity, we are omitting that here.

    def stagingDir = "${env.WORKSPACE}/.ci_ingestion_staging_${env.BUILD_ID}"
    def lockFile = "${stagingDir}/.lock"
    def payloadFile = "${stagingDir}/metrics.lp"

    // try-catch-finally: 'catch' performs the rollback, 'finally' cleans up after success.
    try {
        // == TRANSACTION BEGIN ==
        echo "[Metrics Ingestion] Starting transaction. Staging directory: ${stagingDir}"
        // Paths are single-quoted so a workspace path containing spaces does not break the command.
        sh "mkdir -p '${stagingDir}'"

        // The lock file prevents race conditions, although Jenkins workspaces should be isolated.
        // It serves as a clear signal that a transaction is in progress.
        sh "touch '${lockFile}'"

        // == PROCESS ==
        echo "[Metrics Ingestion] Reading Vitest report from ${config.vitestReportPath}"
        if (!fileExists(config.vitestReportPath)) {
            error "[Metrics Ingestion] Vitest report not found at path: ${config.vitestReportPath}"
        }
        def reportContent = readFile(config.vitestReportPath)
        // JsonSlurperClassic returns plain, serializable maps. JsonSlurper's LazyMap
        // can trigger a NotSerializableException in CPS-transformed pipeline code.
        def reportJson = new JsonSlurperClassic().parseText(reportContent)

        // Generate the InfluxDB Line Protocol payload
        def lineProtocolPayload = generateLineProtocol(reportJson, config)

        if (lineProtocolPayload.isEmpty()) {
            echo "[Metrics Ingestion] No metric data generated. Skipping ingestion."
            // This is a successful empty transaction. We just clean up.
            return
        }

        // == COMMIT ==
        // Commit the data to the staging file on the agent's filesystem.
        // This is the point of no return for the transaction on the agent side.
        echo "[Metrics Ingestion] Committing ${lineProtocolPayload.size()} lines to staging file."
        writeFile(file: payloadFile, text: lineProtocolPayload.join('\n'))
        // The payload is fully written, so the in-progress marker can be removed.
        sh "rm -f '${lockFile}'"

        // If we get here, the local commit was successful.
        // Now we can attempt the remote push.
        echo "[Metrics Ingestion] Local commit successful. Preparing to push to InfluxDB."

        def pusher = new InfluxDBPusher(this) // Pass the script context for Jenkins steps
        pusher.push(
            payloadFile,
            config.influxDbUrl,
            config.influxDbCredentialsId,
            config.influxDbOrg,
            config.influxDbBucket
        )

    } catch (e) {
        // == ROLLBACK ==
        // Any exception triggers a full rollback.
        echo "[Metrics Ingestion] ERROR: Transaction failed. Rolling back."
        sh "rm -rf '${stagingDir}'"
        error "Metric ingestion failed: ${e.toString()}"
    } finally {
        // Final cleanup ensures the staging directory is removed on success.
        if (fileExists(stagingDir)) {
            echo "[Metrics Ingestion] Transaction complete. Cleaning up staging directory."
            sh "rm -rf '${stagingDir}'"
        }
    }
}

/**
 * Transforms the parsed Vitest JSON into InfluxDB Line Protocol.
 */
private List<String> generateLineProtocol(Map reportJson, Map config) {
    def lines = []
    def measurement = config.measurementName
    // The report stores epoch milliseconds; InfluxDB is written with precision=ns.
    long timestamp = (reportJson.startTime as long) * 1_000_000L

    // Convert the buildTags map to a comma-separated tag set for Line Protocol.
    // Null or empty values (e.g. BRANCH_NAME outside multibranch jobs) are skipped,
    // and keys and values are escaped.
    def tagSet = config.buildTags
        .findAll { key, value -> value != null && value.toString() }
        .collect { key, value -> "${escapeTagValue(key.toString())}=${escapeTagValue(value.toString())}" }
        .join(',')
    // Avoid a dangling comma when no build tags are supplied.
    def prefix = tagSet ? "${measurement},${tagSet}" : "${measurement}"

    // 1. A single point for the overall test run summary
    lines.add("${prefix},type=summary total_duration=${reportJson.totalDuration}i ${timestamp}")

    // 2. A point for each test file
    reportJson.files.eachWithIndex { file, i ->
        def fileTagSet = "file_path=${escapeTagValue(file.path)}"

        // Add a small nanosecond offset to ensure timestamps are unique if start time is the same
        long fileTimestamp = timestamp + (i + 1)

        lines.add("${prefix},${fileTagSet},type=file duration=${file.duration} ${fileTimestamp}")
    }

    return lines
}

// InfluxDB Line Protocol requires escaping for certain characters in tag keys and values.
private String escapeTagValue(String value) {
    return value.replace('=', '\\=').replace(',', '\\,').replace(' ', '\\ ')
}
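
For readers less familiar with Line Protocol, the following JavaScript sketch shows the same transformation on a single point. The helper names `escapeTag` and `toLine` are hypothetical. `BigInt` is used for the timestamp because an epoch value in nanoseconds exceeds `Number.MAX_SAFE_INTEGER`; this is a JavaScript concern only, since the Groovy code uses `long`.

```javascript
// Escapes commas, equals signs and spaces, as Line Protocol requires
// for tag keys and tag values.
function escapeTag(value) {
  return String(value).replace(/([,= ])/g, '\\$1');
}

// Builds one Line Protocol line: measurement[,tags] fields timestamp.
// `timestampMs` is epoch milliseconds; the output timestamp is in nanoseconds.
// The measurement name is assumed to contain no commas or spaces.
function toLine(measurement, tags, fieldSet, timestampMs) {
  const tagSet = Object.entries(tags)
    // Line Protocol has no representation for empty tag values, so drop them.
    .filter(([, value]) => value !== null && value !== undefined && value !== '')
    .map(([key, value]) => `${escapeTag(key)}=${escapeTag(value)}`)
    .join(',');
  const head = tagSet ? `${measurement},${tagSet}` : measurement;
  const timestampNs = BigInt(Math.trunc(timestampMs)) * 1000000n;
  return `${head} ${fieldSet} ${timestampNs}`;
}

console.log(toLine(
  'vitest_performance',
  { jobName: 'my app', gitBranch: null, type: 'file' },
  'duration=812.4',
  1700000000000,
));
```

The space in `my app` is emitted as `my\ app`, and the `gitBranch` tag is omitted entirely rather than written as the literal string `null`.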

InfluxDBPusher.groovy: The Resilient HTTP Client

This helper class is responsible for the single task of pushing the staged data to InfluxDB. A common pitfall here is failing to implement retries. Network glitches are a fact of life in distributed systems, and a CI pipeline is no exception. This class uses Jenkins’ retry step to handle transient failures.

// src/org/mycompany/ci/InfluxDBPusher.groovy
package org.mycompany.ci

class InfluxDBPusher implements Serializable {
    private static final long serialVersionUID = 1L

    // We need the script context to call built-in Jenkins steps like 'withCredentials' and 'sh'
    private def script

    InfluxDBPusher(script) {
        this.script = script
    }

    /**
     * Pushes the Line Protocol payload from a file to InfluxDB with retries.
     */
    void push(String payloadFilePath, String url, String credentialsId, String org, String bucket) {
        def writeUrl = "${url}/api/v2/write?org=${org}&bucket=${bucket}&precision=ns"
        script.withCredentials([script.string(credentialsId: credentialsId, variable: 'INFLUX_TOKEN')]) {
            // The URL and payload path are passed through the environment so that
            // the shell command below needs no Groovy interpolation.
            script.withEnv(["INFLUX_WRITE_URL=${writeUrl}", "INFLUX_PAYLOAD=${payloadFilePath}"]) {
                // A retry block is crucial for resilience against transient network issues.
                script.retry(3) {
                    try {
                        // Steps such as 'echo' must be called on the script context inside a class.
                        script.echo("[InfluxDBPusher] Attempting to push metrics to InfluxDB...")

                        // The 'sh' step is used to execute curl.
                        // The Groovy string is single-quoted, so the shell (not Groovy) expands
                        // $INFLUX_TOKEN and the secret never appears in an interpolated command.
                        // The '--data-binary' reads the content from the file, avoiding shell command length limits.
                        def response = script.sh(
                            script: '''
                                curl --request POST "$INFLUX_WRITE_URL" \
                                --header "Authorization: Token $INFLUX_TOKEN" \
                                --header 'Content-Type: text/plain; charset=utf-8' \
                                --data-binary "@$INFLUX_PAYLOAD" \
                                --write-out '%{http_code}' --silent --output /dev/null
                            ''',
                            returnStdout: true
                        ).trim()

                        script.echo("[InfluxDBPusher] Received HTTP status code: ${response}")
                        if (response != "204") {
                            // InfluxDB returns 204 No Content on success. Any other code is an error.
                            throw new Exception("Failed to push to InfluxDB. Received HTTP status ${response}")
                        }
                        script.echo("[InfluxDBPusher] Successfully pushed metrics to InfluxDB.")
                    } catch (Exception e) {
                        script.echo("[InfluxDBPusher] Push attempt failed. Error: ${e.message}")
                        // Re-throw so the retry step can run the next attempt or fail the build.
                        throw e
                    }
                }
            }
        }
    }
}
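
One refinement worth considering is to retry only failures that can plausibly succeed on a second attempt. The sketch below is an assumption about how that policy might look, not the behaviour of the class above, which retries every failure. `classifyStatus` and `pushWithRetry` are hypothetical names, and `send` stands in for the blocking curl call.

```javascript
// Classifies the HTTP status of an InfluxDB v2 write request.
function classifyStatus(status) {
  if (status === 204) return 'success';
  if (status === 429 || status >= 500) return 'retryable';
  return 'fatal'; // e.g. 400 for malformed Line Protocol, 401 for a bad token
}

// Calls `send` (a function returning an HTTP status code) up to `maxAttempts`
// times. Stops immediately on success or on a non-retryable status.
function pushWithRetry(send, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    const status = send(attempt);
    const outcome = classifyStatus(status);
    if (outcome === 'success') return { attempts: attempt, status };
    if (outcome === 'fatal') {
      throw new Error(`InfluxDB rejected the payload with HTTP ${status}; not retrying`);
    }
  }
  throw new Error(`Push failed after ${maxAttempts} attempts`);
}

// Simulated endpoint: two 503 responses, then success.
const statuses = [503, 503, 204];
console.log(pushWithRetry((attempt) => statuses[attempt - 1])); // { attempts: 3, status: 204 }
```

Failing fast on a 400 or 401 surfaces a malformed payload or an expired token right away, without repeating a request that cannot succeed.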

Orchestration: The Jenkinsfile

With the shared library in place, the Jenkinsfile becomes remarkably clean and declarative. Its responsibility is limited to defining the stages of the pipeline and calling the shared library with the correct configuration.

// Jenkinsfile

// Import the shared library
@Library('my-shared-library@main') _

pipeline {
    agent any

    environment {
        // It's a best practice to manage these via Jenkins config or a config file.
        INFLUXDB_URL = 'http://influxdb:8086'
        INFLUXDB_CREDENTIALS_ID = 'influxdb-token'
        INFLUXDB_ORG = 'my-org'
        INFLUXDB_BUCKET = 'ci-metrics'
    }

    stages {
        stage('Checkout') {
            steps {
                checkout scm
            }
        }

        stage('Install Dependencies') {
            steps {
                sh 'npm install'
            }
        }

        stage('Run Tests & Generate Report') {
            steps {
                // This command runs Vitest, which will generate the performance-report.json
                sh 'npm test'
            }
        }

        stage('Ingest Performance Metrics') {
            steps {
                script {
                    // This is the call to our transactional ingestion function.
                    ingestVitestMetrics(
                        vitestReportPath: 'vitest-output/performance-report.json',
                        influxDbUrl: env.INFLUXDB_URL,
                        influxDbCredentialsId: env.INFLUXDB_CREDENTIALS_ID,
                        influxDbOrg: env.INFLUXDB_ORG,
                        influxDbBucket: env.INFLUXDB_BUCKET,
                        measurementName: 'vitest_performance',
                        buildTags: [
                            jobName: env.JOB_NAME,
                            buildNumber: env.BUILD_NUMBER,
                            gitCommit: sh(script: 'git rev-parse --short HEAD', returnStdout: true).trim(),
                            gitBranch: env.BRANCH_NAME
                        ]
                    )
                }
            }
        }
    }
}
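
The library code notes that a real project should validate the config map passed in this call. A minimal sketch of such a check follows, written in JavaScript for illustration; the `validateConfig` helper and its messages are assumptions, and only the key names come from the library's documented parameters.

```javascript
// Keys that must be present as non-empty strings.
const REQUIRED_KEYS = [
  'vitestReportPath',
  'influxDbUrl',
  'influxDbCredentialsId',
  'influxDbOrg',
  'influxDbBucket',
  'measurementName',
];

// Returns a list of human-readable problems; an empty list means the config is usable.
function validateConfig(config) {
  const problems = REQUIRED_KEYS
    .filter((key) => typeof config[key] !== 'string' || config[key].trim() === '')
    .map((key) => `missing or empty: ${key}`);
  const tags = config.buildTags;
  if (tags === null || typeof tags !== 'object' || Array.isArray(tags)) {
    problems.push('buildTags must be a map of tag names to values');
  }
  return problems;
}

// Usage: an incomplete config produces one message per missing key.
console.log(validateConfig({ influxDbUrl: 'http://influxdb:8086', buildTags: {} }));
```

Reporting all problems at once, rather than failing on the first, saves a round trip through the pipeline for each misconfigured key.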

Visualizing the Data Flow

The entire process can be visualized as a sequence of state changes, ensuring data integrity at each step.

graph TD
    subgraph AGENT [Jenkins Agent Workspace]
        A[Stage: Run Tests] -- generates --> B("performance-report.json");
        B -- consumed by --> C{ingestVitestMetrics};

        subgraph TX [Transactional Boundary]
            C -- BEGIN --> D("Create .ci_ingestion_staging_BUILD_ID/");
            D -- create --> E(".lock file");
            B -- PROCESS --> F[Transform to Line Protocol];
            F -- COMMIT --> G("Write metrics.lp");
            G -- on success --> H("Delete .lock file");
        end

        C -- on any failure --> I("ROLLBACK: Delete .ci_ingestion_staging_BUILD_ID/");
        H -- triggers push --> J["InfluxDBPusher.push"];
    end

    subgraph NET [Network]
        J -- HTTP POST with retries --> K[InfluxDB API];
    end

    K -- 204 No Content --> L[Data Persisted];

    style TX fill:#f9f,stroke:#333,stroke-width:2px

Verifying the Results in InfluxDB

Once the data is ingested, we can query it using InfluxDB’s Flux language. For example, to find the average duration of a specific test file across all builds on the main branch, the query would look like this:

// Query to track the performance of a single test file over time
from(bucket: "ci-metrics")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "vitest_performance")
  |> filter(fn: (r) => r.type == "file")
  |> filter(fn: (r) => r.gitBranch == "main")
  |> filter(fn: (r) => r.file_path == "src/components/ComplexComponent.test.tsx")
  |> filter(fn: (r) => r._field == "duration")
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> yield(name: "mean_duration")

This query gives us a powerful way to detect performance regressions automatically, something that was impossible with our previous, unreliable data collection method.
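
As one possible way to act on the query's output, the sketch below compares the most recent daily mean against the average of the preceding days. The `detectRegression` helper and the 20% default threshold are illustrative assumptions, not part of our pipeline.

```javascript
// `dailyMeans` is a chronological list of mean durations in milliseconds,
// for example the values returned by the daily aggregation above.
// The last entry is compared against the average of all earlier entries.
function detectRegression(dailyMeans, thresholdPct = 20) {
  if (dailyMeans.length < 2) return { regressed: false, changePct: 0 };
  const latest = dailyMeans[dailyMeans.length - 1];
  const history = dailyMeans.slice(0, -1);
  const baseline = history.reduce((sum, value) => sum + value, 0) / history.length;
  if (baseline === 0) return { regressed: latest > 0, changePct: 0 };
  const changePct = ((latest - baseline) * 100) / baseline;
  return { regressed: changePct > thresholdPct, changePct };
}

console.log(detectRegression([1000, 1000, 1000, 1300])); // { regressed: true, changePct: 30 }
console.log(detectRegression([1000, 1000, 1100]));       // { regressed: false, changePct: 10 }
```

A check like this could run as a scheduled job against the query results and flag a test file whose duration jumps beyond the chosen threshold.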

The file-based transactional staging on the Jenkins agent is a pragmatic solution. It’s not a distributed, two-phase commit protocol, nor does it need to be. Its primary limitation is that it’s tied to the lifecycle and filesystem of a single agent. If the agent itself is terminated abruptly mid-transaction (between COMMIT and PUSH), the staged data would be lost. For our use case, where CI jobs are relatively short-lived and agent failures are rare, this is an acceptable trade-off. A future iteration for a more mission-critical system might involve replacing the local file staging with a write to a durable message queue like RabbitMQ or a transactional outbox table in a small SQLite database on the agent, further decoupling the data collection from the push mechanism and providing durability across agent restarts.

