Our Security Operations Center (SOC) was drowning in data. The primary culprit was the firehose of logs from our Web Application Firewall (WAF), generating terabytes of semi-structured JSON events monthly. Our incumbent ELK-based logging solution buckled under the query load. Simple time-windowed queries to trace an attack vector could take minutes, rendering real-time threat detection impossible. The core problem was a fundamental architectural mismatch: we were using a general-purpose text search engine for what was predominantly a time-series analysis and incident-centric search problem. A complete rethink was necessary, not just an upgrade.
The initial concept was to dismantle the monolithic logging stack and build a purpose-built, multi-tiered data pipeline. The design philosophy was simple: use the right tool for each specific job to optimize for performance, cost, and query patterns.
graph TD
    subgraph Edge
        WAF[WAF Appliances]
    end
    subgraph Ingestion & Processing [Go Microservice]
        WAF -- JSON Logs --> NATS[NATS Message Queue]
        NATS -- Stream --> Processor
        Processor -- Raw Log --> DataLake[(MinIO Data Lake)]
        Processor -- Structured Event --> TimescaleDB[(TimescaleDB)]
    end
    subgraph Analysis & Search
        TimescaleDB -- Time-series Queries --> CorrelationEngine[Correlation Engine]
        CorrelationEngine -- High-Severity Incidents --> Meilisearch[(Meilisearch)]
        SOCAnalyst[SOC Analyst] -- Fast, Fuzzy Search --> Meilisearch
        SOCAnalyst -- Deep Dive Analytics --> TimescaleDB
        DataScientist[Data Scientist] -- Ad-hoc Bulk Analysis --> DataLake
    end
    style DataLake fill:#d4e8d4,stroke:#333,stroke-width:2px
    style TimescaleDB fill:#cde4f9,stroke:#333,stroke-width:2px
    style Meilisearch fill:#f9e3cd,stroke:#333,stroke-width:2px
This architecture splits the data lifecycle into three distinct tiers:
- Cold Storage (Data Lake): Raw, unaltered WAF logs are immediately archived to an S3-compatible object store (MinIO). This satisfies long-term compliance requirements at a minimal cost and provides a source of truth for offline batch processing or reprocessing.
- Hot Storage (Time-Series DB): Parsed, structured, and enriched log data is streamed into TimescaleDB. This is the engine for high-performance time-series analytics, enabling rapid aggregation and filtering over specific time windows. This is where we answer questions like “How many SQL injection attempts originated from ASN 12345 in the last 15 minutes?” (a sketch of such a query follows this list).
- Incident Index (Search Engine): A dedicated correlation engine continuously queries TimescaleDB for predefined attack patterns. When a pattern is confirmed (e.g., a distributed brute-force attempt), a concise “incident” document is created and indexed in Meilisearch. This provides the SOC team with a sub-second, typo-tolerant search experience focused only on actionable intelligence, not noisy raw logs.
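To make the hot-tier query pattern concrete, the following is a minimal sketch (in Go, with pgx) of the kind of time-windowed aggregation an analyst tool would run against TimescaleDB. It assumes the waf_events schema defined later in init-db.sql; the ASN variant of the question would need an extra enrichment column that the schema does not include, so this version groups by client_ip instead:

package analytics

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// countRecentSQLi lists the IPs with the most SQL-injection-flagged requests
// in the last 15 minutes. Grouping by ASN would require an enrichment column
// that the schema shown later does not have.
func countRecentSQLi(ctx context.Context, pool *pgxpool.Pool) error {
	const q = `
		SELECT client_ip::text, COUNT(*) AS attempts
		FROM waf_events
		WHERE time > NOW() - INTERVAL '15 minutes'
		  AND rule_msg ILIKE '%SQL Injection%'
		GROUP BY client_ip
		ORDER BY attempts DESC
		LIMIT 20;`

	rows, err := pool.Query(ctx, q)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var ip string
		var attempts int64
		if err := rows.Scan(&ip, &attempts); err != nil {
			return err
		}
		fmt.Printf("%s: %d SQL injection attempts in the last 15 minutes\n", ip, attempts)
	}
	return rows.Err()
}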
The decision to move away from a single-stack solution was pragmatic. Elasticsearch, while powerful, became prohibitively expensive and operationally complex at our scale for time-series data. TimescaleDB’s PostgreSQL foundation and superior time-series compression and querying capabilities made it a clear winner for the ‘hot’ tier. Meilisearch was chosen for its simplicity and unparalleled search-as-you-type speed, which is exactly what an analyst needs when under pressure during an incident investigation.
Foundation: Infrastructure and Data Models
Before writing a single line of Go, we defined the infrastructure and data schemas. This is a critical step often overlooked; a solid data model prevents significant refactoring later. We manage the entire stack with Docker Compose for local development.
docker-compose.yml:
version: '3.8'
services:
  nats:
    image: nats:2.9
    ports:
      - "4222:4222"
      - "8222:8222" # Monitoring
    command: "-js" # Enable JetStream
  minio:
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    ports:
      - "9000:9000" # API
      - "9001:9001" # Console
    volumes:
      - minio_data:/data
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
      # Note: MINIO_DEFAULT_BUCKETS is honored by the Bitnami image; with the
      # official minio/minio image the waf-logs-raw bucket may need to be
      # created manually (console or `mc mb`) before the processor starts.
      MINIO_DEFAULT_BUCKETS: waf-logs-raw
    command: server /data --console-address ":9001"
  timescaledb:
    image: timescale/timescaledb:latest-pg14
    ports:
      - "5432:5432"
    volumes:
      - timescaledb_data:/var/lib/postgresql/data
      - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
    environment:
      POSTGRES_DB: security_events
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
  meilisearch:
    image: getmeili/meilisearch:v1.3
    ports:
      - "7700:7700"
    volumes:
      - meilisearch_data:/meili_data
    environment:
      MEILI_MASTER_KEY: 'aMasterKey'
      MEILI_ENV: 'development'

volumes:
  minio_data:
  timescaledb_data:
  meilisearch_data:
The corresponding SQL initialization for TimescaleDB sets up the core hypertable. A pitfall here is choosing the right chunk_time_interval: too small, and query planning has to touch an excessive number of chunks; too large, and recent chunks no longer fit comfortably in memory and retention becomes coarse-grained. A 1-day interval is a sane default for this use case.
init-db.sql:
-- Create the main table for WAF events
CREATE TABLE waf_events (
time TIMESTAMPTZ NOT NULL,
client_ip INET NOT NULL,
host TEXT NOT NULL,
request_method TEXT NOT NULL,
request_uri TEXT NOT NULL,
status_code INT,
user_agent TEXT,
rule_id INT NOT NULL,
rule_msg TEXT,
rule_tags TEXT[]
);
-- Turn it into a TimescaleDB hypertable, partitioned by time
SELECT create_hypertable('waf_events', 'time', chunk_time_interval => INTERVAL '1 day');
-- Create indexes for common query patterns
CREATE INDEX ON waf_events (time DESC);
CREATE INDEX ON waf_events (client_ip, time DESC);
CREATE INDEX ON waf_events (rule_id, time DESC);
-- Example of a continuous aggregate for dashboarding
-- This pre-calculates aggregates, making dashboards extremely fast.
CREATE MATERIALIZED VIEW waf_events_daily_summary
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 day', time) AS bucket,
rule_id,
COUNT(*) as event_count
-- Note: COUNT(DISTINCT ...) is not supported inside continuous aggregates;
-- a distinct-IP count would need an approximation (e.g. the hyperloglog
-- aggregates from timescaledb_toolkit) or a separate query on the raw table.
FROM waf_events
GROUP BY bucket, rule_id;
-- Policy to automatically refresh the continuous aggregate
SELECT add_continuous_aggregate_policy('waf_events_daily_summary',
start_offset => INTERVAL '3 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
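Dashboards should read from this continuous aggregate rather than from the raw hypertable; that is the point of pre-computing it. A minimal Go sketch of a dashboard-style query against the view defined above:

package dashboards

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// topRulesLastWeek reads the pre-aggregated daily summary, so its cost stays
// roughly flat no matter how many raw events the waf_events hypertable holds.
func topRulesLastWeek(ctx context.Context, pool *pgxpool.Pool) error {
	const q = `
		SELECT bucket, rule_id, event_count
		FROM waf_events_daily_summary
		WHERE bucket > NOW() - INTERVAL '7 days'
		ORDER BY event_count DESC
		LIMIT 10;`

	rows, err := pool.Query(ctx, q)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var bucket time.Time
		var ruleID int
		var eventCount int64
		if err := rows.Scan(&bucket, &ruleID, &eventCount); err != nil {
			return err
		}
		fmt.Printf("%s rule_id=%d events=%d\n", bucket.Format("2006-01-02"), ruleID, eventCount)
	}
	return rows.Err()
}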
The Pipeline Core: Go Processing Service
The heart of the system is a Go service responsible for ingestion, parsing, and dispatching. Go’s concurrency model and performance characteristics are ideal for this kind of I/O-bound, high-throughput task.
The service is structured around a central EventProcessor struct that holds connections to all downstream dependencies. We use zerolog for structured, high-performance logging.
processor/main.go:
package main
import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
)
func main() {
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
// Configuration - In a real project, this would come from a config file or env vars
// using a library like Viper.
config := &Config{
NATSAddress: "nats://localhost:4222",
NATSStream: "WAF_EVENTS",
NATSSubject: "waf.logs.raw",
MinIOEndpoint: "localhost:9000",
MinIOAccessKey: "minioadmin",
MinIOSecretKey: "minioadmin",
MinIOBucket: "waf-logs-raw",
DBConnectionString: "postgres://user:password@localhost:5432/security_events",
}
// Graceful shutdown context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup connections
nc, err := nats.Connect(config.NATSAddress)
if err != nil {
logger.Fatal().Err(err).Msg("Failed to connect to NATS")
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
logger.Fatal().Err(err).Msg("Failed to get JetStream context")
}
dbpool, err := pgxpool.New(ctx, config.DBConnectionString)
if err != nil {
logger.Fatal().Err(err).Msg("Failed to connect to TimescaleDB")
}
defer dbpool.Close()
minioClient, err := minio.New(config.MinIOEndpoint, &minio.Options{
Creds: credentials.NewStaticV4(config.MinIOAccessKey, config.MinIOSecretKey, ""),
Secure: false,
})
if err != nil {
logger.Fatal().Err(err).Msg("Failed to connect to MinIO")
}
processor, err := NewEventProcessor(js, dbpool, minioClient, &logger, config)
if err != nil {
logger.Fatal().Err(err).Msg("Failed to create event processor")
}
// Start processing in the background
go processor.Run(ctx)
// Wait for termination signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Info().Msg("Shutting down processor...")
cancel() // Signal goroutines to stop
// A real implementation might use a WaitGroup to ensure all goroutines finish.
time.Sleep(2 * time.Second) // Give time for graceful shutdown
logger.Info().Msg("Processor shut down gracefully.")
}
// Config struct holds all necessary configuration parameters.
type Config struct {
NATSAddress string
NATSStream string
NATSSubject string
MinIOEndpoint string
MinIOAccessKey string
MinIOSecretKey string
MinIOBucket string
DBConnectionString string
}
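The hard-coded Config above keeps the example focused. A minimal environment-variable loader, as a stand-in for a real configuration library like Viper, might look like the sketch below; the variable names are made up for illustration:

// loadConfigFromEnv is a minimal stand-in for a configuration library such as
// Viper. The environment variable names are illustrative, not a standard.
func loadConfigFromEnv() *Config {
	get := func(key, fallback string) string {
		if v, ok := os.LookupEnv(key); ok {
			return v
		}
		return fallback
	}
	return &Config{
		NATSAddress:        get("NATS_ADDRESS", "nats://localhost:4222"),
		NATSStream:         get("NATS_STREAM", "WAF_EVENTS"),
		NATSSubject:        get("NATS_SUBJECT", "waf.logs.raw"),
		MinIOEndpoint:      get("MINIO_ENDPOINT", "localhost:9000"),
		MinIOAccessKey:     get("MINIO_ACCESS_KEY", "minioadmin"),
		MinIOSecretKey:     get("MINIO_SECRET_KEY", "minioadmin"),
		MinIOBucket:        get("MINIO_BUCKET", "waf-logs-raw"),
		DBConnectionString: get("DB_CONNECTION_STRING", "postgres://user:password@localhost:5432/security_events"),
	}
}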
The actual processing logic resides within the EventProcessor. It subscribes to a durable NATS JetStream consumer. This is crucial: if the processor crashes, NATS retains the messages, and the processor can resume where it left off upon restart, ensuring no data loss.
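For the pieces PullSubscribe leaves implicit, the durable consumer can also be declared up front. The sketch below is meant to live in processor.go alongside the code that follows; the redelivery settings are illustrative, not tuned values:

// ensureConsumer declares the durable pull consumer explicitly so its ack and
// redelivery semantics are visible in code rather than left to library defaults.
func ensureConsumer(js nats.JetStreamContext, cfg *Config) error {
	_, err := js.AddConsumer(cfg.NATSStream, &nats.ConsumerConfig{
		Durable:       "PROCESSOR_DURABLE",
		AckPolicy:     nats.AckExplicitPolicy, // messages stay pending until Ack()
		AckWait:       30 * time.Second,       // redeliver if not acked in time
		MaxDeliver:    5,                      // stop redelivering after 5 attempts
		FilterSubject: cfg.NATSSubject,
		MaxAckPending: 2 * batchSize,          // bound the number of in-flight messages
	})
	return err
}

Run could then bind to this pre-created consumer via nats.Bind(cfg.NATSStream, "PROCESSOR_DURABLE") instead of letting PullSubscribe create it implicitly.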
The handleMessage function is the hot path. It parses the JSON, uploads the raw message to MinIO, and adds the structured version to a batch for insertion into TimescaleDB. Batching is paramount for database performance: writing one row at a time would destroy TimescaleDB’s ingestion throughput. The pgx driver provides an excellent pgx.Batch interface for this.
processor/processor.go:
package main
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/minio/minio-go/v7"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
)
const (
batchSize = 500
batchTimeout = 2 * time.Second
)
// WAFEvent represents the structured data parsed from a raw log.
type WAFEvent struct {
Time time.Time `json:"timestamp"`
ClientIP string `json:"client_ip"`
Host string `json:"host"`
RequestMethod string `json:"request_method"`
RequestURI string `json:"request_uri"`
StatusCode int `json:"status_code"`
UserAgent string `json:"user_agent"`
RuleID int `json:"rule_id"`
RuleMsg string `json:"rule_msg"`
RuleTags []string `json:"rule_tags"`
}
type EventProcessor struct {
js nats.JetStreamContext
dbpool *pgxpool.Pool
minioClient *minio.Client
logger *zerolog.Logger
config *Config
batch []WAFEvent
batchMutex sync.Mutex
ticker *time.Ticker
}
func NewEventProcessor(js nats.JetStreamContext, dbpool *pgxpool.Pool, minioClient *minio.Client, logger *zerolog.Logger, config *Config) (*EventProcessor, error) {
// Ensure NATS stream exists
_, err := js.StreamInfo(config.NATSStream)
if err != nil {
logger.Info().Str("stream", config.NATSStream).Msg("Stream not found, creating it")
_, err = js.AddStream(&nats.StreamConfig{
Name: config.NATSStream,
// The stream must cover the exact subject the processor pulls from;
// "waf.logs.raw.>" alone would not match "waf.logs.raw" itself.
Subjects: []string{config.NATSSubject},
})
if err != nil {
return nil, err
}
}
return &EventProcessor{
js: js,
dbpool: dbpool,
minioClient: minioClient,
logger: logger,
config: config,
batch: make([]WAFEvent, 0, batchSize),
}, nil
}
func (p *EventProcessor) Run(ctx context.Context) {
p.ticker = time.NewTicker(batchTimeout)
defer p.ticker.Stop()
// A common pitfall is not creating a durable consumer.
// A durable consumer ensures that if the service restarts, it picks up
// where it left off.
sub, err := p.js.PullSubscribe(p.config.NATSSubject, "PROCESSOR_DURABLE")
if err != nil {
p.logger.Error().Err(err).Msg("Failed to create pull subscription")
return
}
p.logger.Info().Msg("Starting event processor...")
for {
select {
case <-ctx.Done():
p.logger.Info().Msg("Context cancelled, flushing final batch")
p.flushBatch(context.Background()) // Use a new context for final flush
return
case <-p.ticker.C:
p.flushBatch(ctx)
default:
msgs, err := sub.Fetch(batchSize, nats.MaxWait(time.Second))
if err != nil && err != nats.ErrTimeout {
p.logger.Error().Err(err).Msg("Error fetching messages from NATS")
time.Sleep(1 * time.Second) // Avoid tight loop on persistent error
continue
}
for _, msg := range msgs {
p.handleMessage(ctx, msg)
msg.Ack() // Acknowledge message after handling
}
}
}
}
func (p *EventProcessor) handleMessage(ctx context.Context, msg *nats.Msg) {
var event WAFEvent
if err := json.Unmarshal(msg.Data, &event); err != nil {
p.logger.Error().Err(err).Str("raw_message", string(msg.Data)).Msg("Failed to unmarshal WAF log")
// In production, we might send this to a dead-letter queue.
return
}
// Basic validation
if net.ParseIP(event.ClientIP) == nil {
p.logger.Warn().Str("ip", event.ClientIP).Msg("Invalid client IP address in event")
return
}
// Fork goroutine to upload to MinIO to not block the main processing loop.
// A WaitGroup could be used for more robust shutdown.
go p.uploadToDataLake(ctx, msg.Data)
p.batchMutex.Lock()
p.batch = append(p.batch, event)
batchFull := len(p.batch) >= batchSize
p.batchMutex.Unlock()
if batchFull {
p.flushBatch(ctx)
}
}
func (p *EventProcessor) uploadToDataLake(ctx context.Context, data []byte) {
// Object name: /YYYY/MM/DD/UUID.json
now := time.Now().UTC()
objectName := fmt.Sprintf("%d/%02d/%02d/%s.json", now.Year(), now.Month(), now.Day(), uuid.New().String())
_, err := p.minioClient.PutObject(ctx, p.config.MinIOBucket, objectName, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
ContentType: "application/json",
})
if err != nil {
p.logger.Error().Err(err).Str("object_name", objectName).Msg("Failed to upload raw log to MinIO")
}
}
func (p *EventProcessor) flushBatch(ctx context.Context) {
p.batchMutex.Lock()
if len(p.batch) == 0 {
p.batchMutex.Unlock()
return
}
// Create a copy of the batch slice to be processed and reset the original.
// This allows new events to be added to the batch while the current one is being flushed.
batchToFlush := make([]WAFEvent, len(p.batch))
copy(batchToFlush, p.batch)
p.batch = p.batch[:0]
p.batchMutex.Unlock()
batch := &pgx.Batch{}
for _, event := range batchToFlush {
batch.Queue(`INSERT INTO waf_events (time, client_ip, host, request_method, request_uri, status_code, user_agent, rule_id, rule_msg, rule_tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
event.Time, event.ClientIP, event.Host, event.RequestMethod, event.RequestURI, event.StatusCode, event.UserAgent, event.RuleID, event.RuleMsg, event.RuleTags)
}
br := p.dbpool.SendBatch(ctx, batch)
// A crucial step for error handling in batches is to close the result reader.
// Forgetting this can lead to connection leaks.
defer br.Close()
// Check for errors in the batch execution
// We only check for the first error, but one could iterate through all results.
_, err := br.Exec()
if err != nil {
// This is a critical failure. In a production system, we'd need a robust retry
// mechanism or a way to shunt these failed batches to a recovery area.
p.logger.Error().Err(err).Int("batch_size", len(batchToFlush)).Msg("Failed to execute batch insert to TimescaleDB")
} else {
p.logger.Info().Int("batch_size", len(batchToFlush)).Msg("Successfully flushed batch to TimescaleDB")
}
}
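The comment about a recovery path in flushBatch deserves a concrete shape. One low-effort option, sketched below for processor.go, is to publish a failed batch to a dedicated subject and handle it out of band. The waf.logs.failed subject is hypothetical and would need to be covered by a JetStream stream; re-marshalling the structured events is also a simplification, since it loses the original raw payloads.

// quarantineBatch shunts a batch that failed to insert into TimescaleDB onto a
// hypothetical dead-letter subject so it can be inspected or replayed later.
func (p *EventProcessor) quarantineBatch(events []WAFEvent, cause error) {
	payload, err := json.Marshal(events)
	if err != nil {
		p.logger.Error().Err(err).Msg("Failed to marshal quarantined batch")
		return
	}
	msg := nats.NewMsg("waf.logs.failed") // hypothetical subject; must belong to a stream
	msg.Header.Set("X-Failure-Cause", cause.Error())
	msg.Data = payload
	if _, err := p.js.PublishMsg(msg); err != nil {
		p.logger.Error().Err(err).Msg("Failed to publish quarantined batch")
	}
}

The error branch in flushBatch would then call p.quarantineBatch(batchToFlush, err) instead of only logging the failure.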
Correlation and Incident Indexing
With data flowing into TimescaleDB, the next piece is the correlation engine. For this implementation, it’s a simple, periodic background task within the same Go service, but in a larger system, it would be a separate microservice.
This engine runs queries that define security incidents. A simple example is detecting a brute-force attack: many WAF-blocked requests from a single IP against the same class of rule in a short time frame. The check implemented below flags repeated SQL injection attempts from one source.
correlator/correlator.go:
package main // Fictional package for demonstration
import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/meilisearch/meilisearch-go"
	"github.com/rs/zerolog"
)
// Incident represents a correlated security event, destined for Meilisearch.
type Incident struct {
ID string `json:"id"`
Type string `json:"type"`
ClientIP string `json:"client_ip"`
Targets []string `json:"targets"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
EventCount int `json:"event_count"`
RuleIDs []int `json:"rule_ids"`
Description string `json:"description"`
}
type Correlator struct {
dbpool *pgxpool.Pool
meili *meilisearch.Client
logger *zerolog.Logger
}
func NewCorrelator(dbpool *pgxpool.Pool, meili *meilisearch.Client, logger *zerolog.Logger) *Correlator {
// In a real application, you would configure Meilisearch indexes here.
// For example, setting searchable and filterable attributes.
// meili.Index("incidents").UpdateFilterableAttributes(&[]string{"type", "client_ip", "start_time"})
// meili.Index("incidents").UpdateSearchableAttributes(&[]string{"description", "client_ip", "targets"})
return &Correlator{dbpool, meili, logger}
}
func (c *Correlator) Run(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute) // Run checks every minute
defer ticker.Stop()
for {
select {
case <-ctx.Done():
c.logger.Info().Msg("Correlator shutting down.")
return
case <-ticker.C:
c.logger.Info().Msg("Running correlation checks...")
c.checkForBruteForce(ctx)
// Add other checks here, e.g., checkForPortScanning(ctx)
}
}
}
func (c *Correlator) checkForBruteForce(ctx context.Context) {
// This SQL query is the core of the detection logic.
// It uses TimescaleDB's time_bucket to group events and flags IPs that
// triggered at least 10 SQL-injection rules within a 5-minute window over
// the last 10 minutes. A real-world query would be more sophisticated.
const bruteForceQuery = `
SELECT
client_ip::text AS client_ip, -- cast so pgx scans it into a plain Go string
COUNT(*) AS event_count,
MIN(time) AS start_time,
MAX(time) AS end_time,
array_agg(DISTINCT host) as targets,
array_agg(DISTINCT rule_id) as rule_ids
FROM waf_events
WHERE time > NOW() - INTERVAL '10 minutes'
AND rule_msg ILIKE '%SQL Injection Attempt%'
GROUP BY client_ip, time_bucket('5 minutes', time)
HAVING COUNT(*) >= 10;
`
rows, err := c.dbpool.Query(ctx, bruteForceQuery)
if err != nil {
c.logger.Error().Err(err).Msg("Brute-force detection query failed")
return
}
defer rows.Close()
// AddDocuments accepts any JSON-serialisable slice, so we can pass []Incident directly.
var incidents []Incident
for rows.Next() {
var (
clientIP string
eventCount int
startTime time.Time
endTime time.Time
targets []string
ruleIDs []int
)
if err := rows.Scan(&clientIP, &eventCount, &startTime, &endTime, &targets, &ruleIDs); err != nil {
c.logger.Error().Err(err).Msg("Failed to scan brute-force result row")
continue
}
incident := Incident{
ID: fmt.Sprintf("bf-%s-%d", clientIP, startTime.Unix()),
Type: "BruteForceSQLi",
ClientIP: clientIP,
Targets: targets,
StartTime: startTime,
EndTime: endTime,
EventCount: eventCount,
RuleIDs: ruleIDs,
Description: fmt.Sprintf("Potential brute-force SQLi from %s targeting %v", clientIP, targets),
}
incidents = append(incidents, incident)
}
if len(incidents) > 0 {
c.logger.Info().Int("count", len(incidents)).Msg("Detected new brute-force incidents")
// Indexing in Meilisearch. The first argument is the primary key field.
taskInfo, err := c.meili.Index("incidents").AddDocuments(incidents, "id")
if err != nil {
c.logger.Error().Err(err).Msg("Failed to index incidents in Meilisearch")
} else {
c.logger.Info().Int64("task_id", taskInfo.TaskUID).Msg("Meilisearch indexing task created")
}
}
}
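Before the first incidents are indexed, the index settings hinted at in the NewCorrelator comments need to be applied once. A minimal sketch, meant to run at startup:

// configureIncidentsIndex applies the searchable/filterable attributes referenced
// in NewCorrelator. Meilisearch applies settings changes asynchronously as tasks.
func configureIncidentsIndex(client *meilisearch.Client) error {
	idx := client.Index("incidents")
	if _, err := idx.UpdateFilterableAttributes(&[]string{"type", "client_ip", "start_time"}); err != nil {
		return err
	}
	_, err := idx.UpdateSearchableAttributes(&[]string{"description", "client_ip", "targets"})
	return err
}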
Once an incident is indexed, an analyst can perform a search like “blocked attempts from 192.168” and get immediate, relevant results from Meilisearch that point straight at the high-level incident, without having to sift through millions of raw logs.
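On the consumption side, that analyst-facing search is a single Meilisearch call. A minimal sketch using meilisearch-go, assuming the incidents index exists and that type has been configured as a filterable attribute:

package search

import (
	"fmt"

	"github.com/meilisearch/meilisearch-go"
)

// searchIncidents runs a typo-tolerant search over correlated incidents,
// optionally narrowed to a single incident type via a filter.
func searchIncidents(client *meilisearch.Client, query string) error {
	resp, err := client.Index("incidents").Search(query, &meilisearch.SearchRequest{
		Filter: "type = \"BruteForceSQLi\"", // requires "type" to be filterable
		Limit:  20,
	})
	if err != nil {
		return err
	}
	for _, hit := range resp.Hits {
		doc := hit.(map[string]interface{}) // hits are decoded as generic JSON objects
		fmt.Printf("%v %v -> %v\n", doc["id"], doc["type"], doc["targets"])
	}
	return nil
}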
Limitations and Future Trajectory
This architecture, while vastly superior to our previous monolithic system, is not without its own set of challenges and limitations. The correlation logic is currently hard-coded in Go. A more mature implementation would require a dedicated rules engine, perhaps driven by a DSL like Sigma or configured via a UI, allowing security analysts to define detection patterns without code changes.
The current Go service is a single point of failure. While NATS JetStream provides data durability, the processing itself would halt. The natural evolution is to containerize this application and deploy it on Kubernetes, allowing for multiple replicas and providing high availability. This, however, introduces complexity around consumer group management to ensure messages are processed exactly once.
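Worth noting: JetStream already covers most of what horizontal scaling needs. Several replicas that bind to the same durable pull consumer share the stream between them, with each message delivered to only one replica, so scaling out is mostly a matter of making that binding explicit, roughly as sketched below; true exactly-once processing would additionally require idempotent writes or JetStream's publish deduplication.

// bindSharedConsumer attaches a replica to the shared durable pull consumer.
// JetStream distributes messages across all replicas bound to the same durable,
// giving at-least-once delivery with each message handled by one replica.
func bindSharedConsumer(js nats.JetStreamContext, cfg *Config) (*nats.Subscription, error) {
	return js.PullSubscribe(
		cfg.NATSSubject,
		"PROCESSOR_DURABLE",
		nats.BindStream(cfg.NATSStream),
	)
}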
Furthermore, the data lake is still just a collection of raw JSON files. For efficient ad-hoc analytics with tools like Presto or Spark SQL, we would need to impose a proper table format such as Apache Iceberg or Delta Lake on top of the raw data, which would bring schema enforcement, transactional guarantees, and dramatically faster large-scale analytical queries.
Finally, there is currently no alerting mechanism. The logical next step is to trigger notifications to platforms like Slack or PagerDuty, via Meilisearch webhooks or a polling mechanism, whenever a new high-severity incident is indexed.