Our initial integration tests for the new inventory management service were a disaster. They were a textbook example of non-determinism, a plague on any CI/CD pipeline. One run, all green. The next, a cascade of red, with no code changes. The service is built on a CQRS pattern, with Apache Pulsar acting as the asynchronous command bus and event backbone. The flakiness stemmed from a fundamental misunderstanding of what we needed to test. We weren’t just testing business logic; we were testing the behavior of a distributed system, and our tests were completely unequipped for that reality.
The initial pain point was the classic race condition. A test would dispatch a command to Pulsar, then immediately query the read model for the updated state. Sometimes the projection had finished its work; sometimes it hadn’t. The knee-jerk reaction was to sprinkle setTimeout calls throughout the tests. This is a cardinal sin in automated testing: it doesn’t fix the race condition; it just makes it less likely to occur, trading outright failure for insidious flakiness.
Our second, more critical failure was conceptual. Our unit tests for the command handlers were pristine. They confirmed that given a valid command, the handler would produce the correct event. But this is a sterile environment. In a real-world project using a message broker like Pulsar with at-least-once delivery guarantees, the real danger isn’t that a command won’t be processed. It’s that it might be processed multiple times. Our unit tests, mocking the Pulsar client, gave us a false sense of security. They couldn’t tell us if our system would correctly handle a duplicate command message, which is a guaranteed eventuality in production. We needed to test for idempotency, and that meant we needed to test against a real broker.
The Shift to Container-Based Integration Testing
We decided to abandon mocks for the messaging layer entirely. To achieve test isolation and determinism, we turned to testcontainers-node, which lets us programmatically spin up a genuine Pulsar instance in a Docker container for the duration of our test suite. Every test run starts with a clean, ephemeral broker, eliminating state leakage between runs.
Here is the initial Jest global setup file that orchestrates this.
jest.global-setup.js
const { PulsarContainer } = require('@testcontainers/pulsar');
const path = require('path');
const fs = require('fs');

module.exports = async () => {
  console.log('\nSetting up Pulsar container for integration tests...');
  try {
    // PulsarContainer runs Pulsar in standalone mode, which bundles its own
    // metadata store, so we don't need to wire up a separate ZooKeeper container.
    const pulsarContainer = await new PulsarContainer()
      .withEnvironment({
        PULSAR_MEM: '-Xms512m -Xmx512m',
      })
      .start();

    // Store the container handle in a global for teardown. Note that Jest's
    // globalSetup runs in its own context: this global is visible to
    // globalTeardown, but NOT to the test suites themselves.
    global.__PULSAR_CONTAINER__ = pulsarContainer;

    // That is why we also write the connection details to a temp file.
    // This allows our application code within the tests to know how to connect.
    const connectionDetails = {
      serviceUrl: pulsarContainer.getBrokerUrl(),
      webUrl: `http://${pulsarContainer.getHost()}:${pulsarContainer.getMappedPort(8080)}`,
    };
    const configPath = path.join(__dirname, 'test-pulsar-config.json');
    fs.writeFileSync(configPath, JSON.stringify(connectionDetails));

    console.log(`Pulsar is ready. Service URL: ${connectionDetails.serviceUrl}`);
  } catch (error) {
    console.error('Failed to start Pulsar container:', error);
    process.exit(1);
  }
};
And the corresponding teardown file to ensure cleanup:
jest.global-teardown.js
const fs = require('fs');
const path = require('path');
module.exports = async () => {
  console.log('\nTearing down Pulsar container...');
  if (global.__PULSAR_CONTAINER__) {
    await global.__PULSAR_CONTAINER__.stop();
  }

  const configPath = path.join(__dirname, 'test-pulsar-config.json');
  if (fs.existsSync(configPath)) {
    fs.unlinkSync(configPath);
  }
  console.log('Teardown complete.');
};
We configure Jest to use these files in jest.config.js:
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
globalSetup: '<rootDir>/tests/jest.global-setup.js',
globalTeardown: '<rootDir>/tests/jest.global-teardown.js',
testTimeout: 60000, // Generous: polling tests wait on a real broker (container startup itself happens in globalSetup)
};
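For reference, this harness assumes roughly the following packages in package.json. The version ranges below are illustrative placeholders, not pinned recommendations; use whatever versions your project has actually verified.

{
  "dependencies": {
    "pulsar-client": "^1.10.0",
    "uuid": "^9.0.0"
  },
  "devDependencies": {
    "@testcontainers/pulsar": "^10.0.0",
    "testcontainers": "^10.0.0",
    "jest": "^29.0.0",
    "ts-jest": "^29.0.0",
    "typescript": "^5.0.0"
  }
}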
This setup provides a solid foundation. Our tests now run against a real Pulsar broker, but that in turn exposes the next layer of problems: how to write assertions for an asynchronous system without resorting to flaky timers.
A CQRS System Under Test
Let’s define the core components of our system. It’s a simple inventory service. The business logic is trivial, but the architectural pattern is what matters.
The Command and Event:
src/domain/types.ts
import { v4 as uuidv4 } from 'uuid';
// All commands must have a unique ID for idempotency tracking
export interface Command {
commandId: string;
aggregateId: string;
type: string;
payload: any;
}
export interface AdjustInventoryCommand extends Command {
type: 'ADJUST_INVENTORY';
payload: {
productId: string;
quantityChange: number;
reason: string;
};
}
// Events capture what has happened
export interface Event {
eventId: string;
aggregateId: string;
type: string;
payload: any;
timestamp: string;
}
export const createInventoryAdjustedEvent = (
command: AdjustInventoryCommand
): Event => ({
eventId: uuidv4(),
aggregateId: command.aggregateId,
type: 'INVENTORY_ADJUSTED',
payload: {
productId: command.payload.productId,
quantityChange: command.payload.quantityChange,
reason: command.payload.reason,
causationId: command.commandId, // Link event back to the command
},
timestamp: new Date().toISOString(),
});
The Command Handler:
This is where the core logic and the critical idempotency check reside. A common mistake is to perform the check and the business logic in a non-atomic way. Here, we use a simple in-memory store to track processed command IDs. In a production system, this would be a persistent, distributed cache like Redis or a dedicated database table.
src/application/command-handler.ts
import Pulsar from 'pulsar-client';
import { Command, AdjustInventoryCommand, createInventoryAdjustedEvent, Event } from '../domain/types';
import { IIdempotencyStore } from './idempotency-store';
import { ILogger } from '../infrastructure/logger';
export class InventoryCommandHandler {
constructor(
private readonly pulsarClient: Pulsar.Client,
private readonly idempotencyStore: IIdempotencyStore,
private readonly logger: ILogger
) {}
public async handle(command: AdjustInventoryCommand): Promise<void> {
this.logger.info(`Handling command: ${command.commandId}`, { command });
// The core of idempotency: check if we've seen this command before.
const isDuplicate = await this.idempotencyStore.hasBeenProcessed(command.commandId);
if (isDuplicate) {
this.logger.warn(`Duplicate command detected and ignored: ${command.commandId}`);
// The consuming worker will still acknowledge the message so Pulsar
// doesn't redeliver it; the handler itself simply does nothing else.
return;
}
// Imagine complex business logic here...
// For this example, we just validate the input.
if (typeof command.payload.quantityChange !== 'number') {
throw new Error('Invalid quantityChange');
}
this.logger.info(`Business logic passed for command: ${command.commandId}`);
const event = createInventoryAdjustedEvent(command);
// Ideally, publishing the event and marking the command as processed
// would be a single atomic step.
// In a real system, you might use a transactional outbox pattern
// or rely on the atomicity of your database transaction.
// Here we simulate it by doing them sequentially. The risk is a crash
// between these two calls.
await this.publishEvent(event);
await this.idempotencyStore.markAsProcessed(command.commandId);
this.logger.info(`Event published for command: ${command.commandId}`, { eventId: event.eventId });
}
private async publishEvent(event: Event): Promise<void> {
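// Creating a producer per event keeps the example short; a real service
// would create one producer at startup and reuse it across sends.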
const producer = await this.pulsarClient.createProducer({
topic: 'persistent://public/default/inventory-events',
});
await producer.send({
data: Buffer.from(JSON.stringify(event)),
partitionKey: event.aggregateId, // Ensure events for the same aggregate go to the same partition
});
await producer.close();
}
}
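The handler and the tests import IIdempotencyStore, InMemoryIdempotencyStore, ILogger, and ConsoleLogger, which aren’t shown above. Here is a minimal sketch of what they might look like, with the interface in the application layer and the implementations in infrastructure to match the import paths; the exact shape is our assumption, not spelled out in the original service.

// src/application/idempotency-store.ts
export interface IIdempotencyStore {
  hasBeenProcessed(commandId: string): Promise<boolean>;
  markAsProcessed(commandId: string): Promise<void>;
}

// src/infrastructure/idempotency-store.ts
import { IIdempotencyStore } from '../application/idempotency-store';

// In-memory store, suitable only for tests and examples. A production
// implementation would use Redis or a unique-constrained database column
// so the check survives restarts and spans service instances.
export class InMemoryIdempotencyStore implements IIdempotencyStore {
  private readonly processed = new Set<string>();

  public async hasBeenProcessed(commandId: string): Promise<boolean> {
    return this.processed.has(commandId);
  }

  public async markAsProcessed(commandId: string): Promise<void> {
    this.processed.add(commandId);
  }
}

// src/infrastructure/logger.ts
export interface ILogger {
  info(message: string, meta?: Record<string, unknown>): void;
  warn(message: string, meta?: Record<string, unknown>): void;
  error(message: string, meta?: Record<string, unknown>): void;
}

export class ConsoleLogger implements ILogger {
  info(message: string, meta?: Record<string, unknown>): void {
    console.log(message, meta ?? '');
  }
  warn(message: string, meta?: Record<string, unknown>): void {
    console.warn(message, meta ?? '');
  }
  error(message: string, meta?: Record<string, unknown>): void {
    console.error(message, meta ?? '');
  }
}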
The Projection and Read Model:
The projection listens for events from Pulsar and updates a simple read model. The read model is just an in-memory map for this example, representing what a query service might use.
src/application/projection.ts
import Pulsar from 'pulsar-client';
import { Event } from '../domain/types';
import { ILogger } from '../infrastructure/logger';
// A simple in-memory key-value store for our read model.
export type ReadModel = Map<string, { productId: string; quantity: number }>;
export class InventoryProjection {
private consumer: Pulsar.Consumer | null = null;
constructor(
private readonly pulsarClient: Pulsar.Client,
private readonly readModel: ReadModel,
private readonly logger: ILogger
) {}
public async start(): Promise<void> {
  this.consumer = await this.pulsarClient.subscribe({
    topic: 'persistent://public/default/inventory-events',
    subscription: 'inventory-projection-subscription',
    subscriptionType: 'KeyShared', // Allows multiple consumers to process messages for different keys in parallel
  });
  this.logger.info('Projection started and listening for events.');
  // Run the receive loop in the background. start() resolves once the
  // subscription exists, so callers can await it and avoid the race where
  // an event is published before the subscription has been created.
  void this.consumeLoop();
}

private async consumeLoop(): Promise<void> {
  while (this.consumer) {
    try {
      const msg = await this.consumer.receive();
      const event = JSON.parse(msg.getData().toString()) as Event;
      this.logger.info(`Received event: ${event.eventId}`, { type: event.type });
      if (event.type === 'INVENTORY_ADJUSTED') {
        this.applyInventoryAdjusted(event);
      }
      await this.consumer.acknowledge(msg);
    } catch (error) {
      if (!this.consumer) {
        // stop() closed the consumer; the rejected receive() is expected.
        break;
      }
      // In a real app, we would need a dead-letter queue strategy.
      this.logger.error('Error processing event in projection', { error });
    }
  }
}
private applyInventoryAdjusted(event: Event): void {
const { productId, quantityChange } = event.payload;
const current = this.readModel.get(productId) || { productId, quantity: 0 };
const newQuantity = current.quantity + quantityChange;
this.readModel.set(productId, { ...current, quantity: newQuantity });
this.logger.info(`Read model updated for product ${productId}. New quantity: ${newQuantity}`);
}
public async stop(): Promise<void> {
  if (this.consumer) {
    const consumer = this.consumer;
    // Clear the field first so the receive loop sees the shutdown signal.
    this.consumer = null;
    await consumer.close();
    this.logger.info('Projection stopped.');
  }
}
}
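One piece is intentionally absent: the worker that consumes from the commands topic and invokes the handler. The tests will call handle directly instead, but for orientation, such a worker might look like the following sketch; the subscription name and the nack-based retry are our illustrative choices, not part of the original service.

import Pulsar from 'pulsar-client';
import { AdjustInventoryCommand } from '../domain/types';
import { InventoryCommandHandler } from './command-handler';

// Sketch of the production-side command consumer that the tests bypass.
export const runCommandConsumer = async (
  client: Pulsar.Client,
  handler: InventoryCommandHandler
): Promise<void> => {
  const consumer = await client.subscribe({
    topic: 'persistent://public/default/inventory-commands',
    subscription: 'inventory-command-worker',
    subscriptionType: 'Shared',
  });
  for (;;) {
    const msg = await consumer.receive();
    try {
      const command = JSON.parse(msg.getData().toString()) as AdjustInventoryCommand;
      await handler.handle(command);
      await consumer.acknowledge(msg);
    } catch (error) {
      // Negative-ack triggers redelivery later: exactly the at-least-once
      // behavior the idempotency store exists to make harmless.
      consumer.negativeAcknowledge(msg);
    }
  }
};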
Crafting a Deterministic Test Harness
With the components defined, we can build the test. The first challenge is avoiding fixed setTimeout sleeps as a synchronization mechanism. The solution is polling: a helper function repeatedly evaluates a check until it passes or a timeout is reached, so the test waits only as long as necessary.
tests/helpers/polling.ts
export const waitForCondition = async (
predicate: () => Promise<boolean> | boolean,
options: { timeout: number; interval: number }
): Promise<void> => {
const { timeout, interval } = options;
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
if (await predicate()) {
return;
}
await new Promise(resolve => setTimeout(resolve, interval));
}
throw new Error(`Condition not met within ${timeout}ms timeout.`);
};
Now, we can write our integration test. This test will orchestrate all the pieces: dispatching a command, running the projection, and querying the read model. Critically, it will dispatch the same command twice to verify our idempotency logic.
sequenceDiagram
    participant TestHarness as Test Harness
    participant CommandProducer as Pulsar Producer (Commands)
    participant CommandHandler as Command Handler
    participant IdempotencyStore as Idempotency Store
    participant EventProducer as Pulsar Producer (Events)
    participant Projection as Projection Consumer
    participant ReadModel as Read Model
    participant Pulsar as Pulsar Broker

    TestHarness->>CommandProducer: Send(AdjustInventoryCommand C1)
    CommandHandler->>Pulsar: Receive(C1)
    CommandHandler->>IdempotencyStore: hasBeenProcessed(C1.commandId)?
    IdempotencyStore-->>CommandHandler: false
    CommandHandler->>EventProducer: Send(InventoryAdjustedEvent E1)
    CommandHandler->>IdempotencyStore: markAsProcessed(C1.commandId)
    CommandHandler->>Pulsar: Ack(C1)
    Projection->>Pulsar: Receive(E1)
    Projection->>ReadModel: Update state based on E1
    Projection->>Pulsar: Ack(E1)

    TestHarness->>CommandProducer: Send(AdjustInventoryCommand C1) (DUPLICATE)
    CommandHandler->>Pulsar: Receive(C1)
    CommandHandler->>IdempotencyStore: hasBeenProcessed(C1.commandId)?
    IdempotencyStore-->>CommandHandler: true
    Note right of CommandHandler: Ignores command, no event published
    CommandHandler->>Pulsar: Ack(C1)

    TestHarness->>ReadModel: Poll for expected state
    ReadModel-->>TestHarness: State reflects only one update
    TestHarness->>TestHarness: Assert Pass
tests/inventory.integration.test.ts
import Pulsar from 'pulsar-client';
import { v4 as uuidv4 } from 'uuid';
import { InventoryCommandHandler } from '../src/application/command-handler';
import { InventoryProjection, ReadModel } from '../src/application/projection';
import { AdjustInventoryCommand } from '../src/domain/types';
import { InMemoryIdempotencyStore } from '../src/infrastructure/idempotency-store';
import { ConsoleLogger, ILogger } from '../src/infrastructure/logger';
import { waitForCondition } from './helpers/polling';
describe('Inventory CQRS Integration Test', () => {
let pulsarClient: Pulsar.Client;
let commandHandler: InventoryCommandHandler;
let projection: InventoryProjection;
let readModel: ReadModel;
let idempotencyStore: InMemoryIdempotencyStore;
let logger: ILogger;
let commandProducer: Pulsar.Producer;
beforeAll(async () => {
// Read connection details from the file created by global setup
const config = require('./test-pulsar-config.json');
pulsarClient = new Pulsar.Client({ serviceUrl: config.serviceUrl });
commandProducer = await pulsarClient.createProducer({
topic: 'persistent://public/default/inventory-commands',
});
});
afterAll(async () => {
await commandProducer.close();
await pulsarClient.close();
});
beforeEach(async () => {
// Reset state for each test to ensure isolation
logger = new ConsoleLogger();
idempotencyStore = new InMemoryIdempotencyStore();
readModel = new Map();
// The command handler does not listen to a topic directly.
// In a real app, a consumer would pick up commands and pass them to the handler.
// For our test, we'll invoke it directly after sending a message to simulate this.
// This gives us more control.
commandHandler = new InventoryCommandHandler(pulsarClient, idempotencyStore, logger);
// The projection runs in the background, listening for events.
projection = new InventoryProjection(pulsarClient, readModel, logger);
await projection.start(); // Resolves once the subscription exists; the receive loop keeps running in the background
});
afterEach(async () => {
await projection.stop();
});
it('should process a command, update the read model, and ignore a duplicate command', async () => {
const productId = `product-${uuidv4()}`;
const aggregateId = productId;
// Step 1: Define the command. Crucially, we use the same command object for both sends.
const command: AdjustInventoryCommand = {
commandId: `cmd-${uuidv4()}`,
aggregateId,
type: 'ADJUST_INVENTORY',
payload: {
productId,
quantityChange: 100,
reason: 'Initial stock',
},
};
// Step 2: Simulate processing the first command.
// In a real system, a consumer worker would do this.
// We do it explicitly to control the test flow.
await commandHandler.handle(command);
// Step 3: Wait for the projection to update the read model.
// This is where our polling helper is essential.
await waitForCondition(
() => readModel.has(productId) && readModel.get(productId)?.quantity === 100,
{ timeout: 5000, interval: 100 }
);
const stateAfterFirstCommand = readModel.get(productId);
expect(stateAfterFirstCommand).toBeDefined();
expect(stateAfterFirstCommand?.quantity).toBe(100);
expect(await idempotencyStore.hasBeenProcessed(command.commandId)).toBe(true);
// Step 4: Process the EXACT SAME command again.
// This simulates a redelivery from Pulsar due to a temporary network failure
// or consumer crash after processing but before acknowledging.
logger.info('--- Sending duplicate command ---');
await commandHandler.handle(command);
// Step 5: Assert that the state HAS NOT changed.
// We can't use waitForCondition here because we expect no change.
// A short, fixed delay is acceptable here ONLY to allow any potential incorrect
// processing to occur. It's a pragmatic compromise (see the
// assertConditionHolds sketch after this test for a sharper alternative).
await new Promise(resolve => setTimeout(resolve, 500));
const stateAfterDuplicateCommand = readModel.get(productId);
expect(stateAfterDuplicateCommand?.quantity).toBe(100);
// Final verification of the read model state.
const finalQuantity = readModel.get(productId)?.quantity;
expect(finalQuantity).toBe(100);
logger.info(`Final quantity for ${productId} is ${finalQuantity}, confirming idempotency.`);
});
});
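One refinement for the Step 5 compromise: invert the polling helper into one that asserts a condition keeps holding for a short observation window, failing the moment it breaks instead of sleeping blindly. A sketch of our own, added alongside waitForCondition in tests/helpers/polling.ts (the helper name is invented, not from the codebase):

// The mirror image of waitForCondition: useful for negative assertions
// like "the duplicate command changed nothing".
export const assertConditionHolds = async (
  predicate: () => Promise<boolean> | boolean,
  options: { duration: number; interval: number }
): Promise<void> => {
  const { duration, interval } = options;
  const startTime = Date.now();
  // Re-check the predicate until the window elapses; any violation fails fast.
  while (Date.now() - startTime < duration) {
    if (!(await predicate())) {
      throw new Error('Condition was violated during the observation window.');
    }
    await new Promise(resolve => setTimeout(resolve, interval));
  }
};

Step 5 would then read `await assertConditionHolds(() => readModel.get(productId)?.quantity === 100, { duration: 500, interval: 50 });`, which keeps the same worst-case wait but surfaces a regression the moment it manifests.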
This test structure finally gives us what we need: a reliable, deterministic way to verify that our system behaves correctly under message duplication. It directly targets the idempotency logic in the command handler and confirms that the downstream projection and read model are not corrupted by duplicate commands. The pitfall of the initial flaky tests was that they conflated testing application logic with testing distributed system behavior. By isolating the system under test with testcontainers and using polling helpers, we can build a robust harness that gives us real confidence in our implementation.
The limitations of this approach must be acknowledged. We are directly invoking the commandHandler.handle method to simulate a consumer. A more advanced setup might involve creating a test-specific consumer that we can control. Furthermore, this harness only tests for duplicate delivery. It does not address out-of-order message processing, a significantly more complex problem that often requires mechanisms like sequence IDs in messages or version numbers on aggregates to solve. Future iterations of this test harness could involve building a “chaos proxy” for the Pulsar client that can be instructed to intentionally delay or reorder messages, allowing us to test these more challenging scenarios.
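To make that last idea concrete, here is one possible shape for such a chaos layer: a thin wrapper around a real pulsar-client producer whose per-message delay is controlled by the test. This is a sketch of our own design (ChaosProducer and delayFor are invented names), not an existing library:

import Pulsar from 'pulsar-client';

// Sketch: wraps a real producer and delays individual sends on demand.
// Firing several send() calls concurrently, each with a different injected
// delay, lets a test force out-of-order arrival and exercise whatever
// sequence-ID or aggregate-version defenses the system claims to have.
export class ChaosProducer {
  constructor(
    private readonly inner: Pulsar.Producer,
    // Test-supplied hook deciding how long (ms) to hold back each message.
    private readonly delayFor: (msg: Pulsar.ProducerMessage) => number
  ) {}

  public async send(msg: Pulsar.ProducerMessage): Promise<Pulsar.MessageId> {
    const delayMs = this.delayFor(msg);
    if (delayMs > 0) {
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }
    return this.inner.send(msg);
  }
}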