Our log aggregation platform was generating thousands of alerts, but the signal-to-noise ratio was abysmal. A flurry of “database connection refused” messages from different microservices would trigger a dozen distinct alerts, overwhelming the on-call engineer. These messages, while textually unique due to varying service names and timestamps, were semantically identical. Traditional text matching and regex-based alerting rules were failing us. The core technical pain point was an inability to group alerts based on semantic meaning in real-time.
The initial concept was to shift from lexical analysis to vector-based similarity search. Each incoming log or event would be passed through a sentence-embedding model, converting it into a high-dimensional vector. “Normal” operational states would form dense clusters in this vector space. A true anomaly would be a vector far from any known cluster. More importantly, a wave of semantically similar issues would produce a new, tight cluster of vectors, which we could identify and consolidate into a single, high-confidence meta-alert. This required a pipeline capable of real-time embedding, ultra-fast vector search, and a mechanism to push these findings to an operator dashboard with sub-second latency.
This led to a critical evaluation of our technology stack. For the vector database, we benchmarked a self-hosted FAISS index against Milvus. While FAISS offered raw performance, Milvus provided a more complete ecosystem with client SDKs, a standalone server architecture, and configurable indexing strategies that could be modified without code changes. In a real-world project, this operational flexibility is invaluable. For our streaming ingestion and query pattern, Milvus’s support for partitioning and hybrid search (filtering on metadata before vector search) was the deciding factor.
For the real-time push to the frontend, the debate was between WebSockets and Server-Sent Events (SSE). WebSockets offer bidirectional communication, which was overkill for our use case—we only needed to push data from the server to the client. SSE, operating over standard HTTP, is fundamentally simpler. It provides automatic reconnection handling out of the box, which reduces the amount of boilerplate and error-handling logic on the client side. A common mistake is to reach for the most powerful tool (WebSockets) when a simpler one (SSE) perfectly fits the requirements and reduces long-term maintenance overhead.
The frontend itself would be our standard React stack, built with Babel and Webpack, using Ant Design for the UI components. The final piece was monitoring. We could have tried to force Grafana to visualize this data, but it’s ill-suited for displaying unstructured, event-driven anomaly clusters. The pragmatic decision was to build a purpose-built dashboard with Ant Design for visualizing the semantic alert data, and relegate Grafana to its primary strength: monitoring the health and performance of our new vector-pipeline itself.
The Backend Pipeline: Ingestion, Vectorization, and Search
The core of the system is a Node.js service built with Express. It has three primary responsibilities: ingest raw events, convert them to vectors, query Milvus to find similar existing events, and broadcast new anomalies.
First, the server setup and Milvus connection management. A production-grade service cannot afford to re-establish connections on every request. Connection logic must be robust, centralized, and handle retries.
services/milvusClient.js
import { MilvusClient } from "@zilliz/milvus2-sdk-node";
import pino from "pino";
const logger = pino({ level: 'info' });
// Configuration should be externalized in a real-world project (e.g., env variables)
const MILVUS_ADDRESS = process.env.MILVUS_ADDRESS || "localhost:19530";
const COLLECTION_NAME = "system_events";
const VECTOR_DIMENSION = 384; // Based on the 'all-MiniLM-L6-v2' model
class MilvusService {
constructor() {
this.client = new MilvusClient(MILVUS_ADDRESS);
this.ready = false;
this.initialize();
}
async initialize() {
try {
await this.ensureCollection();
this.ready = true;
logger.info("Milvus service initialized successfully.");
} catch (error) {
logger.error({ err: error }, "Failed to initialize Milvus service. Retrying in 10s.");
setTimeout(() => this.initialize(), 10000);
}
}
async ensureCollection() {
const collections = await this.client.showCollections();
const collectionExists = collections.data.some(c => c.name === COLLECTION_NAME);
if (!collectionExists) {
logger.warn(`Collection '${COLLECTION_NAME}' not found. Creating...`);
const schema = {
collection_name: COLLECTION_NAME,
fields: [
{ name: "id", data_type: 1, is_primary_key: true, autoID: true }, // DataType.Int64
{ name: "timestamp", data_type: 1 }, // DataType.Int64
{ name: "raw_event", data_type: 21, max_length: 65535 }, // DataType.VarChar
{ name: "event_vector", data_type: 101, dim: VECTOR_DIMENSION }, // DataType.FloatVector
],
};
await this.client.createCollection(schema);
// In a production system, index selection is critical for performance.
// HNSW is often superior for high-recall, low-latency search. IVF_FLAT is a balance.
await this.client.createIndex({
collection_name: COLLECTION_NAME,
field_name: "event_vector",
index_type: "IVF_FLAT",
metric_type: "L2", // Euclidean distance
params: { nlist: 1024 },
});
logger.info("Created collection and index in Milvus.");
}
// Load collection into memory for searching
await this.client.loadCollectionSync({ collection_name: COLLECTION_NAME });
}
async insertEvent(eventData) {
if (!this.ready) throw new Error("Milvus service not ready.");
const response = await this.client.insert({
collection_name: COLLECTION_NAME,
fields_data: [eventData],
});
// A common pitfall is not checking the response status.
if (response.status.error_code !== 'Success') {
throw new Error(`Milvus insertion failed: ${response.status.reason}`);
}
return response;
}
async searchSimilarVectors(vector, topK = 5) {
if (!this.ready) throw new Error("Milvus service not ready.");
const searchParams = {
anns_field: "event_vector",
topk: topK,
metric_type: "L2",
params: { nprobe: 16 }, // nprobe must be tuned based on nlist and performance/accuracy trade-offs
};
const response = await this.client.search({
collection_name: COLLECTION_NAME,
vectors: [vector],
search_params: searchParams,
output_fields: ["timestamp", "raw_event"],
});
return response.results;
}
}
export const milvusService = new MilvusService();
The ingestion endpoint uses a sentence-transformer model to create embeddings. For this example, we use the @xenova/transformers
library which can run directly in Node.js.
services/embeddingService.js
import { pipeline } from '@xenova/transformers';
import pino from 'pino';
const logger = pino({ level: 'info' });
class EmbeddingService {
constructor() {
this.pipe = null;
this.model = 'Xenova/all-MiniLM-L6-v2';
this.initialize();
}
async initialize() {
try {
// This will download the model on first run. In production, this should be
// part of the container build process to avoid runtime downloads.
logger.info(`Loading sentence-transformer model: ${this.model}`);
this.pipe = await pipeline('feature-extraction', this.model);
logger.info("Embedding model loaded successfully.");
} catch (error) {
logger.error({ err: error }, "Failed to load embedding model.");
// This is a fatal error for the service.
process.exit(1);
}
}
async createEmbedding(text) {
if (!this.pipe) {
throw new Error("Embedding service not initialized.");
}
const result = await this.pipe(text, { pooling: 'mean', normalize: true });
return Array.from(result.data);
}
}
export const embeddingService = new EmbeddingService();
The core anomaly detection logic resides in the main application file. It orchestrates embedding, searching, and deciding if an event is a new anomaly.
index.js (snippet)
// ... imports for express, cors, etc.
import { embeddingService } from './services/embeddingService.js';
import { milvusService } from './services/milvusClient.js';
import { sseService } from './services/sseService.js';
import { metrics } from './monitoring/metrics.js';
import pino from 'pino';
const app = express();
app.use(cors());
app.use(express.json());
const logger = pino({ level: 'info' });
const ANOMALY_DISTANCE_THRESHOLD = 0.7; // This threshold is empirical and needs tuning.
app.post('/ingest', async (req, res) => {
const { event_message } = req.body;
if (!event_message || typeof event_message !== 'string') {
return res.status(400).json({ error: 'Invalid event message' });
}
const start = process.hrtime();
try {
const vector = await embeddingService.createEmbedding(event_message);
const searchResults = await milvusService.searchSimilarVectors(vector, 1);
const [nearestNeighbor] = searchResults;
let isAnomaly = true;
let clusterId = null;
if (nearestNeighbor && nearestNeighbor.score < ANOMALY_DISTANCE_THRESHOLD) {
// Event is similar to an existing one, not a new anomaly type.
isAnomaly = false;
}
// Always insert the event for future comparisons.
await milvusService.insertEvent({
timestamp: Date.now(),
raw_event: event_message,
event_vector: vector,
});
if (isAnomaly) {
logger.info({ event: event_message, distance: nearestNeighbor?.score }, "New anomaly detected.");
metrics.anomaliesTotal.inc();
// Broadcast to all connected SSE clients.
sseService.broadcast({
type: 'new_anomaly',
payload: {
id: new Date().getTime(), // simple unique id
message: event_message,
distance: nearestNeighbor ? nearestNeighbor.score : Infinity,
timestamp: new Date().toISOString(),
}
});
}
const duration = process.hrtime(start);
const durationMs = duration[0] * 1000 + duration[1] / 1e6;
metrics.ingestionLatency.observe(durationMs / 1000); // observe in seconds
res.status(202).json({ status: 'accepted', is_anomaly: isAnomaly });
} catch (error) {
logger.error({ err: error }, "Error during event ingestion.");
res.status(500).json({ error: 'Internal server error' });
}
});
// ... more endpoints below
The Server-Sent Events Broadcasting Layer
The SSE implementation is deceptively simple but has critical details. The service must manage a list of active clients, write data in the correct format (data: JSON_STRING\n\n
), and gracefully handle client disconnections to prevent memory leaks.
services/sseService.js
import pino from 'pino';
import { metrics } from '../monitoring/metrics.js';
const logger = pino({ level: 'info' });
class SseService {
constructor() {
this.clients = new Set();
}
// This method is called by the Express route handler.
subscribe(req, res) {
// Essential SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
});
// Send a connection confirmation message
res.write('event: connection\ndata: {"status":"connected"}\n\n');
this.clients.add(res);
metrics.sseConnectionsActive.inc();
logger.info(`SSE client connected. Total clients: ${this.clients.size}`);
// The 'close' event is crucial for cleanup.
req.on('close', () => {
this.clients.delete(res);
metrics.sseConnectionsActive.dec();
logger.info(`SSE client disconnected. Total clients: ${this.clients.size}`);
});
}
broadcast(data) {
if (this.clients.size === 0) {
return; // No-op if no clients are connected.
}
const formattedMessage = `data: ${JSON.stringify(data)}\n\n`;
this.clients.forEach(client => {
// A try-catch block is a defensive measure in case a client connection
// is in a weird state before the 'close' event has fired.
try {
client.write(formattedMessage);
} catch (error) {
logger.warn({ err: error }, "Failed to write to a SSE client. It might have disconnected.");
this.clients.delete(client);
metrics.sseConnectionsActive.dec();
}
});
}
}
export const sseService = new SseService();
The corresponding Express route is straightforward:index.js (snippet)
// ... after /ingest endpoint
app.get('/events', (req, res) => {
sseService.subscribe(req, res);
});
The Frontend: A Real-Time Ant Design Dashboard
The frontend is a React application. The core logic involves using the native EventSource
API to connect to our SSE endpoint and updating the state as new events arrive. Ant Design’s Table
component is perfect for displaying a stream of structured data.
src/AnomalyDashboard.js
import React, { useState, useEffect, useCallback } from 'react';
import { Table, Tag, Typography, Layout, Badge, Statistic } from 'antd';
import 'antd/dist/reset.css';
const { Header, Content } = Layout;
const { Title } = Typography;
const columns = [
{
title: 'Timestamp',
dataIndex: 'timestamp',
key: 'timestamp',
render: (ts) => new Date(ts).toLocaleTimeString(),
width: 150,
},
{
title: 'Anomaly Event Message',
dataIndex: 'message',
key: 'message',
},
{
title: 'Novelty Score (Distance)',
dataIndex: 'distance',
key: 'distance',
render: (dist) => (
<Tag color={dist > 1.0 ? 'volcano' : 'orange'}>
{dist === Infinity ? 'NEW CLUSTER' : dist.toFixed(4)}
</Tag>
),
width: 200,
},
];
const AnomalyDashboard = () => {
const [anomalies, setAnomalies] = useState([]);
const [isConnected, setIsConnected] = useState(false);
const handleNewAnomaly = useCallback((event) => {
const data = JSON.parse(event.data);
if (data.type === 'new_anomaly') {
// Using a functional update is best practice to avoid stale state issues.
setAnomalies(prevAnomalies => [data.payload, ...prevAnomalies.slice(0, 99)]);
}
}, []);
useEffect(() => {
const eventSource = new EventSource('http://localhost:8080/events');
eventSource.onopen = () => {
console.log("SSE connection established.");
setIsConnected(true);
};
eventSource.addEventListener('connection', (e) => {
console.log('Received connection confirmation:', e.data);
});
eventSource.onmessage = handleNewAnomaly;
eventSource.onerror = (err) => {
console.error("EventSource failed:", err);
setIsConnected(false);
// EventSource will automatically try to reconnect.
};
// Cleanup function to close the connection when the component unmounts.
return () => {
eventSource.close();
console.log("SSE connection closed.");
};
}, [handleNewAnomaly]);
return (
<Layout style={{ minHeight: '100vh' }}>
<Header style={{ display: 'flex', alignItems: 'center', justifyContent: 'space-between' }}>
<Title level={3} style={{ color: 'white', margin: 0 }}>Real-Time Vector Anomaly Detector</Title>
<Statistic
title={<span style={{ color: 'rgba(255,255,255,0.7)' }}>Connection</span>}
valueRender={() => <Badge status={isConnected ? 'processing' : 'error'} text={isConnected ? 'Live' : 'Disconnected'} />}
/>
</Header>
<Content style={{ padding: '24px' }}>
<Table
columns={columns}
dataSource={anomalies}
rowKey="id"
pagination={false}
bordered
title={() => `Displaying last ${anomalies.length} anomalies`}
/>
</Content>
</Layout>
);
};
export default AnomalyDashboard;
This React code is transpiled using Babel as part of a standard Create React App or Vite build process.
Meta-Monitoring with Grafana
Our custom dashboard is for consuming the results of the pipeline. Grafana is for ensuring the pipeline itself is healthy. We exposed a /metrics
endpoint on the Node.js server using the prom-client
library.
monitoring/metrics.js
import { register, Counter, Gauge, Histogram } from 'prom-client';
export const metrics = {
anomaliesTotal: new Counter({
name: 'vector_pipeline_anomalies_total',
help: 'Total number of new anomalies detected',
}),
sseConnectionsActive: new Gauge({
name: 'vector_pipeline_sse_connections_active',
help: 'Number of active Server-Sent Events clients',
}),
ingestionLatency: new Histogram({
name: 'vector_pipeline_ingestion_latency_seconds',
help: 'Latency of the event ingestion and processing pipeline',
buckets: [0.05, 0.1, 0.25, 0.5, 1, 2.5, 5], // Buckets in seconds
}),
};
// Expose the metrics endpoint
export const metricsMiddleware = async (req, res) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
};
And in index.js
:
import { metricsMiddleware } from './monitoring/metrics.js';
app.get('/metrics', metricsMiddleware);
// Server startup logic
const PORT = process.env.PORT || 8080;
app.listen(PORT, () => {
logger.info(`Server running on port ${PORT}`);
});
With this endpoint scraped by Prometheus, we created a Grafana dashboard with key panels:
- Anomaly Rate: A graph using the PromQL query
rate(vector_pipeline_anomalies_total[5m])
. A sudden spike indicates a major incident; a flatline zero might mean our detector is broken. - Active SSE Clients: A stat panel with the query
vector_pipeline_sse_connections_active
. If this number drops to zero unexpectedly, our frontends are disconnected. - P95 Ingestion Latency: A graph of
histogram_quantile(0.95, sum(rate(vector_pipeline_ingestion_latency_seconds_bucket[5m])) by (le))
. This is our most important performance indicator. If this latency climbs, it could point to a bottleneck in the embedding model or, more likely, a performance degradation in Milvus.
graph TD A[Event Sources] -->|Log/Metric Stream| B(Ingestion Service - Node.js/Express); B -->|1. Text| C{Embedding Model}; C -->|2. Vector| B; B -->|3. Search/Insert| D[(Milvus)]; D -->|4. Results| B; B -->|5. Broadcast Anomaly| E(SSE Broadcaster); E -->|Real-time Push| F[React/Ant Design Frontend]; subgraph Meta-Monitoring B -->|Prometheus Metrics| G(Prometheus); G -->|SLI/SLO Data| H[Grafana Dashboard]; end
The final architecture provides a clear separation of concerns. Milvus handles the heavy lifting of vector search. The Node.js service acts as a lightweight orchestrator and real-time broadcaster. Ant Design enables rapid development of a specialized UI, and Grafana provides the essential safety net, ensuring the entire system remains observable and reliable.
The system is not without its limitations. The choice of a generic sentence-embedding model is a compromise; fine-tuning a BERT-like model on our domain-specific logs would dramatically improve the quality of vector representations and the accuracy of the similarity search. The anomaly detection threshold is currently a static, empirically derived constant. A future iteration should implement an adaptive threshold or a more advanced clustering algorithm like DBSCAN directly within the vector space to dynamically identify emerging event clusters. Finally, the backend’s in-memory state for SSE clients is a single point of failure and does not scale horizontally. A more robust implementation would use a Redis Pub/Sub model to decouple the broadcasters from the HTTP connection handlers, allowing for a truly scalable and resilient real-time push infrastructure.