The monolith’s biggest flaw wasn’t performance; it was inertia. Our initial text processing service was simple: it received user feedback, ran a sentiment analysis, and stored the result. But then came the request for PII (Personally Identifiable Information) redaction. Then language detection. Then entity extraction. Each new Natural Language Processing (NLP) feature required branching the core logic, introducing new dependencies, and scheduling a full-service redeployment. The testing matrix exploded, and regression bugs became a constant threat. The system was tightly coupled, and its development velocity ground to a halt.
Our primary goal was to decouple the NLP tasks from each other and from the core ingestion logic. A piece of text should trigger a cascade of independent operations, not flow through a rigid, monolithic function. This pointed directly to an Event-Driven Architecture (EDA). The concept was straightforward: an initial event, TEXT_SUBMITTED, would be fired. A series of autonomous “plugins” would listen for this event or subsequent events, perform their specific NLP task, and emit new events with their results. This would allow us to add, remove, or update NLP capabilities by simply deploying a new plugin that subscribed to the event bus, without touching the existing system.
For implementation, we chose Node.js with TypeScript: its non-blocking I/O model is a natural fit for event-driven systems, and the safety net of static typing is non-negotiable when dealing with complex event schemas. For testing, we needed something fast, modern, and with first-class support for TypeScript and mocking. Vitest was the obvious choice. Its speed, together with the vi.spyOn and vi.mock APIs, was precisely what we needed to validate the isolated behavior of each plugin and the complex interactions within the event-driven flow.
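The test setup itself needs nothing exotic. The config below is only a sketch of what a minimal Vitest setup for a backend project like this might look like; the exact options and file layout are assumptions, not lifted from our repository.

// vitest.config.ts — minimal sketch; adjust the include pattern to your layout
import { defineConfig } from 'vitest/config';

export default defineConfig({
  test: {
    environment: 'node', // no DOM needed for a backend event pipeline
    include: ['src/**/*.test.ts'],
  },
});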
The first step was to define the core abstractions: the Event Bus and the Plugin interface. To avoid premature dependency on a specific message broker like Kafka or RabbitMQ, we started with an in-memory implementation. This allowed us to focus on the architecture’s correctness while providing a clean interface for a future production-grade implementation.
// src/core/eventBus.ts
import { ZodSchema } from 'zod';

// Define a structured event type
export interface IEvent<T extends string, P> {
  type: T;
  payload: P;
  metadata: {
    eventId: string;
    timestamp: Date;
    traceId: string; // Crucial for observability
  };
}

// A map to hold event types to their Zod schemas for validation
export type EventSchemaMap = {
  [eventType: string]: ZodSchema;
};

export type EventHandler<T = any> = (event: T) => Promise<void>;

// The core interface for our event bus
export interface IEventBus {
  publish<T extends string, P>(event: IEvent<T, P>): Promise<void>;
  subscribe<T extends string, P>(
    eventType: T,
    handler: EventHandler<IEvent<T, P>>
  ): void;
}

// In-memory implementation for development and testing
export class InMemoryEventBus implements IEventBus {
  private subscribers: Map<string, EventHandler[]> = new Map();
  private schemas: EventSchemaMap;

  constructor(schemas: EventSchemaMap) {
    this.schemas = schemas;
    console.log('[EventBus] Initialized with schema validation.');
  }

  public subscribe<T extends string, P>(
    eventType: T,
    handler: EventHandler<IEvent<T, P>>
  ): void {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }
    this.subscribers.get(eventType)!.push(handler as EventHandler);
    console.log(`[EventBus] New subscriber for event type: ${eventType}`);
  }

  public async publish<T extends string, P>(event: IEvent<T, P>): Promise<void> {
    const schema = this.schemas[event.type];
    if (!schema) {
      console.error(`[EventBus] Validation failed: No schema found for event type "${event.type}".`);
      // In a real system, this might go to a dead-letter queue.
      return;
    }

    const validationResult = schema.safeParse(event.payload);
    if (!validationResult.success) {
      console.error(`[EventBus] Validation failed for event "${event.type}":`, validationResult.error.errors);
      return;
    }

    const handlers = this.subscribers.get(event.type);
    if (!handlers) {
      console.log(`[EventBus] No subscribers for event type: ${event.type}`);
      return;
    }

    console.log(`[EventBus] Publishing event "${event.type}" to ${handlers.length} subscriber(s). Trace ID: ${event.metadata.traceId}`);

    // Execute all handlers concurrently
    await Promise.all(handlers.map(async handler => {
      try {
        await handler(event); // await inside the try so async rejections are caught here
      } catch (error) {
        console.error(`[EventBus] Error in handler for event "${event.type}":`, error);
        // Implement dead-letter or retry logic here; one failed handler must not stop the others
      }
    }));
  }
}
The key decision here was integrating Zod for schema validation directly into the bus. In a distributed system, you cannot trust producers to send correctly formatted data. Enforcing a schema at the entry point of the bus prevents malformed data from poisoning downstream consumers. It acts as a strict contract between services.
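To make that contract tangible, here is a small, throwaway usage sketch (not part of the pipeline; the file name and inline schema are purely illustrative) showing the bus refusing a payload that violates its schema:

// src/validationDemo.ts — illustrative only
import { z } from 'zod';
import { InMemoryEventBus } from './core/eventBus';

async function demo() {
  // A bus that only knows TEXT_SUBMITTED, whose payload requires non-empty text
  const bus = new InMemoryEventBus({
    TEXT_SUBMITTED: z.object({ text: z.string().min(1) }),
  });

  bus.subscribe('TEXT_SUBMITTED', async event => {
    console.log('Handled:', event.payload); // never reached for the event below
  });

  // The empty string violates the schema, so the bus logs a validation
  // error and the subscriber is never invoked.
  await bus.publish({
    type: 'TEXT_SUBMITTED',
    payload: { text: '' },
    metadata: { eventId: 'demo-1', timestamp: new Date(), traceId: 'demo-trace' },
  });
}

demo();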
Next, we defined the IPlugin interface. Each plugin needs a name for identification and a register method to connect to the event bus.
// src/core/plugin.ts
import { IEventBus } from './eventBus';

export interface IPlugin {
  name: string;
  register(eventBus: IEventBus): void;
}
With the core abstractions in place, we could build the first concrete plugins. Let’s start with a Language Detection plugin and a Sentiment Analysis plugin.
// src/schemas.ts
import { z } from 'zod';
import type { EventSchemaMap } from './core/eventBus';

// Define the payload schemas for our events
export const textSubmittedPayloadSchema = z.object({
  text: z.string().min(1),
});

export const languageDetectedPayloadSchema = z.object({
  text: z.string(),
  languageCode: z.string().length(2), // e.g., 'en', 'es'
});

export const sentimentAnalyzedPayloadSchema = z.object({
  text: z.string(),
  languageCode: z.string().length(2),
  sentiment: z.enum(['positive', 'negative', 'neutral']),
  score: z.number(),
});

// A central map of event types to their schemas
export const appEventSchemas: EventSchemaMap = {
  TEXT_SUBMITTED: textSubmittedPayloadSchema,
  LANGUAGE_DETECTED: languageDetectedPayloadSchema,
  SENTIMENT_ANALYZED: sentimentAnalyzedPayloadSchema,
};

// For convenience, let's create helper types
export type TextSubmittedPayload = z.infer<typeof textSubmittedPayloadSchema>;
export type LanguageDetectedPayload = z.infer<typeof languageDetectedPayloadSchema>;
export type SentimentAnalyzedPayload = z.infer<typeof sentimentAnalyzedPayloadSchema>;
Here’s the implementation for the two plugins. Note how they are completely independent of each other.
// src/plugins/languageDetector.ts
import { IEventBus, IEvent } from '../core/eventBus';
import { IPlugin } from '../core/plugin';
import { v4 as uuidv4 } from 'uuid';
import { TextSubmittedPayload, LanguageDetectedPayload } from '../schemas';

// A mock detection service
const detectLanguage = (text: string): string => {
  if (text.toLowerCase().includes('hola')) return 'es';
  return 'en';
};

export class LanguageDetectorPlugin implements IPlugin {
  public name = 'LanguageDetectorPlugin';

  register(eventBus: IEventBus): void {
    eventBus.subscribe('TEXT_SUBMITTED', this.handleTextSubmitted.bind(this, eventBus));
  }

  private async handleTextSubmitted(
    eventBus: IEventBus,
    event: IEvent<'TEXT_SUBMITTED', TextSubmittedPayload>
  ): Promise<void> {
    console.log(`[${this.name}] Received TEXT_SUBMITTED event. Trace ID: ${event.metadata.traceId}`);
    const { text } = event.payload;
    const languageCode = detectLanguage(text);

    const newPayload: LanguageDetectedPayload = { text, languageCode };
    const newEvent: IEvent<'LANGUAGE_DETECTED', LanguageDetectedPayload> = {
      type: 'LANGUAGE_DETECTED',
      payload: newPayload,
      metadata: {
        ...event.metadata, // Propagate traceId
        eventId: uuidv4(),
        timestamp: new Date(),
      },
    };

    await eventBus.publish(newEvent);
  }
}
// src/plugins/sentimentAnalyzer.ts
import { IEventBus, IEvent } from '../core/eventBus';
import { IPlugin } from '../core/plugin';
import { v4 as uuidv4 } from 'uuid';
import { LanguageDetectedPayload, SentimentAnalyzedPayload } from '../schemas';

// A mock sentiment analysis service
const analyzeSentiment = (text: string): { sentiment: 'positive' | 'negative' | 'neutral'; score: number } => {
  const lowerText = text.toLowerCase();
  if (lowerText.includes('great') || lowerText.includes('excellent')) {
    return { sentiment: 'positive', score: 0.9 };
  }
  if (lowerText.includes('bad') || lowerText.includes('terrible')) {
    return { sentiment: 'negative', score: -0.8 };
  }
  return { sentiment: 'neutral', score: 0.1 };
};

export class SentimentAnalyzerPlugin implements IPlugin {
  public name = 'SentimentAnalyzerPlugin';

  register(eventBus: IEventBus): void {
    eventBus.subscribe('LANGUAGE_DETECTED', this.handleLanguageDetected.bind(this, eventBus));
  }

  private async handleLanguageDetected(
    eventBus: IEventBus,
    event: IEvent<'LANGUAGE_DETECTED', LanguageDetectedPayload>
  ): Promise<void> {
    const { text, languageCode } = event.payload;

    // This plugin only handles English text. This is a business rule.
    if (languageCode !== 'en') {
      console.log(`[${this.name}] Skipping non-English text. Trace ID: ${event.metadata.traceId}`);
      return;
    }

    console.log(`[${this.name}] Received LANGUAGE_DETECTED event for English text. Trace ID: ${event.metadata.traceId}`);
    const { sentiment, score } = analyzeSentiment(text);

    const newPayload: SentimentAnalyzedPayload = { text, languageCode, sentiment, score };
    const newEvent: IEvent<'SENTIMENT_ANALYZED', SentimentAnalyzedPayload> = {
      type: 'SENTIMENT_ANALYZED',
      payload: newPayload,
      metadata: {
        ...event.metadata,
        eventId: uuidv4(),
        timestamp: new Date(),
      },
    };

    await eventBus.publish(newEvent);
  }
}
The architecture becomes clear. The LanguageDetectorPlugin consumes TEXT_SUBMITTED events and produces LANGUAGE_DETECTED events. The SentimentAnalyzerPlugin consumes LANGUAGE_DETECTED events and produces SENTIMENT_ANALYZED events. This chain can be extended indefinitely.
The main application orchestrates the setup:
// src/main.ts
import { InMemoryEventBus, IEvent, IEventBus } from './core/eventBus';
import { IPlugin } from './core/plugin';
import { LanguageDetectorPlugin } from './plugins/languageDetector';
import { SentimentAnalyzerPlugin } from './plugins/sentimentAnalyzer';
import { appEventSchemas, TextSubmittedPayload } from './schemas';
import { v4 as uuidv4 } from 'uuid';

class NlpPipelineApp {
  private eventBus: IEventBus;
  private plugins: IPlugin[];

  constructor() {
    this.eventBus = new InMemoryEventBus(appEventSchemas);
    this.plugins = [
      new LanguageDetectorPlugin(),
      new SentimentAnalyzerPlugin(),
      // To add a new feature, e.g., an EntityRecognitionPlugin, just add it here.
    ];
    this.registerPlugins();
  }

  private registerPlugins(): void {
    this.plugins.forEach(plugin => {
      plugin.register(this.eventBus);
      console.log(`[App] Registered plugin: ${plugin.name}`);
    });
  }

  public async processText(text: string): Promise<string> {
    const traceId = uuidv4();
    console.log(`\n--- Starting new processing flow with Trace ID: ${traceId} ---`);

    const initialPayload: TextSubmittedPayload = { text };
    const initialEvent: IEvent<'TEXT_SUBMITTED', TextSubmittedPayload> = {
      type: 'TEXT_SUBMITTED',
      payload: initialPayload,
      metadata: {
        eventId: uuidv4(),
        timestamp: new Date(),
        traceId,
      },
    };

    await this.eventBus.publish(initialEvent);
    return traceId;
  }
}

// Example usage
async function run() {
  const app = new NlpPipelineApp();
  await app.processText("This is a great product, excellent service!");
  await app.processText("This is a terrible experience.");
  await app.processText("Hola, cómo estás?"); // This should be skipped by the sentiment analyzer
}

run();
This system is functional, but its real value is proven through testing. How do we ensure each part works correctly in isolation and that the whole system integrates properly? This is where Vitest became critical.
Our testing strategy had two layers:
- Unit Tests: Verify that each plugin, given a specific input event, attempts to publish the correct output event. We don’t test the event bus itself here; we spy on its publish method.
- Integration Tests: Verify that when an initial event is published to a real InMemoryEventBus, the correct sequence of plugins is triggered, leading to the expected final event.
Here’s a unit test for the SentimentAnalyzerPlugin using Vitest’s vi.spyOn:
// src/plugins/sentimentAnalyzer.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { InMemoryEventBus } from '../core/eventBus';
import { SentimentAnalyzerPlugin } from './sentimentAnalyzer';
import { appEventSchemas } from '../schemas';
import { v4 as uuidv4 } from 'uuid';

describe('SentimentAnalyzerPlugin Unit Test', () => {
  let mockEventBus: InMemoryEventBus;
  let sentimentPlugin: SentimentAnalyzerPlugin;

  beforeEach(() => {
    // We use a real event bus instance but will spy on its publish method
    mockEventBus = new InMemoryEventBus(appEventSchemas);
    sentimentPlugin = new SentimentAnalyzerPlugin();
    sentimentPlugin.register(mockEventBus);
  });

  it('should process English text and publish a SENTIMENT_ANALYZED event', async () => {
    const publishSpy = vi.spyOn(mockEventBus, 'publish');
    const inputEvent = {
      type: 'LANGUAGE_DETECTED' as const,
      payload: { text: 'This is a great library', languageCode: 'en' },
      metadata: { eventId: uuidv4(), timestamp: new Date(), traceId: 'trace-123' },
    };

    // Manually trigger the handler, simulating the event bus calling it
    // @ts-ignore - Accessing private method for testing purposes
    await sentimentPlugin.handleLanguageDetected(mockEventBus, inputEvent);

    expect(publishSpy).toHaveBeenCalledOnce();
    const publishedEvent = publishSpy.mock.calls[0][0];
    expect(publishedEvent.type).toBe('SENTIMENT_ANALYZED');
    expect(publishedEvent.payload.sentiment).toBe('positive');
    expect(publishedEvent.payload.score).toBe(0.9);
    expect(publishedEvent.payload.text).toBe('This is a great library');
    expect(publishedEvent.metadata.traceId).toBe('trace-123'); // Ensure traceId is propagated
  });

  it('should ignore non-English text and not publish any event', async () => {
    const publishSpy = vi.spyOn(mockEventBus, 'publish');
    const inputEvent = {
      type: 'LANGUAGE_DETECTED' as const,
      payload: { text: 'Esto es una gran biblioteca', languageCode: 'es' },
      metadata: { eventId: uuidv4(), timestamp: new Date(), traceId: 'trace-456' },
    };

    // @ts-ignore
    await sentimentPlugin.handleLanguageDetected(mockEventBus, inputEvent);

    expect(publishSpy).not.toHaveBeenCalled();
  });
});
This test perfectly isolates the plugin’s logic. We control the input and spy on the output boundary (eventBus.publish), asserting that the plugin behaves as expected under different conditions.
The integration test is more complex. It verifies the collaboration between plugins.
graph TD
    subgraph Test Flow
        A[Test starts] --> B{Publish TEXT_SUBMITTED};
        B --> C[InMemoryEventBus];
        C --> D[LanguageDetectorPlugin handles];
        D --> E{Publish LANGUAGE_DETECTED};
        E --> C;
        C --> F[SentimentAnalyzerPlugin handles];
        F --> G{Publish SENTIMENT_ANALYZED};
        G --> C;
    end
    subgraph Assertions
        H[Spy on EventBus.publish] --> I{Verify LANGUAGE_DETECTED was published};
        H --> J{Verify SENTIMENT_ANALYZED was published};
    end
Here’s the code for the integration test:
// src/pipeline.integration.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { InMemoryEventBus, IEvent } from './core/eventBus';
import { LanguageDetectorPlugin } from './plugins/languageDetector';
import { SentimentAnalyzerPlugin } from './plugins/sentimentAnalyzer';
import { appEventSchemas } from './schemas';
import { v4 as uuidv4 } from 'uuid';

describe('NLP Pipeline Integration Test', () => {
  let eventBus: InMemoryEventBus;

  beforeEach(() => {
    eventBus = new InMemoryEventBus(appEventSchemas);
    // Register all plugins for the integration test
    new LanguageDetectorPlugin().register(eventBus);
    new SentimentAnalyzerPlugin().register(eventBus);
  });

  it('should process a text through the full pipeline', async () => {
    const publishSpy = vi.spyOn(eventBus, 'publish');
    const initialEvent: IEvent<'TEXT_SUBMITTED', { text: string }> = {
      type: 'TEXT_SUBMITTED',
      payload: { text: 'This product is truly excellent.' },
      metadata: { eventId: uuidv4(), timestamp: new Date(), traceId: 'full-pipeline-trace' },
    };

    // This is the only external call. The rest is internal eventing.
    await eventBus.publish(initialEvent);

    // The spy is called for the initial event, plus the two subsequent events.
    expect(publishSpy).toHaveBeenCalledTimes(1 + 2); // Initial + LanguageDetected + SentimentAnalyzed

    // Check the event from LanguageDetectorPlugin
    const languageEvent = publishSpy.mock.calls.find(call => call[0].type === 'LANGUAGE_DETECTED')?.[0];
    expect(languageEvent).toBeDefined();
    expect(languageEvent?.payload).toEqual({
      text: 'This product is truly excellent.',
      languageCode: 'en',
    });
    expect(languageEvent?.metadata.traceId).toBe('full-pipeline-trace');

    // Check the final event from SentimentAnalyzerPlugin
    const sentimentEvent = publishSpy.mock.calls.find(call => call[0].type === 'SENTIMENT_ANALYZED')?.[0];
    expect(sentimentEvent).toBeDefined();
    expect(sentimentEvent?.payload).toEqual({
      text: 'This product is truly excellent.',
      languageCode: 'en',
      sentiment: 'positive',
      score: 0.9,
    });
    expect(sentimentEvent?.metadata.traceId).toBe('full-pipeline-trace');
  });

  it('should stop the pipeline for non-English text after language detection', async () => {
    const publishSpy = vi.spyOn(eventBus, 'publish');
    const initialEvent: IEvent<'TEXT_SUBMITTED', { text: string }> = {
      type: 'TEXT_SUBMITTED',
      payload: { text: 'Hola, qué buen producto.' }, // "hola" triggers the mock detector's Spanish branch
      metadata: { eventId: uuidv4(), timestamp: new Date(), traceId: 'spanish-trace' },
    };

    await eventBus.publish(initialEvent);

    // Initial publish + LanguageDetector publish. SentimentAnalyzer should not publish.
    expect(publishSpy).toHaveBeenCalledTimes(1 + 1);

    const languageEvent = publishSpy.mock.calls.find(call => call[0].type === 'LANGUAGE_DETECTED')?.[0];
    expect(languageEvent).toBeDefined();
    expect(languageEvent?.payload.languageCode).toBe('es');

    const sentimentEvent = publishSpy.mock.calls.find(call => call[0].type === 'SENTIMENT_ANALYZED')?.[0];
    expect(sentimentEvent).toBeUndefined();
  });
});
This test provides high confidence that the system works as a whole. We are not mocking plugin behavior; we are letting them run and communicate through a real (though in-memory) event bus and asserting the final state by observing the events that flow through it.
This architecture, validated by a robust testing strategy, solved our initial problem. Adding a new EntityRecognitionPlugin is now a matter of creating a new class that listens for LANGUAGE_DETECTED and adding new EntityRecognitionPlugin() to the main application’s plugin array. No existing code needs to be changed.
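To make that concrete, here is a sketch of what such a plugin could look like. It is hypothetical: the entity extraction is a crude placeholder, and an ENTITIES_EXTRACTED schema entry would also need to be added to appEventSchemas, or the bus would reject the new event.

// src/plugins/entityRecognizer.ts — hypothetical sketch, not part of the original system
import { IEventBus, IEvent } from '../core/eventBus';
import { IPlugin } from '../core/plugin';
import { v4 as uuidv4 } from 'uuid';
import { LanguageDetectedPayload } from '../schemas';

export class EntityRecognitionPlugin implements IPlugin {
  public name = 'EntityRecognitionPlugin';

  register(eventBus: IEventBus): void {
    eventBus.subscribe('LANGUAGE_DETECTED', this.handleLanguageDetected.bind(this, eventBus));
  }

  private async handleLanguageDetected(
    eventBus: IEventBus,
    event: IEvent<'LANGUAGE_DETECTED', LanguageDetectedPayload>
  ): Promise<void> {
    // Placeholder "NER": capitalized words stand in for a real entity model.
    const entities = event.payload.text.match(/\b[A-Z][a-z]+\b/g) ?? [];

    await eventBus.publish({
      type: 'ENTITIES_EXTRACTED', // requires a matching entry in appEventSchemas
      payload: { text: event.payload.text, entities },
      metadata: {
        ...event.metadata, // keep propagating the traceId
        eventId: uuidv4(),
        timestamp: new Date(),
      },
    });
  }
}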
However, this solution is not without its own set of challenges. The InMemoryEventBus is a single point of failure and does not persist events, making it unsuitable for production. The immediate next step is creating a production-ready IEventBus implementation using a system like Kafka or Redis Streams, which would also provide the durability and scalability needed. The current error handling is primitive; a production system would need sophisticated retry mechanisms, exponential backoff, and a robust dead-letter queue strategy for events that consistently fail processing. Finally, observability is key in a distributed system; while we’ve included a traceId, a full implementation would require integrating with a distributed tracing standard like OpenTelemetry to propagate context and visualize the entire flow of an operation across different plugins.
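As a rough illustration of the error-handling direction (the helper below is a hypothetical sketch, not production code), per-handler retries with exponential backoff and a dead-letter hook can be layered on without touching the plugins themselves:

// src/core/resilience.ts — hypothetical sketch of retry + dead-letter wrapping
import { EventHandler } from './eventBus';

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Wraps a handler so transient failures are retried with exponential backoff;
// after the final attempt the event is handed to a dead-letter callback instead.
export function withRetry<T>(
  handler: EventHandler<T>,
  options = { maxAttempts: 3, baseDelayMs: 200 },
  deadLetter: (event: T, error: unknown) => Promise<void> = async () => {}
): EventHandler<T> {
  return async (event: T) => {
    for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
      try {
        await handler(event);
        return;
      } catch (error) {
        if (attempt === options.maxAttempts) {
          await deadLetter(event, error); // e.g. publish to a dedicated DLQ topic
          return;
        }
        await sleep(options.baseDelayMs * 2 ** (attempt - 1)); // 200ms, 400ms, 800ms, ...
      }
    }
  };
}

A production bus would apply a wrapper like this to every subscriber at registration time, and the dead-letter callback would forward the failed event, traceId included, to a dedicated queue for inspection and replay.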