Implementing a Real-Time Code Quality Observability Pipeline with ActiveMQ and Meilisearch


The inability to query our codebase’s health in real time was becoming a significant drag on engineering efficiency. With over 300 active microservices, CI pipeline logs were a transient, decentralized mess. Answering a seemingly simple question like “Which repositories have the highest number of react/no-unstable-nested-components violations introduced in the past quarter?” was impossible without a coordinated, manual effort of grepping through archived logs. This wasn’t a sustainable model. Our initial idea of piping all CI logs into a central Elasticsearch cluster was dismissed because of the operational overhead and because we didn’t need the heavyweight aggregation capabilities of the ELK stack. We needed a lightweight, fast, event-driven solution focused purely on search.

The core concept settled on an asynchronous pipeline. A git push would trigger a webhook, which would queue a message containing the raw ESLint report. A dedicated consumer would then process this report, transform the data into a search-friendly format, and index it. This decoupled the CI process from the indexing logic, ensuring that a failure in the observability pipeline would not block a developer’s workflow.

For the messaging backbone, we chose ActiveMQ Artemis. The primary reason was pragmatic: we already operated a stable Artemis cluster for other internal systems. Introducing a new technology like Kafka or RabbitMQ would have incurred an unnecessary operational and learning cost. ActiveMQ provided the reliability, message durability, and support for dead-letter queues (DLQs) that were essential for a production-grade system. For the search component, we selected Meilisearch. Its primary advantages over competitors like Elasticsearch were its simplicity and out-of-the-box performance for our specific use case. We needed fast, prefix-based, typo-tolerant search on structured data, not complex data analysis. Meilisearch’s minimal configuration, low resource footprint, and developer-friendly API were a perfect fit. ESLint was the non-negotiable linter, given its deep integration into our JavaScript and TypeScript projects. The challenge was never about running ESLint, but about what to do with its output at scale.

Our final architecture materialized as a straightforward, robust flow.

graph TD
    A[Developer Git Push] --> B{CI Server};
    B --> C[Run ESLint --format=json];
    C --> D{Webhook Emitter};
    D -- JSON Payload --> E[API Gateway];
    E -- AMQP --> F[ActiveMQ Artemis Topic: linting.events];
    F --> G[Node.js Worker Pool];
    subgraph Worker Process
        G --> H[Consume Message];
        H --> I[Decode & Parse ESLint Report];
        I --> J[Transform Violations into Documents];
        J --> K[Batch Index to Meilisearch];
    end
    K --> L[Meilisearch Instance];
    M[Developer Dashboard] -- HTTP API --> L;

The process begins when the CI server generates an ESLint report in JSON format. A simple script then wraps this report into a larger JSON payload and sends it to an internal API Gateway, which authenticates the request and publishes it to an ActiveMQ topic. A pool of Node.js workers listens to this topic, processes the reports, and handles the indexing.

The message contract sent to ActiveMQ is critical for decoupling. We base64-encode the ESLint report to prevent JSON parsing issues within the message broker or transport layers.

Message Payload Example:

{
  "version": "1.0",
  "source": "ci-pipeline--service-A",
  "commitSha": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2",
  "repositoryName": "engineering/frontend-platform",
  "timestamp": "2023-10-27T10:30:00Z",
  "reportFormat": "eslint-json",
  "reportPayload": "WyB7ICJmaWxlUGF0aCI6ICIvYnVpbGQvcy9zcmMvY29tcG9uZW50cy9Vc2VyUHJvZmlsZS50c3giLCAiZXJyb3JDb3VudCI6IDEsICJ3YXJuaW5nQ291bnQiOiAwLCAiZml4YWJsZUVycm9yQ291bnQiOiAwLCAiZml4YWJsZVdhcm5pbmdDb3VudCI6IDAsICJtZXNzYWdlcyI6IFsgeyJydWxlSWQiOiAibm8tZXhwbGljaXQtYW55IiwgInNldmVyaXR5IjogMiwgIm1lc3NhZ2UiOiAiVW5leHBlY3RlZCBhbnkuIiwgImxpbmUiOiA0MiwgImNvbHVtbiI6IDEwLCAibm9kZVR5cGUiOiAiVFNLZXl3b3JkVHlwZSJ9IF0gfV0="
}
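
As a rough illustration of this contract, the CI-side wrapping step could look like the sketch below. The helper names buildLintEnvelope and decodeLintReport are hypothetical and not part of our actual script; the field names simply follow the payload example above.

```javascript
// Hypothetical CI-side helper: wraps an ESLint JSON report in the message envelope.
// Field names mirror the payload example; the function names are illustrative only.
function buildLintEnvelope(eslintResults, { source, commitSha, repositoryName, timestamp }) {
  return {
    version: '1.0',
    source,
    commitSha,
    repositoryName,
    // Fall back to the current time when the CI job does not supply one.
    timestamp: timestamp ?? new Date().toISOString(),
    reportFormat: 'eslint-json',
    // Base64 keeps the report opaque to the gateway and the broker.
    reportPayload: Buffer.from(JSON.stringify(eslintResults), 'utf-8').toString('base64'),
  };
}

// Inverse operation, mirroring what a consumer does on receipt.
function decodeLintReport(envelope) {
  return JSON.parse(Buffer.from(envelope.reportPayload, 'base64').toString('utf-8'));
}
```

Round-tripping a report through both functions in a unit test is a cheap way to catch encoding mistakes before a payload ever reaches the broker.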

On the ActiveMQ side, setting up a durable queue with a corresponding dead-letter queue is a matter of configuration in broker.xml. This ensures that if a worker repeatedly fails to process a message, it gets moved aside for manual inspection instead of being lost or causing an infinite consumption loop.

ActiveMQ broker.xml Snippet:

<core xmlns="urn:activemq:core">
    <address-settings>
        <!-- Default DLQ and expiry settings -->
        <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
        </address-setting>
    </address-settings>

    <addresses>
        <!-- Address for our linting events topic -->
        <address name="linting.events">
            <multicast/>
        </address>
        <!-- A specific queue for our worker group -->
        <address name="code-quality.workers">
            <anycast>
                <queue name="code-quality.workers.queue"/>
            </anycast>
        </address>
        <!-- Standard DLQ and ExpiryQueue -->
        <address name="DLQ">
            <anycast>
                <queue name="DLQ"/>
            </anycast>
        </address>
        <address name="ExpiryQueue">
            <anycast>
                <queue name="ExpiryQueue"/>
            </anycast>
        </address>
    </addresses>
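    <!-- Copy every message published to the topic into the worker queue's address -->
    <diverts>
        <divert name="linting-events-to-workers">
            <address>linting.events</address>
            <forwarding-address>code-quality.workers</forwarding-address>
            <routing-type>ANYCAST</routing-type>
            <exclusive>false</exclusive>
        </divert>
    </diverts>
</core>

In this setup, messages are published to the linting.events topic. Declaring the two addresses side by side does not connect them, so a non-exclusive divert copies each message to the code-quality.workers address, where our workers consume from the anycast queue code-quality.workers.queue. Because the queue is anycast, multiple worker instances load-balance message consumption.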

The core of the implementation lies within the Node.js worker. It uses rhea-promise for AMQP 1.0 communication with ActiveMQ Artemis and the meilisearch SDK. A critical design decision was how to structure the data in Meilisearch. Indexing an entire file report as a single document would be inefficient for searching specific rule violations. Instead, we transform the report, creating one Meilisearch document for each individual linting message.

This is the main worker script. It handles connection, message consumption, data transformation, and batch indexing.

worker.js

import { Connection } from 'rhea-promise';
import { MeiliSearch } from 'meilisearch';
import { v4 as uuidv4 } from 'uuid';
import pino from 'pino';

// --- Configuration ---
// Read from environment variables; the fallbacks are for local development only.
const AMQP_CONNECTION_STRING = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672';
const AMQP_QUEUE_NAME = process.env.AMQP_QUEUE || 'code-quality.workers.queue';
const MEILI_HOST = process.env.MEILI_HOST || 'http://127.0.0.1:7700';
const MEILI_MASTER_KEY = process.env.MEILI_MASTER_KEY || 'aSampleMasterKey';
const MEILI_INDEX_NAME = 'linting_violations';
const BATCH_SIZE = 1000; // Number of documents to send to Meilisearch at once.

const logger = pino({ level: 'info' });

// --- Meilisearch Client Initialization ---
const meiliClient = new MeiliSearch({
  host: MEILI_HOST,
  apiKey: MEILI_MASTER_KEY,
});

/**
 * Transforms a raw ESLint JSON report into an array of documents for Meilisearch.
 * Each document represents a single linting violation.
 *
 * @param {object} rawMessage - The message received from ActiveMQ.
 * @returns {Array<object>} An array of documents ready for indexing.
 */
export function transformReportToDocuments(rawMessage) {
  const { commitSha, repositoryName, timestamp } = rawMessage;

  if (!rawMessage.reportPayload) {
    logger.warn({ msg: 'Message missing reportPayload', commitSha, repositoryName });
    return [];
  }

  let eslintReport;
  try {
    const decodedPayload = Buffer.from(rawMessage.reportPayload, 'base64').toString('utf-8');
    eslintReport = JSON.parse(decodedPayload);
  } catch (error) {
    logger.error({ msg: 'Failed to decode or parse ESLint report', err: error.message, commitSha });
    throw new Error('Invalid report payload'); // Caught by the handler, which releases the message for redelivery.
  }

  const documents = [];
  for (const fileReport of eslintReport) {
    if (!fileReport.messages || fileReport.messages.length === 0) {
      continue;
    }

    const filePath = fileReport.filePath.startsWith('/builds/')
      ? fileReport.filePath.substring(fileReport.filePath.indexOf('/', 1) + 1) // Clean runner-specific paths
      : fileReport.filePath;


    for (const message of fileReport.messages) {
      const document = {
        // Random ID per violation. Reprocessing the same report (for example after a
        // redelivery) therefore adds duplicates instead of overwriting earlier documents.
        id: uuidv4(),
        commitSha,
        repositoryName,
        // Meilisearch works best with UNIX timestamps for sorting/filtering.
        timestamp: Math.floor(new Date(timestamp).getTime() / 1000),
        filePath,
        ruleId: message.ruleId || 'unknown',
        severity: message.severity, // 1 for warning, 2 for error
        message: message.message,
        line: message.line,
        column: message.column,
        // A concatenated field for easier free-text search
        searchableContent: `${message.ruleId || ''} ${message.message}`
      };
      documents.push(document);
    }
  }

  return documents;
}

/**
 * The main message processing handler.
 * @param {import('rhea-promise').Context} context - The receiver context.
 */
async function messageHandler(context) {
  const rawMessage = context.message.body;
  logger.info({ msg: 'Received message', repo: rawMessage.repositoryName, commit: rawMessage.commitSha });

  try {
    const documents = transformReportToDocuments(rawMessage);

    if (documents.length === 0) {
      logger.info({ msg: 'No linting violations found in report, acknowledging message.', commit: rawMessage.commitSha });
      context.delivery.accept();
      return;
    }

    logger.info(`Transformed report into ${documents.length} documents. Indexing in batches of ${BATCH_SIZE}.`);

    // Index documents in batches to avoid overwhelming Meilisearch.
    for (let i = 0; i < documents.length; i += BATCH_SIZE) {
      const batch = documents.slice(i, i + BATCH_SIZE);
      const task = await meiliClient.index(MEILI_INDEX_NAME).addDocuments(batch);
      logger.info({ msg: 'Submitted batch to Meilisearch', taskUid: task.taskUid, batchSize: batch.length });
    }

    // Acknowledge the message only after all batches are successfully submitted.
    context.delivery.accept();
    logger.info({ msg: 'Successfully processed and indexed message.', commit: rawMessage.commitSha });

  } catch (error) {
    logger.error({ msg: 'Error processing message', err: error.message, commit: rawMessage.commitSha, stack: error.stack });
    // Release with delivery_failed so the broker increments the delivery count;
    // once max-delivery-attempts is exceeded, Artemis moves the message to the DLQ.
    context.delivery.release({ delivery_failed: true });
  }
}

/**
 * Sets up the Meilisearch index with appropriate settings.
 * This is an idempotent operation.
 */
async function setupMeiliIndex() {
  const settings = {
    searchableAttributes: [
      'searchableContent',
      'ruleId',
      'filePath',
      'repositoryName',
      'commitSha'
    ],
    filterableAttributes: [
      'repositoryName',
      'ruleId',
      'severity',
      'timestamp',
      'filePath'
    ],
    sortableAttributes: [
      'timestamp'
    ],
    rankingRules: [
      'words',
      'typo',
      'proximity',
      'attribute',
      'sort',
      'exactness',
      'timestamp:desc'
    ]
  };

  logger.info('Ensuring Meilisearch index exists and is configured...');

  // createIndex only enqueues a task, so an existing index shows up as a failed
  // task (code `index_already_exists`) rather than as a thrown exception.
  const createTask = await meiliClient.createIndex(MEILI_INDEX_NAME, { primaryKey: 'id' });
  const createResult = await meiliClient.waitForTask(createTask.taskUid);
  if (createResult.status === 'failed' && createResult.error?.code !== 'index_already_exists') {
    throw new Error(`Index creation failed: ${createResult.error?.message}`);
  }

  // Apply the settings on every start so that changes also reach an existing index.
  const settingsTask = await meiliClient.index(MEILI_INDEX_NAME).updateSettings(settings);
  await meiliClient.waitForTask(settingsTask.taskUid);
  logger.info('Meilisearch index configured successfully.');
}

/**
 * Main application entry point.
 */
async function main() {
  await setupMeiliIndex();

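  // rhea expects discrete host/port/credential options, not a URL, so split the connection string.
  const amqpUrl = new URL(AMQP_CONNECTION_STRING);
  const connection = new Connection({
      host: amqpUrl.hostname,
      port: Number(amqpUrl.port) || 5672,
      username: decodeURIComponent(amqpUrl.username),
      password: decodeURIComponent(amqpUrl.password),
      reconnect: true,
      reconnect_limit: 100, // Try to reconnect many times
      initial_reconnect_delay: 1000,
  });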

  connection.on('connection_open', () => logger.info('Successfully connected to ActiveMQ.'));
  connection.on('connection_error', (err) => logger.error({ msg: 'ActiveMQ connection error', err }));
  connection.on('disconnected', () => logger.warn('Disconnected from ActiveMQ. Will attempt to reconnect.'));


  await connection.open();

  const receiver = await connection.createReceiver({
    source: AMQP_QUEUE_NAME,
    // Disable automatic acceptance; the handler settles each message explicitly
    autoaccept: false,
    // Number of messages to buffer locally
    credit_window: 10
  });

  receiver.on('message', messageHandler);
  receiver.on('receiver_error', (err) => logger.error({ msg: 'Receiver error', err }));

  logger.info(`Worker started. Listening for messages on queue: ${AMQP_QUEUE_NAME}`);
}

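// Jest sets NODE_ENV to 'test' by default, so importing this module from a unit test
// does not start the worker.
if (process.env.NODE_ENV !== 'test') {
  main().catch(error => {
    logger.fatal({ msg: 'Worker failed to start', err: error.message, stack: error.stack });
    process.exit(1);
  });
}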

This worker is designed for resilience. It performs batching to handle large reports efficiently, includes structured logging, and has robust error handling that leverages the DLQ functionality of ActiveMQ. The setupMeiliIndex function is crucial; it defines which fields can be searched, filtered, and sorted, which directly impacts query performance and capabilities. In a production environment, this configuration would be managed via infrastructure-as-code.
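
One refinement worth considering: because a failed message is released and redelivered, the same report can be processed more than once, and randomly generated document IDs would then index each violation twice. The sketch below derives a deterministic ID instead, so that re-indexing overwrites rather than duplicates. The function name violationDocumentId is hypothetical, and the choice of fields assumes that commit, file, rule, and position together identify a violation; it is an illustration, not what the worker above does.

```javascript
// In the ESM worker this would be: import { createHash } from 'node:crypto';
const { createHash } = require('node:crypto');

// Hypothetical helper: derives a stable document ID from the fields that
// (by assumption) identify a single violation within a commit.
function violationDocumentId({ commitSha, filePath, ruleId, line, column }) {
  // Join with a NUL separator so that adjacent fields cannot run together ambiguously.
  const key = [commitSha, filePath, ruleId ?? 'unknown', line ?? 0, column ?? 0].join('\u0000');
  // A hex digest contains only [0-9a-f], which is valid for a Meilisearch primary key.
  return createHash('sha256').update(key).digest('hex');
}
```

With an ID like this, submitting the same document again replaces the existing one, which makes the at-least-once delivery of the broker much easier to live with.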

A common pitfall in such systems is improper testing. A unit test for the transformReportToDocuments function is non-negotiable.

worker.test.js (Jest Example)

import { jest } from '@jest/globals';

// Mock the uuidv4 function to get deterministic IDs
jest.unstable_mockModule('uuid', () => ({
  v4: () => 'mock-uuid-1234',
}));

// We need to dynamically import the worker after setting up the mock
const { transformReportToDocuments } = await import('./worker.js');

describe('transformReportToDocuments', () => {
  const baseMessage = {
    commitSha: 'a1b2c3d4',
    repositoryName: 'test/repo',
    timestamp: '2023-10-27T12:00:00Z',
  };

  it('should transform a valid ESLint report into multiple documents', () => {
    const eslintReport = [
      {
        filePath: '/builds/group/project/src/component.js',
        messages: [
          { ruleId: 'no-console', severity: 1, message: 'Do not use console.log', line: 10, column: 5 },
          { ruleId: 'no-unused-vars', severity: 2, message: "'x' is defined but never used.", line: 5, column: 7 },
        ],
      },
      {
        filePath: '/builds/group/project/src/api/client.js',
        messages: [
          { ruleId: 'no-explicit-any', severity: 2, message: 'Unexpected any.', line: 25, column: 15 },
        ],
      },
      {
        filePath: '/builds/group/project/src/utils.js', // File with no issues
        messages: [],
      },
    ];

    const message = {
      ...baseMessage,
      reportPayload: Buffer.from(JSON.stringify(eslintReport)).toString('base64'),
    };

    const documents = transformReportToDocuments(message);

    expect(documents).toHaveLength(3);
    expect(documents[0]).toEqual({
      id: 'mock-uuid-1234',
      commitSha: 'a1b2c3d4',
      repositoryName: 'test/repo',
      timestamp: 1698408000, // 2023-10-27T12:00:00Z in UNIX seconds
      filePath: 'group/project/src/component.js', // Path is cleaned
      ruleId: 'no-console',
      severity: 1,
      message: 'Do not use console.log',
      line: 10,
      column: 5,
      searchableContent: 'no-console Do not use console.log'
    });
    expect(documents[2].ruleId).toBe('no-explicit-any');
  });

  it('should return an empty array for a report with no violations', () => {
    const eslintReport = [ { filePath: 'src/index.js', messages: [] } ];
    const message = {
      ...baseMessage,
      reportPayload: Buffer.from(JSON.stringify(eslintReport)).toString('base64'),
    };
    const documents = transformReportToDocuments(message);
    expect(documents).toHaveLength(0);
  });

  it('should throw an error for malformed JSON payload', () => {
    const message = {
      ...baseMessage,
      reportPayload: Buffer.from('{ not json }').toString('base64'),
    };
    expect(() => transformReportToDocuments(message)).toThrow('Invalid report payload');
  });
});

With the pipeline running, we can now perform powerful, real-time queries against the Meilisearch API.

Find all critical (severity: 2) violations of no-explicit-any in the frontend-platform repository:

curl -X POST 'http://127.0.0.1:7700/indexes/linting_violations/search' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer aSampleMasterKey' \
--data-binary '{
    "q": "no-explicit-any",
    "filter": [
        "repositoryName = 'engineering/frontend-platform'",
        "severity = 2"
    ]
}'

Find any violations introduced in the last 24 hours across all repositories, sorted by most recent:

#!/bin/bash
current_ts=$(date +%s)
ts_24h_ago=$((current_ts - 86400))

curl -X POST 'http://127.0.0.1:7700/indexes/linting_violations/search' \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer aSampleMasterKey" \
--data-binary "{
    \"filter\": \"timestamp > $ts_24h_ago\",
    \"sort\": [\"timestamp:desc\"]
}"
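
For a dashboard, assembling these request bodies in code is less error-prone than string interpolation in a shell. The following is a minimal sketch of such a helper; buildViolationSearch and escapeFilterValue are hypothetical names, and the escaping covers only single quotes inside quoted filter values, which is an assumption about what our data needs.

```javascript
// Hypothetical helper: escapes single quotes inside a quoted filter value.
function escapeFilterValue(value) {
  return String(value).replace(/'/g, "\\'");
}

// Builds a search request body for the linting_violations index.
// nowMs is injectable so the time window can be tested deterministically.
function buildViolationSearch({ q = '', repositoryName, ruleId, severity, sinceHours, nowMs = Date.now() } = {}) {
  const filter = [];
  if (repositoryName) filter.push(`repositoryName = '${escapeFilterValue(repositoryName)}'`);
  if (ruleId) filter.push(`ruleId = '${escapeFilterValue(ruleId)}'`);
  if (severity !== undefined) filter.push(`severity = ${Number(severity)}`);
  if (sinceHours !== undefined) {
    // Documents store timestamps as UNIX seconds, so convert before comparing.
    const cutoff = Math.floor(nowMs / 1000) - sinceHours * 3600;
    filter.push(`timestamp > ${cutoff}`);
  }

  const body = { q, sort: ['timestamp:desc'] };
  // Only send a filter when at least one condition was requested.
  if (filter.length > 0) body.filter = filter;
  return body;
}
```

The returned object can be passed as the JSON body of the same POST request shown in the curl examples; each array entry in filter is combined with AND.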

This system provides the immediate, queryable insight we were lacking. It allows us to build dashboards to track code quality trends, identify repositories that need architectural attention, and give developers a tool to find examples of specific errors across the entire organization’s codebase.

The current implementation, however, is not without its limitations. It only captures data from new commits forward; a significant effort would be required to design and execute a backfill job to process and index the git history of our most critical repositories. The worker is a single point of failure within its container; while multiple instances can be run for scalability, this requires a proper container orchestration platform like Kubernetes to manage health checks and restarts. Furthermore, the Meilisearch instance is a single node. While performant, it lacks high availability. For a mission-critical system, a future iteration would need to consider Meilisearch’s experimental high-availability features or an alternative architecture that can tolerate brief search outages during maintenance.

