Our role-based access control (RBAC) system, built on a fairly standard IAM model, failed silently. The breach wasn’t a brute-force attack or a vulnerability exploit; it was a slow, methodical data exfiltration by a compromised insider account. Every individual action the account took was technically permissible under its assigned role. The view_customer_record action was called and the download_report endpoint was hit, all within policy. The failure was that our logging and alerting, based on the ELK stack, was configured to flag discrete, high-severity events: permission_denied, failed_login_spike, delete_critical_resource. It had no concept of behavioral patterns. The attacker’s pattern (accessing thousands of unrelated records sequentially from a new geographical location outside of normal business hours) was deeply anomalous, but no single log entry triggered an alarm. This incident exposed a fundamental flaw in our threat detection model: we were checking for broken rules, not broken trust.
The initial concept was to move beyond static rules and into behavioral analysis. The problem is that “normal behavior” is a fuzzy, high-dimensional concept. A support engineer’s normal is vastly different from a sales director’s. We needed a system that could learn a baseline for each user role, or even each user, and flag significant deviations in real time. The initial thought was to build complex correlation rules in Elasticsearch. This approach is brittle. A rule like “ALERT IF user_A accesses > 100 records in 1 minute” creates false positives for legitimate bulk operations and misses a slow attack that accesses 99 records per minute. The real challenge is not about thresholds but about the semantic nature of the access pattern.
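To make that brittleness concrete, here is a minimal sketch of such a fixed-rate rule. The function names and the event shape are hypothetical and are not part of our Elasticsearch setup; the sketch only illustrates how a static cutoff flags a legitimate export while a patient attacker stays just under it.

```typescript
// Hypothetical illustration of a static threshold rule; not the production alerting logic.
interface AccessEvent {
  userId: string;
  timestampMs: number;
}

// Returns true if any sliding window shorter than 60 seconds holds more than `limit` events.
function exceedsRateThreshold(events: AccessEvent[], limit: number = 100): boolean {
  const times = events.map((e) => e.timestampMs).sort((a, b) => a - b);
  let start = 0;
  for (let end = 0; end < times.length; end++) {
    // Shrink the window until it spans less than one minute.
    while (times[end] - times[start] >= 60000) start++;
    if (end - start + 1 > limit) return true;
  }
  return false;
}

// Builds evenly spaced events: `perMinute` events per minute for `minutes` minutes.
function steadyAccess(userId: string, perMinute: number, minutes: number): AccessEvent[] {
  const events: AccessEvent[] = [];
  const gapMs = 60000 / perMinute;
  for (let i = 0; i < perMinute * minutes; i++) {
    events.push({ userId, timestampMs: Math.round(i * gapMs) });
  }
  return events;
}

const slowAttack = steadyAccess('user_A', 99, 600); // 59,400 records over 10 hours
const bulkExport = steadyAccess('user_B', 150, 1);  // one legitimate 150-record export

console.log(exceedsRateThreshold(slowAttack)); // false
console.log(exceedsRateThreshold(bulkExport)); // true
```

The rule gets both cases wrong: the ten-hour exfiltration passes and the one-off export alerts.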
This led us to vector embeddings. If we could represent a user’s activity over a given time window as a numerical vector, we could use vector similarity search to find outliers. A user’s activity vector that is distant from all previously observed “normal” vectors for that user or role is, by definition, an anomaly. This is where the architecture started to take shape:
- NestJS Application: Continues to be the gatekeeper, enforcing IAM policies. It must be enhanced to produce extremely detailed, structured audit logs for every authenticated request.
- ELK Stack: The existing workhorse for log aggregation. It will ingest the audit trail, providing storage, searchability for incident response, and a visualization layer with Kibana.
- Pinecone: The core of the new detection engine. As a managed vector database, it abstracts away the complexity of building and scaling a high-performance approximate nearest neighbor (ANN) search index. In a real-world project, building this yourself with something like FAISS is a significant operational burden. We need to focus on the detection logic, not the infrastructure.
- Anomaly Detection Service (NestJS): A new, separate service that processes the log stream, generates behavior vectors, and orchestrates the interaction with Pinecone and the final alerting.
The following is a build log of this system, detailing the implementation from the audit log generation in the primary application to the final anomaly detection logic.
Phase 1: Rich Audit Logging at the Source
The entire system is useless without high-quality data. A common mistake is to log simple strings like “User X accessed resource Y”. This is insufficient. We need a rich, structured context. We implemented this in our primary NestJS application using a global interceptor.
src/auditing/audit.interceptor.ts:
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
Inject,
} from '@nestjs/common';
import { WINSTON_MODULE_PROVIDER } from 'nest-winston';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
import { Logger } from 'winston';
import { Request } from 'express';
export interface AuditLog {
timestamp: string;
requestId: string;
userId: string;
userRoles: string[];
ipAddress: string;
httpMethod: string;
path: string;
controller: string;
handler: string;
statusCode: number;
durationMs: number;
requestBody: any;
resourceId?: string; // e.g., customer ID, document ID from params
}
@Injectable()
export class AuditInterceptor implements NestInterceptor {
constructor(
@Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger,
) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const startTime = Date.now();
const ctx = context.switchToHttp();
const request = ctx.getRequest<Request>();
const response = ctx.getResponse();
return next.handle().pipe(
tap(() => {
const durationMs = Date.now() - startTime;
const { method, originalUrl, body, ip, user, params } = request;
const { statusCode } = response;
// A critical assumption: authentication middleware has already run
// and attached a `user` object to the request.
if (!user || !(user as any).id) {
// Do not log unauthenticated or malformed requests in this audit trail
return;
}
const currentUser = user as { id: string; roles: string[] };
const log: AuditLog = {
timestamp: new Date().toISOString(),
requestId: (request as any).id, // Assuming a request-id middleware is used
userId: currentUser.id,
userRoles: currentUser.roles,
ipAddress: ip,
httpMethod: method,
path: originalUrl,
controller: context.getClass().name,
handler: context.getHandler().name,
statusCode: statusCode,
durationMs,
requestBody: this.sanitizeRequestBody(body),
resourceId: params.id, // Example for extracting a resource ID; undefined when the route has no :id param
};
// Log as a structured JSON object
this.logger.info('IAM_AUDIT', { audit: log });
}),
);
}
private sanitizeRequestBody(body: any): any {
if (!body) {
return {};
}
// In a production system, you MUST strip sensitive fields like passwords, tokens, PII.
// This is a simplified example.
const sanitized = { ...body };
if (sanitized.password) sanitized.password = '[REDACTED]';
if (sanitized.creditCard) sanitized.creditCard = '[REDACTED]';
return sanitized;
}
}
This interceptor is registered globally in app.module.ts. We use nest-winston for logging, configured to output JSON. This ensures every log line sent to our aggregator is machine-readable.
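The sanitizeRequestBody helper in the interceptor only checks two top-level keys. As a rough sketch of what a more thorough version could look like, the function below walks nested objects and arrays. The deny-list, the depth limit, and the name redactDeep are assumptions for illustration; the actual list of sensitive fields depends on your own payloads.

```typescript
// Hypothetical deny-list (lowercase); extend it to match your own payloads.
const SENSITIVE_KEYS = new Set(['password', 'token', 'authorization', 'creditcard', 'ssn']);
const REDACTED = '[REDACTED]';
const MAX_DEPTH = 8;

// Recursively copies `value`, replacing any property whose name (compared
// case-insensitively) is in SENSITIVE_KEYS. The depth cap keeps oversized or
// cyclic payloads from stalling the logger.
function redactDeep(value: unknown, depth: number = 0): unknown {
  if (depth > MAX_DEPTH) return '[TRUNCATED]';
  if (Array.isArray(value)) {
    return value.map((item) => redactDeep(item, depth + 1));
  }
  if (value !== null && typeof value === 'object') {
    const source = value as Record<string, unknown>;
    const out: Record<string, unknown> = {};
    for (const key of Object.keys(source)) {
      out[key] = SENSITIVE_KEYS.has(key.toLowerCase())
        ? REDACTED
        : redactDeep(source[key], depth + 1);
    }
    return out;
  }
  return value; // primitives pass through unchanged
}

const sample = {
  note: 'renewal call',
  password: 'hunter2',
  payment: { creditCard: '4111111111111111', amount: 42 },
  sessions: [{ token: 'abc', device: 'laptop' }],
};

console.log(JSON.stringify(redactDeep(sample)));
// {"note":"renewal call","password":"[REDACTED]","payment":{"creditCard":"[REDACTED]","amount":42},"sessions":[{"token":"[REDACTED]","device":"laptop"}]}
```

The function returns a copy and leaves the request body untouched, which matters because the interceptor runs on the live request object.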
src/app.module.ts (excerpt):
import { Module } from '@nestjs/common';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { WinstonModule } from 'nest-winston';
import * as winston from 'winston';
import { AuditInterceptor } from './auditing/audit.interceptor';
// ... other imports
@Module({
imports: [
WinstonModule.forRoot({
level: 'info',
format: winston.format.json(),
transports: [
// In production, this would be a transport that forwards logs,
// e.g., to Filebeat, Fluentd, or directly to Logstash.
// For development, we log to console.
new winston.transports.Console(),
],
}),
// ... other modules
],
providers: [
{
provide: APP_INTERCEPTOR,
useClass: AuditInterceptor,
},
// ... other providers
],
})
export class AppModule {}
The key takeaway here is discipline. Every authenticated endpoint now automatically generates a detailed audit record. The IAM_AUDIT message differentiates these from regular application logs.
Phase 2: The ELK Ingestion and Processing Pipeline
With structured logs being generated, the next step is to get them into Elasticsearch. We use a standard Filebeat -> Logstash -> Elasticsearch pipeline. The crucial part is the Logstash configuration. It needs to parse the JSON, perform any necessary enrichments, and route the data to the correct Elasticsearch index.
logstash/pipeline/iam-audit.conf:
input {
# In production, this would be a beats input listening for Filebeat
# For simplicity in this example, we use a TCP input.
# The NestJS logger would be configured to send logs here.
tcp {
port => 5044
codec => json_lines
}
}
filter {
# We only care about our specific audit logs. Drop everything else.
if [message] != "IAM_AUDIT" {
drop {}
}
# The actual log data is nested under the 'audit' key by our Winston logger.
# The json_lines codec has already decoded it into a hash, so there is nothing
# left to parse; we copy each of its fields to the top level of the event.
if [audit] {
ruby {
code => '
audit = event.get("audit")
audit.each { |key, value| event.set(key, value) } if audit.is_a?(Hash)
'
}
# Remove the wrapper and transport-level fields we do not need to index.
mutate {
remove_field => ["message", "audit", "host", "port", "@version", "tags"]
}
}
# Data type conversions are critical for Elasticsearch.
mutate {
convert => {
"statusCode" => "integer"
"durationMs" => "float"
}
}
# GeoIP enrichment based on the source IP address. This is a powerful
# feature for detecting geographically anomalous access.
geoip {
source => "ipAddress"
}
# A pitfall here is timezone handling. Ensure all systems are on UTC.
date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}
}
output {
# Send to our dedicated Elasticsearch index for IAM audits.
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "iam_audit-%{+YYYY.MM.dd}"
user => "elastic"
password => "changeme"
}
# Additionally, we forward the processed log to Kafka for our real-time
# detection service to consume. This decouples ingestion from detection.
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "iam_audit_stream"
codec => json
}
}
This configuration does several important things: it filters for only the logs we care about, unnests the JSON payload, corrects data types, enriches the data with GeoIP information, and crucially, sends the data to two destinations: Elasticsearch for long-term storage and analysis, and Kafka for real-time processing. Decoupling with Kafka is a production-grade pattern that prevents the detection service from impacting the logging pipeline.
Phase 3: The Anomaly Detection Service and Vector Generation
This is the heart of the new system. We built it as a separate NestJS application to isolate its concerns. It has three main responsibilities: consume from Kafka, manage user behavior sessions, and generate vector embeddings.
The architecture is visualized below:
graph TD
    subgraph "Primary Application (NestJS)"
        A[API Request] --> B{IAM Middleware};
        B --> C[Controller];
        C --> D[Audit Interceptor];
        D --> E[Winston Logger];
    end
    subgraph "Logging & Streaming Pipeline"
        E --> F(Logstash);
        F --> G(Elasticsearch);
        F --> H(Kafka Topic: iam_audit_stream);
    end
    subgraph "Anomaly Detection Service (NestJS)"
        H --> I[Kafka Consumer];
        I --> J{Session Manager};
        J --> K[Embedding Generator];
        K --> L[Pinecone Client];
        L -- Query/Upsert --> M[(Pinecone Vector DB)];
        L -- Anomaly Detected --> N[Alerting Webhook];
    end
    subgraph "Observability & IR"
        G --> O{Kibana};
        N --> P(Alert Manager);
    end
    O -- Visualize/Investigate --> G;
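The first responsibility, consuming from Kafka, means the service receives raw JSON that has passed through Logstash. Before a message reaches the session manager it is worth validating it. The sketch below shows one defensive way to do that; the function name parseAuditMessage and the reduced field list are assumptions for illustration, and the real DTO carries more fields than this.

```typescript
// Reduced, hypothetical view of the audit event as it arrives from Kafka.
interface AuditEvent {
  userId: string;
  timestamp: string;
  httpMethod: string;
  path: string;
  ipAddress: string;
}

const REQUIRED_STRING_FIELDS: (keyof AuditEvent)[] = [
  'userId', 'timestamp', 'httpMethod', 'path', 'ipAddress',
];

// Parses one Kafka message value. Returns null instead of throwing so that a
// single malformed message cannot crash the consumer loop.
function parseAuditMessage(raw: string): AuditEvent | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (_err) {
    return null; // not valid JSON
  }
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    return null; // JSON, but not an object
  }
  const record = parsed as Record<string, unknown>;
  for (const field of REQUIRED_STRING_FIELDS) {
    if (typeof record[field] !== 'string' || record[field] === '') return null;
  }
  // Reject timestamps that Date cannot parse; later features depend on them.
  if (Number.isNaN(Date.parse(record.timestamp as string))) return null;
  return {
    userId: record.userId as string,
    timestamp: record.timestamp as string,
    httpMethod: record.httpMethod as string,
    path: record.path as string,
    ipAddress: record.ipAddress as string,
  };
}

const good = parseAuditMessage(JSON.stringify({
  userId: 'u-1',
  timestamp: '2024-01-01T10:00:00.000Z',
  httpMethod: 'GET',
  path: '/customers/42',
  ipAddress: '203.0.113.7',
}));
console.log(good !== null);                       // true
console.log(parseAuditMessage('not json'));       // null
console.log(parseAuditMessage('{"userId":"u"}')); // null
```

Dropping bad messages silently is a simplification; in practice they would probably be counted or routed to a dead-letter topic.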
Session Management
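We can’t generate a vector for every single API call. We need to analyze behavior over a window of time. The SessionManager service collects logs for each user and, upon a trigger (e.g., 5 minutes of activity or 10 minutes of inactivity), flushes the session for embedding. The listing below implements the inactivity trigger: every 5 minutes it looks for sessions that have been idle for more than 10 minutes. Sessions are held in an in-memory Map here; a real multi-instance setup would keep this state in a shared store such as Redis.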
src/detection/session.manager.ts:
import { Injectable, Logger } from '@nestjs/common';
import { AuditLog } from './dto/audit-log.dto'; // A DTO matching our log structure
import { Subject } from 'rxjs';
export interface UserSession { // exported because embedding.generator.ts imports it
userId: string;
logs: AuditLog[];
lastActivity: number;
}
// A more robust implementation would use Redis for state management
// to handle service restarts and scaling across multiple instances.
const SESSIONS: Map<string, UserSession> = new Map();
const SESSION_FLUSH_INTERVAL_MS = 300000; // 5 minutes
const SESSION_INACTIVITY_TIMEOUT_MS = 600000; // 10 minutes
@Injectable()
export class SessionManager {
private readonly logger = new Logger(SessionManager.name);
public readonly sessionReady$ = new Subject<UserSession>();
constructor() {
setInterval(() => this.flushInactiveSessions(), SESSION_FLUSH_INTERVAL_MS);
}
public processLog(log: AuditLog): void {
let session = SESSIONS.get(log.userId);
if (!session) {
session = { userId: log.userId, logs: [], lastActivity: Date.now() };
SESSIONS.set(log.userId, session);
}
session.logs.push(log);
session.lastActivity = Date.now();
}
private flushInactiveSessions(): void {
const now = Date.now();
for (const [userId, session] of SESSIONS.entries()) {
if (now - session.lastActivity > SESSION_INACTIVITY_TIMEOUT_MS) {
this.logger.log(`Flushing inactive session for user ${userId}`);
this.emitAndClearSession(userId, session);
}
}
}
private emitAndClearSession(userId: string, session: UserSession): void {
if (session.logs.length > 0) {
this.sessionReady$.next({ ...session });
}
SESSIONS.delete(userId);
}
}
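The listing above only flushes sessions that have gone quiet, so a user who never pauses for ten minutes is never evaluated. That is exactly the profile of a slow, steady exfiltration. One way to add an activity-based trigger is to record when each session started and cut it once it reaches a maximum age. The sketch below is a simplified, standalone variant meant to complement the inactivity sweep, not a drop-in replacement for SessionManager. The class name WindowedSessions, the startedAt field, and the explicit `now` argument (used so the logic can run without real timers) are assumptions for illustration.

```typescript
interface WindowedSession {
  userId: string;
  events: string[]; // stand-in for the AuditLog entries used in the article
  startedAt: number;
}

const MAX_SESSION_AGE_MS = 5 * 60 * 1000; // cut a session after 5 minutes

class WindowedSessions {
  private readonly sessions = new Map<string, WindowedSession>();

  constructor(private readonly onFlush: (session: WindowedSession) => void) {}

  add(userId: string, event: string, now: number): void {
    let session = this.sessions.get(userId);
    if (session && now - session.startedAt >= MAX_SESSION_AGE_MS) {
      // The window is full: emit it and start a fresh one for this event.
      this.flush(session);
      session = undefined;
    }
    if (!session) {
      session = { userId, events: [], startedAt: now };
      this.sessions.set(userId, session);
    }
    session.events.push(event);
  }

  // Call periodically (e.g. from setInterval) to emit sessions that reached
  // the age limit without receiving another event.
  sweep(now: number): void {
    this.sessions.forEach((session) => {
      if (now - session.startedAt >= MAX_SESSION_AGE_MS) this.flush(session);
    });
  }

  private flush(session: WindowedSession): void {
    this.sessions.delete(session.userId);
    if (session.events.length > 0) this.onFlush(session);
  }
}

// Usage: a user who makes one request per minute and never pauses.
const flushed: WindowedSession[] = [];
const manager = new WindowedSessions((s) => flushed.push(s));
for (let minute = 0; minute < 12; minute++) {
  manager.add('user_A', `GET /customers/${minute}`, minute * 60 * 1000);
}
console.log(flushed.map((s) => s.events.length)); // [5, 5]
```

The last two events stay in an open session until the next call to sweep or add pushes it past the age limit.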
Vector Embedding Generation
This is the most critical and domain-specific part. The quality of the vector determines the quality of the detection. A poor vector will treat a sales director downloading 100 reports and an engineer accessing 100 microservices as the same behavior. We need to capture the semantics.
Our vector has a fixed dimensionality. We chose 128 dimensions, a common starting size; the features below fill the first 97 and the rest are reserved.
src/detection/embedding.generator.ts:
import { Injectable } from '@nestjs/common';
import { UserSession } from './session.manager';
import * as crypto from 'crypto';
// The dimension of our vector space.
export const VECTOR_DIMENSION = 128;
// A simple dictionary to map HTTP methods to indices.
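// Typed as Record<string, number> so it can be indexed by an arbitrary method string.
const HTTP_METHOD_MAP: Record<string, number> = { 'GET': 0, 'POST': 1, 'PUT': 2, 'DELETE': 3, 'PATCH': 4 };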
const NUM_HTTP_METHODS = Object.keys(HTTP_METHOD_MAP).length;
@Injectable()
export class EmbeddingGenerator {
public generateVector(session: UserSession): number[] {
const vector = new Array(VECTOR_DIMENSION).fill(0.0);
if (session.logs.length === 0) {
return vector;
}
// Feature 1: HTTP Method Distribution (Dimensions 0-4)
const methodCounts = new Array(NUM_HTTP_METHODS).fill(0);
session.logs.forEach(log => {
const index = HTTP_METHOD_MAP[log.httpMethod.toUpperCase()];
if (index !== undefined) {
methodCounts[index]++;
}
});
methodCounts.forEach((count, i) => {
vector[i] = count / session.logs.length;
});
// Feature 2: Hour of Day Distribution (Dimensions 5-28)
const hourCounts = new Array(24).fill(0);
session.logs.forEach(log => {
const hour = new Date(log.timestamp).getUTCHours();
hourCounts[hour]++;
});
hourCounts.forEach((count, i) => {
vector[5 + i] = count / session.logs.length;
});
// Feature 3: Endpoint Variety (Dimensions 29-92)
// We use a hashing trick to project a high-dimensional space (all possible endpoints)
// into a smaller, fixed-size feature space. This is a common technique
// to avoid needing a massive vocabulary of all known endpoints.
const uniqueEndpoints = new Set<string>(session.logs.map(l => `${l.httpMethod}:${l.path.split('?')[0]}`));
uniqueEndpoints.forEach(endpoint => {
const hash = crypto.createHash('md5').update(endpoint).digest('hex');
const index = parseInt(hash.substring(0, 4), 16) % 64; // Project into 64 bins
vector[29 + index] += 1.0;
});
// Normalize the endpoint hash vector
const endpointMag = Math.sqrt(vector.slice(29, 93).reduce((sum, val) => sum + val * val, 0));
if (endpointMag > 0) {
for (let i = 29; i < 93; i++) {
vector[i] /= endpointMag;
}
}
// Feature 4: Session Duration & Activity (Dimensions 93-94)
const timestamps = session.logs.map(l => new Date(l.timestamp).getTime());
const sessionDurationSeconds = (Math.max(...timestamps) - Math.min(...timestamps)) / 1000;
vector[93] = Math.log1p(sessionDurationSeconds); // Log transform to handle skew
vector[94] = session.logs.length / (sessionDurationSeconds + 1); // Actions per second
// Feature 5: Geo-location consistency (Dimensions 95-96)
const uniqueIPs = new Set(session.logs.map(l => l.ipAddress));
const uniqueCountries = new Set(session.logs.map(l => l.geoip?.country_code2).filter(Boolean));
vector[95] = uniqueIPs.size;
vector[96] = uniqueCountries.size;
// Remaining dimensions are reserved for future features.
// Final normalization of the entire vector
const magnitude = Math.sqrt(vector.reduce((sum, val) => sum + val * val, 0));
return magnitude > 0 ? vector.map(v => v / magnitude) : vector;
}
}
A key decision here was using a hashing trick for endpoint variety. A real-world system has thousands of endpoints. One-hot encoding them is not feasible. Hashing provides a good-enough approximation of which types of endpoints are being hit without an explosion in dimensionality, at the cost of occasional collisions where unrelated endpoints share a bin. The vector is also L2-normalized at the end. Cosine similarity ignores vector length, so this is not strictly required for the cosine metric, but it keeps every stored vector on the same scale.
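To see the hashing trick in isolation, the sketch below maps endpoint strings into 64 bins and builds the normalized feature block from them. Note that it swaps the MD5 digest used in the generator for a small FNV-1a hash so that it runs without any imports; the function names are illustrative and not part of the service.

```typescript
const NUM_BINS = 64;

// 32-bit FNV-1a over UTF-16 code units. Used here in place of MD5 purely to
// keep the example dependency-free; any stable hash serves the same purpose.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep it an unsigned 32-bit value
  }
  return hash;
}

// Projects an endpoint such as "GET:/customers/42" into one of NUM_BINS slots.
function endpointBin(endpoint: string): number {
  return fnv1a(endpoint) % NUM_BINS;
}

// Builds the 64-slot feature block: one count per bin over the unique
// endpoints, then L2-normalized.
function endpointFeatures(endpoints: string[]): number[] {
  const bins: number[] = new Array(NUM_BINS).fill(0);
  const unique = Array.from(new Set(endpoints));
  unique.forEach((endpoint) => {
    bins[endpointBin(endpoint)] += 1;
  });
  const magnitude = Math.sqrt(bins.reduce((sum, v) => sum + v * v, 0));
  return magnitude > 0 ? bins.map((v) => v / magnitude) : bins;
}

// The same set of endpoints always yields the same block, regardless of order
// or repetition.
const a = endpointFeatures(['GET:/customers/1', 'GET:/reports', 'POST:/notes']);
const b = endpointFeatures(['POST:/notes', 'GET:/reports', 'GET:/customers/1', 'GET:/reports']);
console.log(a.every((v, i) => v === b[i])); // true

// With far more endpoints than bins, collisions are unavoidable.
const many: string[] = [];
for (let i = 0; i < 1000; i++) many.push(`GET:/resource/${i}`);
const usedBins = new Set(many.map(endpointBin)).size;
console.log(usedBins <= NUM_BINS); // true
```

Because the block has a fixed width, adding new endpoints to the API never changes the vector dimension, which is what makes the approach practical here.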
Phase 4: Integrating Pinecone for Detection
With a way to generate vectors, we can now use Pinecone. The logic is simple: for a given user’s behavior vector, we query Pinecone to find the most similar vectors from their past behavior. If the distance is large, it’s an anomaly. We also need a mechanism to continuously update the baseline of normal behavior.
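Stripped of the database, the check reduces to a nearest-neighbor search under cosine distance. The in-memory sketch below shows that logic on toy three-dimensional vectors. The function names and sample values are made up for illustration, and a brute-force scan like this is only viable for tiny baselines; it is here to show the decision rule, not to replace the vector index.

```typescript
// Cosine similarity of two equal-length vectors; returns 0 if either is all zeros.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error('dimension mismatch');
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Distance (1 - similarity) from `candidate` to its closest baseline vector.
function nearestDistance(candidate: number[], baseline: number[][]): number {
  let best = Infinity;
  for (const known of baseline) {
    best = Math.min(best, 1 - cosineSimilarity(candidate, known));
  }
  return best;
}

const THRESHOLD = 0.6; // same role as ANOMALY_THRESHOLD in the service

// Toy baseline: sessions dominated by the first feature.
const baseline = [
  [0.9, 0.1, 0.0],
  [0.8, 0.2, 0.0],
  [1.0, 0.0, 0.1],
];
const typical = [0.85, 0.15, 0.0]; // looks like past behavior
const unusual = [0.0, 0.1, 1.0];   // dominated by a feature rarely seen before

console.log(nearestDistance(typical, baseline) > THRESHOLD); // false
console.log(nearestDistance(unusual, baseline) > THRESHOLD); // true
```

Only the single closest match matters for this rule, which is why the service queries with topK set to 1.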
src/detection/detection.service.ts:
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { PineconeClient } from '@pinecone-database/pinecone';
import { ConfigService } from '@nestjs/config';
import { SessionManager } from './session.manager';
import { EmbeddingGenerator, VECTOR_DIMENSION } from './embedding.generator';
import * as crypto from 'crypto'; // Provides randomUUID() for vector IDs
const ANOMALY_THRESHOLD = 0.6; // Cosine distance (1 - similarity) above which a session is flagged. A critical tunable parameter.
const MIN_SAMPLES_FOR_DETECTION = 10; // Don't start detecting until we have a baseline.
const PINECODE_NAMESPACE_PREFIX = 'iam-behavior-';
@Injectable()
export class DetectionService implements OnModuleInit {
private readonly logger = new Logger(DetectionService.name);
private pinecone: PineconeClient;
private index;
constructor(
private readonly configService: ConfigService,
private readonly sessionManager: SessionManager,
private readonly embeddingGenerator: EmbeddingGenerator,
) {}
async onModuleInit() {
this.pinecone = new PineconeClient();
await this.pinecone.init({
environment: this.configService.get('PINECONE_ENVIRONMENT'),
apiKey: this.configService.get('PINECONE_API_KEY'),
});
const indexName = this.configService.get('PINECONE_INDEX_NAME');
// In a real project, index creation should be handled via IaC, not application code.
// This is for demonstration.
const indexList = await this.pinecone.listIndexes();
if (!indexList.includes(indexName)) {
await this.pinecone.createIndex({
createRequest: {
name: indexName,
dimension: VECTOR_DIMENSION,
metric: 'cosine', // Cosine is good for normalized vectors
}
});
this.logger.warn(`Pinecone index '${indexName}' created.`);
}
this.index = this.pinecone.Index(indexName);
this.sessionManager.sessionReady$.subscribe(async (session) => {
await this.processSession(session);
});
}
private async processSession(session): Promise<void> {
const vector = this.embeddingGenerator.generateVector(session);
const userNamespace = `${PINECODE_NAMESPACE_PREFIX}${session.userId}`;
try {
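// This client version expects request arguments wrapped in a named request object,
// as with queryRequest and upsertRequest below.
const stats = await this.index.describeIndexStats({
describeIndexStatsRequest: { filter: {} }, // No filter needed for namespace stats
});
const vectorCount = stats.namespaces?.[userNamespace]?.vectorCount || 0;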
if (vectorCount < MIN_SAMPLES_FOR_DETECTION) {
// We are in the "learning" phase for this user. Just store the vector.
await this.learnBehavior(userNamespace, vector);
return;
}
const queryResponse = await this.index.query({
queryRequest: {
namespace: userNamespace,
topK: 1, // We only need the closest match
vector: vector,
}
});
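const closestMatch = queryResponse.matches?.[0];
if (!closestMatch || closestMatch.score === undefined) {
// Nothing to compare against (e.g. recent upserts not yet queryable); keep learning.
await this.learnBehavior(userNamespace, vector);
return;
}
// With the cosine metric Pinecone returns similarity (1 is identical). We want distance.
const distance = 1 - closestMatch.score;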
if (distance > ANOMALY_THRESHOLD) {
this.triggerAlert(session.userId, distance, session);
}
// Always update the user's behavior profile with the new vector.
await this.learnBehavior(userNamespace, vector);
} catch (error) {
this.logger.error(`Failed to process session for user ${session.userId}`, error.stack);
}
}
private async learnBehavior(namespace: string, vector: number[]): Promise<void> {
const vectorId = crypto.randomUUID();
await this.index.upsert({
upsertRequest: {
namespace,
vectors: [{ id: vectorId, values: vector }],
}
});
}
private triggerAlert(userId: string, score: number, session: any): void {
// In a production system, this would push to an alerting system like PagerDuty,
// create a ticket in JIRA, or send a message to a security Slack channel.
this.logger.warn(`ANOMALY DETECTED for user ${userId}!`);
this.logger.warn(` - Anomaly Score (distance): ${score.toFixed(4)}`);
this.logger.warn(` - Session Log Count: ${session.logs.length}`);
this.logger.warn(` - First Log Timestamp: ${session.logs[0].timestamp}`);
// The full session object provides context for the investigator.
}
}
A pragmatic choice was to use Pinecone namespaces to isolate each user’s vector data. This is a clean way to ensure we’re only comparing a user’s behavior against their own history. The MIN_SAMPLES_FOR_DETECTION constant is crucial to prevent flagging a new user’s very first actions as anomalous. The system must learn what is normal before it can detect abnormal.
The final result is a pipeline that passively observes user activity, learns behavioral baselines on a per-user basis, and automatically flags sessions that deviate from that baseline by more than the tuned threshold. When an alert fires, an analyst doesn’t just get a single cryptic log entry. They get the full context of the user’s session from Elasticsearch (correlated by userId and timestamp), and the anomaly score from our detection service, which quantifies how unusual the behavior was. This allows for a much faster and more accurate incident response.
The limitations of this approach, however, must be acknowledged. The quality of the embedding function is paramount; a poorly designed vector can lead to many false positives or negatives. Our current feature set is deterministic and doesn’t capture the sequential nature of API calls, which could be improved with sequence-aware models like transformers or LSTMs, though that adds significant complexity. Furthermore, the threshold for what constitutes an anomaly is currently a static, manually tuned value. A more advanced system would use statistical methods to derive dynamic thresholds. Finally, this system introduces new operational costs and complexities—managing a Kafka cluster, the ELK stack, and paying for a managed service like Pinecone requires budget and expertise. It is a trade-off between the cost of implementation and the cost of a sophisticated, undetected breach.
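As one example of such a statistical method, the threshold could be derived per user from the distribution of that user’s past nearest-neighbor distances, for instance the mean plus a multiple of the standard deviation. The sketch below assumes those historical distances are stored somewhere, which the current service does not do. The function name, the option names, and the default multiplier of 3 are illustrative choices, not tuned values.

```typescript
interface ThresholdOptions {
  k: number;          // how many standard deviations above the mean to allow
  floor: number;      // lower bound so very regular users do not alert on noise
  minSamples: number; // below this, fall back to the static threshold
  fallback: number;   // static threshold used while history is too short
}

const DEFAULT_OPTIONS: ThresholdOptions = { k: 3, floor: 0.05, minSamples: 10, fallback: 0.6 };

// Derives an anomaly threshold from a user's past nearest-neighbor distances
// as mean + k * (population standard deviation), bounded below by `floor`.
function dynamicThreshold(history: number[], options: ThresholdOptions = DEFAULT_OPTIONS): number {
  if (history.length < options.minSamples) return options.fallback;
  const mean = history.reduce((sum, d) => sum + d, 0) / history.length;
  const variance =
    history.reduce((sum, d) => sum + (d - mean) * (d - mean), 0) / history.length;
  return Math.max(options.floor, mean + options.k * Math.sqrt(variance));
}

// A user whose sessions always land about 0.1 away from their baseline gets a
// much tighter threshold than the static 0.6.
const steadyUser = [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1];
console.log(dynamicThreshold(steadyUser).toFixed(2)); // "0.10"

// Too little history: the static value is used.
console.log(dynamicThreshold([0.1, 0.2])); // 0.6
```

A mean-and-deviation rule assumes the distances are roughly symmetric; a percentile-based cutoff would be a reasonable alternative if they are heavily skewed.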