Constructing a Real-Time Security Event Correlation Pipeline with a Data Lake, TimescaleDB, and Meilisearch


Our Security Operations Center (SOC) was drowning in data. The primary culprit was the firehose of logs from our Web Application Firewall (WAF), generating terabytes of semi-structured JSON events monthly. Our incumbent ELK-based logging solution buckled under the query load. Simple time-windowed queries to trace an attack vector could take minutes, rendering real-time threat detection impossible. The core problem was a fundamental architectural mismatch: we were using a general-purpose text search engine for what was predominantly a time-series analysis and incident-centric search problem. A complete rethink was necessary, not just an upgrade.

The initial concept was to dismantle the monolithic logging stack and build a purpose-built, multi-tiered data pipeline. The design philosophy was simple: use the right tool for each specific job to optimize for performance, cost, and query patterns.

graph TD
    subgraph Edge
        WAF[WAF Appliances]
    end

    subgraph Ingestion["Ingestion & Processing (Go Microservice)"]
        WAF -- JSON Logs --> NATS[NATS Message Queue]
        NATS -- Stream --> Processor
        Processor -- Raw Log --> DataLake[(MinIO Data Lake)]
        Processor -- Structured Event --> TimescaleDB[(TimescaleDB)]
    end

    subgraph Analysis["Analysis & Search"]
        TimescaleDB -- Time-series Queries --> CorrelationEngine[Correlation Engine]
        CorrelationEngine -- High-Severity Incidents --> Meilisearch[(Meilisearch)]
        SOCAnalyst[SOC Analyst] -- Fast, Fuzzy Search --> Meilisearch
        SOCAnalyst -- Deep Dive Analytics --> TimescaleDB
        DataScientist[Data Scientist] -- Ad-hoc Bulk Analysis --> DataLake
    end

    style DataLake fill:#d4e8d4,stroke:#333,stroke-width:2px
    style TimescaleDB fill:#cde4f9,stroke:#333,stroke-width:2px
    style Meilisearch fill:#f9e3cd,stroke:#333,stroke-width:2px

This architecture splits the data lifecycle into three distinct tiers:

  1. Cold Storage (Data Lake): Raw, unaltered WAF logs are immediately archived to an S3-compatible object store (MinIO). This satisfies long-term compliance requirements at a minimal cost and provides a source of truth for offline batch processing or reprocessing.
  2. Hot Storage (Time-Series DB): Parsed, structured, and enriched log data is streamed into TimescaleDB. It is the engine for high-performance time-series analytics, enabling rapid aggregation and filtering over specific time windows; it is where we answer questions like “How many SQL injection attempts originated from ASN 12345 in the last 15 minutes?” (a query sketch of this pattern follows the list).
  3. Incident Index (Search Engine): A dedicated correlation engine continuously queries TimescaleDB for predefined attack patterns. When a pattern is confirmed (e.g., a distributed brute-force attempt), a concise “incident” document is created and indexed in Meilisearch. This provides the SOC team with a sub-second, typo-tolerant search experience focused only on actionable intelligence, not noisy raw logs.
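
As a concrete illustration of the hot-tier query pattern from tier 2, here is a minimal sketch using pgx against the waf_events schema defined later in this post. It is hypothetical: it filters by client IP rather than ASN (an ASN column would be an enrichment step not shown here), and the rule_msg pattern is an assumption about how the WAF labels its SQL injection rules.

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// countRecentSQLi shows the "hot tier" query shape: a narrow time window,
// an indexed filter, and a single aggregate.
func countRecentSQLi(ctx context.Context, db *pgxpool.Pool, clientIP string) (int64, error) {
	const q = `
		SELECT COUNT(*)
		FROM waf_events
		WHERE time > NOW() - INTERVAL '15 minutes'
		  AND client_ip = $1::inet
		  AND rule_msg ILIKE '%SQL Injection%'`

	var n int64
	if err := db.QueryRow(ctx, q, clientIP).Scan(&n); err != nil {
		return 0, fmt.Errorf("hot-tier count query: %w", err)
	}
	return n, nil
}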

The decision to move away from a single-stack solution was pragmatic. Elasticsearch, while powerful, became prohibitively expensive and operationally complex at our scale for time-series data. TimescaleDB’s PostgreSQL foundation and superior time-series compression and querying capabilities made it a clear winner for the ‘hot’ tier. Meilisearch was chosen for its simplicity and unparalleled search-as-you-type speed, which is exactly what an analyst needs when under pressure during an incident investigation.

Foundation: Infrastructure and Data Models

Before writing a single line of Go, we defined the infrastructure and data schemas. This is a critical step often overlooked; a solid data model prevents significant refactoring later. We manage the entire stack with Docker Compose for local development.

docker-compose.yml:

version: '3.8'

services:
  nats:
    image: nats:2.9
    ports:
      - "4222:4222"
      - "8222:8222" # Monitoring
    command: "-js" # Enable JetStream

  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    ports:
      - "9000:9000" # API
      - "9001:9001" # Console
    volumes:
      - minio_data:/data
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
      # Note: MINIO_DEFAULT_BUCKETS is honored by the Bitnami image, not the official
      # minio/minio image used here; create the waf-logs-raw bucket via the console,
      # an `mc mb` init step, or from the processor at startup.
      MINIO_DEFAULT_BUCKETS: waf-logs-raw
    command: server /data --console-address ":9001"

  timescaledb:
    image: timescale/timescaledb:latest-pg14
    ports:
      - "5432:5432"
    volumes:
      - timescaledb_data:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
    environment:
      POSTGRES_DB: security_events
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password

  meilisearch:
    image: getmeili/meilisearch:v1.3
    ports:
      - "7700:7700"
    volumes:
      - meilisearch_data:/meili_data
    environment:
      MEILI_MASTER_KEY: 'aMasterKey'
      MEILI_ENV: 'development'

volumes:
  minio_data:
  timescaledb_data:
  meilisearch_data:

The corresponding SQL initialization for TimescaleDB sets up the core hypertable. A pitfall here is choosing the right chunk_time_interval: too small and you accumulate a huge number of chunks, adding planning and management overhead; too large and the most recent chunk no longer fits comfortably in memory, hurting both ingest and query performance. A 1-day interval is a sane default for this use case.

init-db.sql:

-- Create the main table for WAF events
CREATE TABLE waf_events (
    time TIMESTAMPTZ NOT NULL,
    client_ip INET NOT NULL,
    host TEXT NOT NULL,
    request_method TEXT NOT NULL,
    request_uri TEXT NOT NULL,
    status_code INT,
    user_agent TEXT,
    rule_id INT NOT NULL,
    rule_msg TEXT,
    rule_tags TEXT[]
);

-- Turn it into a TimescaleDB hypertable, partitioned by time
SELECT create_hypertable('waf_events', 'time', chunk_time_interval => INTERVAL '1 day');

-- Create indexes for common query patterns
CREATE INDEX ON waf_events (time DESC);
CREATE INDEX ON waf_events (client_ip, time DESC);
CREATE INDEX ON waf_events (rule_id, time DESC);

-- Example of a continuous aggregate for dashboarding.
-- This pre-calculates aggregates, making dashboards extremely fast.
-- Note: DISTINCT aggregates (e.g. COUNT(DISTINCT client_ip)) are not supported inside
-- continuous aggregates; distinct-IP counts need either a hyperloglog approximation
-- (timescaledb_toolkit) or a separate query against the raw hypertable.
CREATE MATERIALIZED VIEW waf_events_daily_summary
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', time) AS bucket,
    rule_id,
    COUNT(*) AS event_count
FROM waf_events
GROUP BY bucket, rule_id;

-- Policy to automatically refresh the continuous aggregate
SELECT add_continuous_aggregate_policy('waf_events_daily_summary',
    start_offset => INTERVAL '3 days',
    end_offset   => INTERVAL '1 day',
    schedule_interval => INTERVAL '1 hour');
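
-- Optional (sketch): two policies that play to TimescaleDB's strengths for this hot tier.
-- These assume TimescaleDB 2.x policy functions; the intervals are illustrative
-- defaults, not values we have tuned.

-- Compress chunks older than 7 days, segmented by client_ip so per-IP scans stay cheap.
ALTER TABLE waf_events SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'client_ip',
    timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('waf_events', INTERVAL '7 days');

-- Drop hot-tier chunks older than 90 days; the raw copies in MinIO remain the long-term record.
SELECT add_retention_policy('waf_events', INTERVAL '90 days');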

The Pipeline Core: Go Processing Service

The heart of the system is a Go service responsible for ingestion, parsing, and dispatching. Go’s concurrency model and performance characteristics are ideal for this kind of I/O-bound, high-throughput task.

The service is structured around a central Processor struct that holds connections to all downstream dependencies. We use zerolog for structured, high-performance logging.

processor/main.go:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
)

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Configuration - In a real project, this would come from a config file or env vars
	// using a library like Viper.
	config := &Config{
		NATSAddress:    "nats://localhost:4222",
		NATSStream:     "WAF_EVENTS",
		NATSSubject:    "waf.logs.raw",
		MinIOEndpoint:  "localhost:9000",
		MinIOAccessKey: "minioadmin",
		MinIOSecretKey: "minioadmin",
		MinIOBucket:    "waf-logs-raw",
		DBConnectionString: "postgres://user:password@localhost:5432/security_events",
	}

	// Graceful shutdown context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Setup connections
	nc, err := nats.Connect(config.NATSAddress)
	if err != nil {
		logger.Fatal().Err(err).Msg("Failed to connect to NATS")
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		logger.Fatal().Err(err).Msg("Failed to get JetStream context")
	}

	dbpool, err := pgxpool.New(ctx, config.DBConnectionString)
	if err != nil {
		logger.Fatal().Err(err).Msg("Failed to connect to TimescaleDB")
	}
	defer dbpool.Close()

	minioClient, err := minio.New(config.MinIOEndpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(config.MinIOAccessKey, config.MinIOSecretKey, ""),
		Secure: false,
	})
	if err != nil {
		logger.Fatal().Err(err).Msg("Failed to connect to MinIO")
	}

	processor, err := NewEventProcessor(js, dbpool, minioClient, &logger, config)
	if err != nil {
		logger.Fatal().Err(err).Msg("Failed to create event processor")
	}

	// Start processing in the background
	go processor.Run(ctx)

	// Wait for termination signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	logger.Info().Msg("Shutting down processor...")
	cancel() // Signal goroutines to stop
	// A real implementation might use a WaitGroup to ensure all goroutines finish.
	time.Sleep(2 * time.Second) // Give time for graceful shutdown
	logger.Info().Msg("Processor shut down gracefully.")
}

// Config struct holds all necessary configuration parameters.
type Config struct {
	NATSAddress       string
	NATSStream        string
	NATSSubject       string
	MinIOEndpoint     string
	MinIOAccessKey    string
	MinIOSecretKey    string
	MinIOBucket       string
	DBConnectionString string
}

The actual processing logic resides within the EventProcessor. It subscribes through a durable NATS JetStream consumer. This is crucial: if the processor crashes, NATS retains the messages, and the processor resumes where it left off upon restart, so unacknowledged messages are never lost.

The handleMessage function is the hot path. It parses the JSON, uploads the raw message to MinIO, and adds the structured version to a batch for insertion into TimescaleDB. Batching is paramount for database performance. Writing one row at a time would destroy TimescaleDB’s ingestion throughput. The pgx driver provides an excellent pgx.Batch interface for this.

processor/processor.go:

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/minio/minio-go/v7"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
)

const (
	batchSize    = 500
	batchTimeout = 2 * time.Second
)

// WAFEvent represents the structured data parsed from a raw log.
type WAFEvent struct {
	Time          time.Time `json:"timestamp"`
	ClientIP      string    `json:"client_ip"`
	Host          string    `json:"host"`
	RequestMethod string    `json:"request_method"`
	RequestURI    string    `json:"request_uri"`
	StatusCode    int       `json:"status_code"`
	UserAgent     string    `json:"user_agent"`
	RuleID        int       `json:"rule_id"`
	RuleMsg       string    `json:"rule_msg"`
	RuleTags      []string  `json:"rule_tags"`
}

type EventProcessor struct {
	js          nats.JetStreamContext
	dbpool      *pgxpool.Pool
	minioClient *minio.Client
	logger      *zerolog.Logger
	config      *Config
	
	batch      []WAFEvent
	batchMutex sync.Mutex
	ticker     *time.Ticker
}

func NewEventProcessor(js nats.JetStreamContext, dbpool *pgxpool.Pool, minioClient *minio.Client, logger *zerolog.Logger, config *Config) (*EventProcessor, error) {
	// Ensure NATS stream exists
	_, err := js.StreamInfo(config.NATSStream)
	if err != nil {
		logger.Info().Str("stream", config.NATSStream).Msg("Stream not found, creating it")
		_, err = js.AddStream(&nats.StreamConfig{
			Name: config.NATSStream,
			// The stream must cover the exact subject we pull from below; a
			// "waf.logs.raw.>" wildcard alone would NOT match "waf.logs.raw".
			Subjects: []string{config.NATSSubject, fmt.Sprintf("%s.>", config.NATSSubject)},
		})
		if err != nil {
			return nil, err
		}
	}
	
	return &EventProcessor{
		js:          js,
		dbpool:      dbpool,
		minioClient: minioClient,
		logger:      logger,
		config:      config,
		batch:       make([]WAFEvent, 0, batchSize),
	}, nil
}


func (p *EventProcessor) Run(ctx context.Context) {
	p.ticker = time.NewTicker(batchTimeout)
	defer p.ticker.Stop()

	// A common pitfall is not creating a durable consumer.
	// A durable consumer ensures that if the service restarts, it picks up
	// where it left off.
	sub, err := p.js.PullSubscribe(p.config.NATSSubject, "PROCESSOR_DURABLE")
	if err != nil {
		p.logger.Error().Err(err).Msg("Failed to create pull subscription")
		return
	}
	
	p.logger.Info().Msg("Starting event processor...")

	for {
		select {
		case <-ctx.Done():
			p.logger.Info().Msg("Context cancelled, flushing final batch")
			p.flushBatch(context.Background()) // Use a new context for final flush
			return
		case <-p.ticker.C:
			p.flushBatch(ctx)
		default:
			msgs, err := sub.Fetch(batchSize, nats.MaxWait(time.Second))
			if err != nil && err != nats.ErrTimeout {
				p.logger.Error().Err(err).Msg("Error fetching messages from NATS")
				time.Sleep(1 * time.Second) // Avoid tight loop on persistent error
				continue
			}

			for _, msg := range msgs {
				p.handleMessage(ctx, msg)
				// Acknowledged as soon as the message is parsed and batched; acking only
				// after flushBatch succeeds would give a stricter no-data-loss guarantee.
				msg.Ack()
			}
		}
	}
}

func (p *EventProcessor) handleMessage(ctx context.Context, msg *nats.Msg) {
	var event WAFEvent
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		p.logger.Error().Err(err).Str("raw_message", string(msg.Data)).Msg("Failed to unmarshal WAF log")
		// In production, we might send this to a dead-letter queue.
		return
	}

	// Basic validation
	if net.ParseIP(event.ClientIP) == nil {
		p.logger.Warn().Str("ip", event.ClientIP).Msg("Invalid client IP address in event")
		return
	}
	
	// Fork goroutine to upload to MinIO to not block the main processing loop.
	// A WaitGroup could be used for more robust shutdown.
	go p.uploadToDataLake(ctx, msg.Data)

	p.batchMutex.Lock()
	p.batch = append(p.batch, event)
	batchFull := len(p.batch) >= batchSize
	p.batchMutex.Unlock()

	if batchFull {
		p.flushBatch(ctx)
	}
}

func (p *EventProcessor) uploadToDataLake(ctx context.Context, data []byte) {
	// Object name: YYYY/MM/DD/<uuid>.json
	now := time.Now().UTC()
	objectName := fmt.Sprintf("%d/%02d/%02d/%s.json", now.Year(), now.Month(), now.Day(), uuid.New().String())

	_, err := p.minioClient.PutObject(ctx, p.config.MinIOBucket, objectName, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
		ContentType: "application/json",
	})

	if err != nil {
		p.logger.Error().Err(err).Str("object_name", objectName).Msg("Failed to upload raw log to MinIO")
	}
}

func (p *EventProcessor) flushBatch(ctx context.Context) {
	p.batchMutex.Lock()
	if len(p.batch) == 0 {
		p.batchMutex.Unlock()
		return
	}

	// Create a copy of the batch slice to be processed and reset the original.
	// This allows new events to be added to the batch while the current one is being flushed.
	batchToFlush := make([]WAFEvent, len(p.batch))
	copy(batchToFlush, p.batch)
	p.batch = p.batch[:0]
	p.batchMutex.Unlock()

	batch := &pgx.Batch{}
	for _, event := range batchToFlush {
		batch.Queue(`INSERT INTO waf_events (time, client_ip, host, request_method, request_uri, status_code, user_agent, rule_id, rule_msg, rule_tags)
			VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
			event.Time, event.ClientIP, event.Host, event.RequestMethod, event.RequestURI, event.StatusCode, event.UserAgent, event.RuleID, event.RuleMsg, event.RuleTags)
	}
	
	br := p.dbpool.SendBatch(ctx, batch)
	// A crucial step for error handling in batches is to close the result reader.
	// Forgetting this can lead to connection leaks.
	defer br.Close() 

	// Check for errors in the batch execution
	// We only check for the first error, but one could iterate through all results.
	_, err := br.Exec()
	if err != nil {
		// This is a critical failure. In a production system, we'd need a robust retry
		// mechanism or a way to shunt these failed batches to a recovery area.
		p.logger.Error().Err(err).Int("batch_size", len(batchToFlush)).Msg("Failed to execute batch insert to TimescaleDB")
	} else {
		p.logger.Info().Int("batch_size", len(batchToFlush)).Msg("Successfully flushed batch to TimescaleDB")
	}
}
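
For completeness, here is a minimal sketch of the “iterate through all results” variant mentioned in the comment above. It is assumed to live in the same package as the processor (so the pgx, pgxpool, and zerolog imports are already present) and to receive the same pgx.Batch built in flushBatch; each queued INSERT gets its own Exec call, so one bad row can be logged without hiding the outcome of the rest.

// checkBatchResults drains a batch one statement at a time instead of only
// checking the first result. Call it in place of the single br.Exec() above.
func checkBatchResults(ctx context.Context, pool *pgxpool.Pool, batch *pgx.Batch, logger *zerolog.Logger) {
	br := pool.SendBatch(ctx, batch)
	defer br.Close() // Always close to release the underlying connection.

	for i := 0; i < batch.Len(); i++ {
		if _, err := br.Exec(); err != nil {
			// The index identifies which queued INSERT (and therefore which event) failed.
			logger.Error().Err(err).Int("statement_index", i).Msg("Batch statement failed")
		}
	}
}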

Correlation and Incident Indexing

With data flowing into TimescaleDB, the next piece is the correlation engine. For this implementation, it’s a simple, periodic background task within the same Go service, but in a larger system, it would be a separate microservice.
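
Wiring it up from main mirrors the processor setup. A minimal sketch, assuming the MEILI_MASTER_KEY from docker-compose.yml and the NewClient/ClientConfig constructor from the meilisearch-go releases that expose the *meilisearch.Client type used below:

// In main(), next to the processor (sketch; error handling elided).
meiliClient := meilisearch.NewClient(meilisearch.ClientConfig{
	Host:   "http://localhost:7700",
	APIKey: "aMasterKey", // matches MEILI_MASTER_KEY in docker-compose.yml
})

correlator := NewCorrelator(dbpool, meiliClient, &logger)
go correlator.Run(ctx) // runs its checks every minute until ctx is cancelled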

This engine runs queries that define security incidents: many blocked requests from a single source IP hitting the same class of WAF rule within a short time window. The classic example is a brute-force login attempt; the sketch below applies the same detection shape to bursts of blocked SQL injection attempts from a single IP.

correlator/correlator.go:

package main // Fictional package for demonstration

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/meilisearch/meilisearch-go"
	"github.com/rs/zerolog"
)

// Incident represents a correlated security event, destined for Meilisearch.
type Incident struct {
	ID          string    `json:"id"`
	Type        string    `json:"type"`
	ClientIP    string    `json:"client_ip"`
	Targets     []string  `json:"targets"`
	StartTime   time.Time `json:"start_time"`
	EndTime     time.Time `json:"end_time"`
	EventCount  int       `json:"event_count"`
	RuleIDs     []int     `json:"rule_ids"`
	Description string    `json:"description"`
}


type Correlator struct {
	dbpool *pgxpool.Pool
	meili  *meilisearch.Client
	logger *zerolog.Logger
}

func NewCorrelator(dbpool *pgxpool.Pool, meili *meilisearch.Client, logger *zerolog.Logger) *Correlator {
	// In a real application, you would configure Meilisearch indexes here.
	// For example, setting searchable and filterable attributes.
	// meili.Index("incidents").UpdateFilterableAttributes(&[]string{"type", "client_ip", "start_time"})
	// meili.Index("incidents").UpdateSearchableAttributes(&[]string{"description", "client_ip", "targets"})
	return &Correlator{dbpool, meili, logger}
}

func (c *Correlator) Run(ctx context.Context) {
	ticker := time.NewTicker(1 * time.Minute) // Run checks every minute
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			c.logger.Info().Msg("Correlator shutting down.")
			return
		case <-ticker.C:
			c.logger.Info().Msg("Running correlation checks...")
			c.checkForBruteForce(ctx)
			// Add other checks here, e.g., checkForPortScanning(ctx)
		}
	}
}

func (c *Correlator) checkForBruteForce(ctx context.Context) {
	// This SQL query is the core of the detection logic.
	// It uses time_bucket to group events per source IP and flags any IP that
	// triggered 10 or more blocked SQL injection rules within a 5-minute window.
	// A real-world query would be more sophisticated (and would match on rule IDs
	// or tags rather than a free-text rule message).
	const bruteForceQuery = `
		SELECT 
			client_ip, 
			COUNT(*) AS event_count,
			MIN(time) AS start_time,
			MAX(time) AS end_time,
			array_agg(DISTINCT host) as targets,
			array_agg(DISTINCT rule_id) as rule_ids
		FROM waf_events
		WHERE time > NOW() - INTERVAL '10 minutes'
		AND rule_msg ILIKE '%SQL Injection Attempt%'
		GROUP BY client_ip, time_bucket('5 minutes', time)
		HAVING COUNT(*) >= 10;
	`
	rows, err := c.dbpool.Query(ctx, bruteForceQuery)
	if err != nil {
		c.logger.Error().Err(err).Msg("Brute-force detection query failed")
		return
	}
	defer rows.Close()

	var incidents []Incident

	for rows.Next() {
		var (
			clientIP    string
			eventCount  int
			startTime   time.Time
			endTime     time.Time
			targets     []string
			ruleIDs     []int
		)
		if err := rows.Scan(&clientIP, &eventCount, &startTime, &endTime, &targets, &ruleIDs); err != nil {
			c.logger.Error().Err(err).Msg("Failed to scan brute-force result row")
			continue
		}
		
		// Meilisearch document IDs may only contain alphanumerics, hyphens and
		// underscores, so the dots (or colons) in the IP must be replaced.
		safeIP := strings.NewReplacer(".", "-", ":", "-").Replace(clientIP)
		incident := Incident{
			ID:          fmt.Sprintf("bf-%s-%d", safeIP, startTime.Unix()),
			Type:        "BruteForceSQLi",
			ClientIP:    clientIP,
			Targets:     targets,
			StartTime:   startTime,
			EndTime:     endTime,
			EventCount:  eventCount,
			RuleIDs:     ruleIDs,
			Description: fmt.Sprintf("Potential brute-force SQLi from %s targeting %v", clientIP, targets),
		}
		incidents = append(incidents, incident)
	}
	
	if len(incidents) > 0 {
		c.logger.Info().Int("count", len(incidents)).Msg("Detected new brute-force incidents")
		// Indexing in Meilisearch. The second argument sets the primary key field.
		taskInfo, err := c.meili.Index("incidents").AddDocuments(incidents, "id")
		if err != nil {
			c.logger.Error().Err(err).Msg("Failed to index incidents in Meilisearch")
		} else {
			c.logger.Info().Int64("task_id", taskInfo.TaskUID).Msg("Meilisearch indexing task created")
		}
	}
}

Once an incident is indexed, an analyst can perform a search like blocked attempts from 192.168 and get an immediate, relevant result from Meilisearch, linking them directly to the high-level incident without having to sift through millions of raw logs.
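
On the consuming side, the same meilisearch-go client can back an internal search UI or CLI. A minimal sketch, assuming it sits alongside the correlator code (so the imports above are available), that the referenced attributes were made filterable and sortable as hinted in NewCorrelator, and with an illustrative filter value:

// searchIncidents is a hypothetical helper a dashboard or CLI might call.
func searchIncidents(client *meilisearch.Client, query string) error {
	resp, err := client.Index("incidents").Search(query, &meilisearch.SearchRequest{
		Limit:  20,
		Filter: "type = BruteForceSQLi",     // assumes "type" is a filterable attribute
		Sort:   []string{"start_time:desc"}, // assumes "start_time" is a sortable attribute
	})
	if err != nil {
		return err
	}
	for _, hit := range resp.Hits {
		fmt.Printf("%v\n", hit) // each hit is the incident document as indexed
	}
	return nil
}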

Limitations and Future Trajectory

This architecture, while vastly superior to our previous monolithic system, is not without its own set of challenges and limitations. The correlation logic is currently hard-coded in Go. A more mature implementation would require a dedicated rules engine, perhaps driven by a DSL like Sigma or configured via a UI, allowing security analysts to define detection patterns without code changes.

The current Go service is a single point of failure. NATS JetStream keeps the data durable, but if the service dies, processing simply halts. The natural evolution is to containerize the application and deploy it on Kubernetes with multiple replicas for high availability. That, however, introduces complexity around sharing the JetStream consumer across replicas: delivery is at-least-once, so the TimescaleDB and MinIO writes must tolerate the occasional duplicate.
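
To give a flavour of that scale-out path: JetStream already distributes a durable pull consumer's messages across every subscriber bound to it, so the replicas mostly just need to agree on the consumer name. A sketch under that assumption, reusing the stream and durable names from the earlier config (nats.BindStream pins the subscription to the existing stream):

// Each replica runs this; JetStream splits the message flow across all
// subscribers bound to the shared durable consumer.
sub, err := js.PullSubscribe(
	"waf.logs.raw",      // same subject as the single-instance processor
	"PROCESSOR_DURABLE", // shared durable name: replicas divide the work
	nats.BindStream("WAF_EVENTS"),
)
if err != nil {
	logger.Fatal().Err(err).Msg("Failed to bind shared durable consumer")
}
// Hand sub off to the same Fetch/handleMessage loop shown earlier.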

Furthermore, the data lake is still just a collection of raw JSON files. For efficient ad-hoc analytics using tools like Presto or Spark SQL, we would need to impose a proper table format like Apache Iceberg or Delta Lake on top of the raw data. This would provide schema enforcement, transactional capabilities, and dramatic performance improvements for large-scale analytical queries. The current alerting mechanism is non-existent; the logical next step is to use Meilisearch webhooks or a polling mechanism to trigger notifications to platforms like Slack or PagerDuty whenever a new high-severity incident is indexed.

