The standard P99 latency metric for our primary trading execution service was stable, yet we were experiencing sporadic, unexplained timeouts reported by downstream systems. The Jaeger UI confirmed that while the 99th percentile was within SLO, the tail end of the distribution was volatile. Elasticsearch, our Jaeger backend, is excellent for keyword searches and visualizing individual traces, but it’s fundamentally ill-suited for the kind of numerical analysis we needed. Running complex aggregations to calculate statistical moments or generate histograms across millions of spans was either prohibitively slow or simply impossible. This gap in observability meant we were flying blind, unable to distinguish between a healthy, long-tailed distribution and a system developing a pathological performance profile.
Our initial concept was to build a parallel analysis pipeline. Instead of replacing our existing Jaeger setup, we would augment it. The idea was to siphon off the raw trace data stream, persist it into a database built for scalable transactional workloads and complex queries, and then run offline (or near-real-time) computational jobs against that data. This architecture decouples the high-throughput ingestion of traces from the resource-intensive analysis, ensuring that our deep dives into performance data wouldn’t risk destabilizing the primary observability platform.
The technology selection process was critical. For storage, we needed a horizontally scalable, resilient database that spoke SQL. While NoSQL options were considered, the ability to perform joins, window functions, and complex filtering was non-negotiable for the types of questions we wanted to ask. CockroachDB emerged as the top candidate. Its PostgreSQL wire compatibility meant we could leverage a mature ecosystem of client libraries, and its distributed, consensus-based architecture promised the resilience and scalability we required without the operational overhead of manually sharding a traditional PostgreSQL cluster.
For the ingestion component, the most direct path was to implement a custom collector that speaks the Jaeger gRPC protocol. This avoids introducing another message bus like Kafka into the critical path, reducing both latency and complexity. A small, stateless service written in Go seemed like the perfect fit for this task—high concurrency, native gRPC support, and excellent performance.
Finally, for the analysis itself, the problem was purely computational. We needed to pull batches of latency data and perform statistical calculations efficiently. Python, with its rich data science ecosystem, was the obvious choice. Specifically, NumPy provides the foundation for high-performance array computations, making it trivial to calculate percentiles, standard deviations, and histograms on large datasets far more efficiently than could be done in-database or with standard Python lists.
Database Schema Design in CockroachDB
Before writing any service code, the foundation must be solid. A poorly designed schema in a distributed database can lead to performance hotspots and scaling bottlenecks. Our primary goal was to optimize for fast, batched writes from the collector and efficient time-series reads for the analysis worker.
We settled on two main tables: spans and span_tags. A common pitfall is to store tags in a single JSONB column. While convenient, querying for spans based on specific tag values can be inefficient if not indexed properly. By normalizing tags into a separate table, we improve query flexibility at the cost of write amplification. In a real-world project, this trade-off is crucial. We decided that the query performance gain was worth the more complex write logic.
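To make the write amplification concrete, here is a minimal sketch of how one span's tags fan out into one row per tag, with only the typed column for that tag populated. The input shape (a list of `(key, type, value)` tuples), the `tag_rows` helper, and the `'double'` type name are assumptions for illustration; the column order follows the span_tags table.

```python
from datetime import datetime, timezone

# Column order of the span_tags table.
COLUMNS = (
    "trace_id", "span_id", "start_time", "tag_key", "tag_type",
    "string_val", "int_val", "bool_val", "double_val", "binary_val",
)

# Which typed column holds the value for each tag type.
# The 'double' type name is an assumption; the schema comment lists the others.
TYPE_TO_COLUMN = {
    "string": "string_val",
    "int": "int_val",
    "bool": "bool_val",
    "double": "double_val",
    "binary": "binary_val",
}


def tag_rows(trace_id, span_id, start_time, tags):
    """Return one span_tags row (as a tuple) per (key, type, value) tag."""
    rows = []
    for key, tag_type, value in tags:
        row = dict.fromkeys(COLUMNS)  # every column starts as None (SQL NULL)
        row.update(
            trace_id=trace_id,
            span_id=span_id,
            start_time=start_time,
            tag_key=key,
            tag_type=tag_type,
        )
        row[TYPE_TO_COLUMN[tag_type]] = value
        rows.append(tuple(row[c] for c in COLUMNS))
    return rows


# Hypothetical span with three tags: one spans row becomes three span_tags rows.
started = datetime(2024, 1, 1, tzinfo=timezone.utc)
example_rows = tag_rows(
    b"\x00" * 16,
    b"\x01" * 8,
    started,
    [("http.method", "string", "POST"), ("http.status_code", "int", 200), ("error", "bool", False)],
)
```

Each additional tag costs one more row and one more set of index entries, which is the price paid for being able to filter on tag_key and a typed value directly.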
Here is the DDL for the tables in CockroachDB:
-- The main table to store span data.
-- The composite primary key leads with `start_time` so that scans over a
-- recent time window read a contiguous slice of the key space.
CREATE TABLE spans (
    trace_id BYTES NOT NULL,
    span_id BYTES NOT NULL,
    parent_span_id BYTES,
    operation_name STRING NOT NULL,
    service_name STRING NOT NULL,
    start_time TIMESTAMPTZ NOT NULL,
    duration_micros INT NOT NULL,
    flags INT,
    process_tags JSONB,
    PRIMARY KEY (start_time DESC, trace_id, span_id),
    INDEX (trace_id, span_id),
    INDEX (service_name, operation_name, start_time DESC),
    -- Inverted index for searching key-value pairs in process tags efficiently.
    INVERTED INDEX spans_process_tags_idx (process_tags)
);

-- A separate table for span-level tags (logs and key-value tags).
-- This avoids a wide, sparse spans table and allows for more targeted indexing.
CREATE TABLE span_tags (
    trace_id BYTES NOT NULL,
    span_id BYTES NOT NULL,
    start_time TIMESTAMPTZ NOT NULL, -- Included so tag rows sort alongside their span
    tag_key STRING NOT NULL,
    tag_type STRING NOT NULL, -- e.g., 'string', 'int', 'bool', 'binary'
    string_val STRING,
    int_val INT,
    bool_val BOOL,
    double_val FLOAT,
    binary_val BYTES,
    PRIMARY KEY (start_time DESC, trace_id, span_id, tag_key),
    INDEX (tag_key, string_val) WHERE string_val IS NOT NULL,
    INDEX (tag_key, int_val) WHERE int_val IS NOT NULL
);

-- Pre-split both tables at time boundaries so that recent and older data
-- start out in separate ranges. SPLIT AT is a one-time manual range split
-- evaluated when the statement runs; it does not set a table locality, and
-- the split points do not move forward with time.
ALTER TABLE spans SPLIT AT VALUES (now() - '24h'::INTERVAL), (now() - '12h'::INTERVAL), (now());
ALTER TABLE span_tags SPLIT AT VALUES (now() - '24h'::INTERVAL), (now() - '12h'::INTERVAL), (now());
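The choice of BYTES for trace_id and span_id is intentional. Jaeger uses 128-bit trace IDs and 64-bit span IDs, which are stored more compactly as raw bytes (16 and 8 bytes) than as hexadecimal strings (32 and 16 characters). The primary key (start_time DESC, trace_id, span_id) is the most critical design decision here. By leading with start_time DESC, we physically colocate recent data, which is exactly what our analysis worker will be querying. This is a classic time-series optimization pattern, with one caveat in a distributed database: because new rows always sort next to each other, writes concentrate on the range holding the newest keys, so ingest throughput should be load-tested before relying on this layout.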
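The size difference between the two ID encodings is easy to check. The sketch below packs a trace ID, given as two 64-bit halves, and a span ID into big-endian bytes; the helper names and the sample ID values are made up for illustration.

```python
def trace_id_to_bytes(high: int, low: int) -> bytes:
    """Pack a 128-bit trace ID (two 64-bit halves) into 16 big-endian bytes."""
    return high.to_bytes(8, "big") + low.to_bytes(8, "big")


def span_id_to_bytes(span_id: int) -> bytes:
    """Pack a 64-bit span ID into 8 big-endian bytes."""
    return span_id.to_bytes(8, "big")


# Arbitrary sample IDs, not taken from a real trace.
trace_raw = trace_id_to_bytes(0x0123456789ABCDEF, 0xFEDCBA9876543210)
span_raw = span_id_to_bytes(0x00000000DEADBEEF)

print(len(trace_raw), len(trace_raw.hex()))  # 16 32
print(len(span_raw), len(span_raw.hex()))    # 8 16
```

Big-endian packing also keeps the byte-wise sort order identical to the order of the hexadecimal strings, so nothing is lost for range scans on the ID columns.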
The High-Throughput Go Collector
The collector is a gRPC server that implements the jaeger.api_v2.CollectorService. Its sole job is to receive batches of spans, transform them into our table schema, and write them to CockroachDB as efficiently as possible. A common mistake in such services is to insert spans one by one, which creates a massive amount of network round-trips and transaction overhead. The correct approach is to use a bulk-loading mechanism. CockroachDB, being compatible with PostgreSQL, supports the COPY FROM protocol, which is the most performant way to ingest large amounts of data.
Here’s a significant portion of the collector’s implementation in Go.
main.go:
package main

import (
	"context"
	"log/slog"
	"net"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jaegertracing/jaeger/proto-gen/api_v2"
	"google.golang.org/grpc"
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// Configuration should be loaded from env vars or a config file in a real project.
	dbConnStr := os.Getenv("DATABASE_URL")
	if dbConnStr == "" {
		dbConnStr = "postgresql://root@localhost:26257/jaeger_analysis?sslmode=disable"
	}
	listenAddr := ":14250"

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Initialize the database repository, which handles all DB interactions.
	repo, err := NewRepository(ctx, dbConnStr, logger)
	if err != nil {
		logger.Error("failed to initialize repository", "error", err)
		os.Exit(1)
	}
	defer repo.Close()

	// Create the gRPC service implementation.
	collectorService := NewCollectorService(repo, logger)

	// Set up and start the gRPC server.
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		logger.Error("failed to listen on address", "addr", listenAddr, "error", err)
		os.Exit(1)
	}
	grpcServer := grpc.NewServer()
	api_v2.RegisterCollectorServiceServer(grpcServer, collectorService)

	logger.Info("starting gRPC server", "address", listenAddr)
	go func() {
		if err := grpcServer.Serve(lis); err != nil {
			logger.Error("gRPC server failed", "error", err)
			cancel()
		}
	}()

	// Graceful shutdown handling.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-stop:
		logger.Info("shutting down server")
	case <-ctx.Done():
		logger.Error("server context canceled unexpectedly")
	}

	// Give in-flight RPCs a deadline to finish, then force the server to stop.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer shutdownCancel()

	done := make(chan struct{})
	go func() {
		grpcServer.GracefulStop()
		close(done)
	}()

	select {
	case <-done:
		logger.Info("server stopped gracefully")
	case <-shutdownCtx.Done():
		logger.Warn("graceful shutdown timed out, forcing stop")
		grpcServer.Stop()
	}
}
repository.go:
package main

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jaegertracing/jaeger/model"
)

// Repository handles database operations.
type Repository struct {
	pool   *pgxpool.Pool
	logger *slog.Logger
}

func NewRepository(ctx context.Context, connStr string, logger *slog.Logger) (*Repository, error) {
	pool, err := pgxpool.New(ctx, connStr)
	if err != nil {
		return nil, fmt.Errorf("unable to create connection pool: %w", err)
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("database ping failed: %w", err)
	}
	return &Repository{pool: pool, logger: logger}, nil
}

func (r *Repository) Close() {
	r.pool.Close()
}

// traceIDBytes encodes the 128-bit trace ID as 16 big-endian bytes.
func traceIDBytes(id model.TraceID) []byte {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[:8], id.High)
	binary.BigEndian.PutUint64(b[8:], id.Low)
	return b
}

// spanIDBytes encodes the 64-bit span ID as 8 big-endian bytes.
func spanIDBytes(id model.SpanID) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(id))
	return b
}

// WriteSpans performs a bulk insert of spans and their tags using the COPY protocol.
func (r *Repository) WriteSpans(ctx context.Context, spans []*model.Span) error {
	// A real-world implementation would include retries with exponential backoff.
	tx, err := r.pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	defer tx.Rollback(ctx) // Rollback is a no-op if tx is committed.

	spanCopyCount, err := tx.CopyFrom(
		ctx,
		pgx.Identifier{"spans"},
		[]string{"trace_id", "span_id", "parent_span_id", "operation_name", "service_name", "start_time", "duration_micros", "flags", "process_tags"},
		&spanSource{spans: spans},
	)
	if err != nil {
		return fmt.Errorf("failed to copy spans: %w", err)
	}
	r.logger.Debug("copied spans to db", "count", spanCopyCount)

	tagCopyCount, err := tx.CopyFrom(
		ctx,
		pgx.Identifier{"span_tags"},
		[]string{"trace_id", "span_id", "start_time", "tag_key", "tag_type", "string_val", "int_val", "bool_val", "double_val", "binary_val"},
		&tagSource{spans: spans},
	)
	if err != nil {
		return fmt.Errorf("failed to copy tags: %w", err)
	}
	r.logger.Debug("copied tags to db", "count", tagCopyCount)

	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}
	return nil
}

// spanSource is an adapter to satisfy the pgx.CopyFromSource interface for spans.
type spanSource struct {
	spans []*model.Span
	idx   int
}

// Next reports whether another span is available; Values consumes it.
func (s *spanSource) Next() bool {
	return s.idx < len(s.spans)
}

func (s *spanSource) Values() ([]interface{}, error) {
	span := s.spans[s.idx]
	s.idx++

	// ParentSpanID is a method on model.Span; zero means the span has no parent.
	var parentSpanID []byte
	if pid := span.ParentSpanID(); pid != 0 {
		parentSpanID = spanIDBytes(pid)
	}

	// Spans may arrive without a Process; fall back to an empty service name
	// and JSON null rather than dereferencing a nil pointer.
	var serviceName string
	processTagsJSON := []byte("null")
	if span.Process != nil {
		serviceName = span.Process.ServiceName
		// On a marshal error, keep JSON null instead of failing the entire batch.
		// In production, the span might go to a dead-letter queue.
		if b, err := json.Marshal(span.Process.Tags); err == nil {
			processTagsJSON = b
		}
	}

	return []interface{}{
		traceIDBytes(span.TraceID),
		spanIDBytes(span.SpanID),
		parentSpanID,
		span.OperationName,
		serviceName,
		span.StartTime,
		span.Duration.Microseconds(),
		int64(span.Flags),
		processTagsJSON,
	}, nil
}

func (s *spanSource) Err() error {
	return nil
}

// tagSource is a more complex adapter for tags, iterating through each span's tags.
type tagSource struct {
	spans   []*model.Span
	spanIdx int
	tagIdx  int
}

func (ts *tagSource) Next() bool {
	// Stub: a full implementation would advance ts.spanIdx and ts.tagIdx so
	// that every tag of every span is visited. Returning false means this
	// placeholder emits no tag rows.
	return false
}

func (ts *tagSource) Values() ([]interface{}, error) {
	// Stub: a full implementation would return the current tag as a slice like:
	// {traceIDBytes(span.TraceID), spanIDBytes(span.SpanID), span.StartTime, tag.Key, ...vals...}
	return nil, nil // Placeholder
}

func (ts *tagSource) Err() error {
	return nil
}
Note: The implementation of tagSource is complex due to the nested iteration and has been stubbed for brevity. A full implementation would carefully manage indices to emit one row per tag.
collector.go:
package main

import (
	"context"
	"log/slog"

	"github.com/jaegertracing/jaeger/proto-gen/api_v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// CollectorService implements the Jaeger gRPC Collector API.
type CollectorService struct {
	api_v2.UnimplementedCollectorServiceServer
	repo   *Repository
	logger *slog.Logger
}

func NewCollectorService(repo *Repository, logger *slog.Logger) *CollectorService {
	return &CollectorService{repo: repo, logger: logger}
}

// PostSpans is the main entry point for receiving traces from Jaeger agents/clients.
func (s *CollectorService) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
	// The request already carries the Jaeger model types, so no conversion is needed.
	spans := r.Batch.Spans
	s.logger.Info("received span batch", "span_count", len(spans))

	// Clients usually set the process once per batch rather than on every span,
	// so copy it onto spans that do not carry their own.
	for _, span := range spans {
		if span.Process == nil {
			span.Process = r.Batch.Process
		}
	}

	// In a production system, this write should be asynchronous, using a channel
	// and a pool of worker goroutines to decouple the gRPC handler from the
	// database write latency. This prevents slow DB writes from blocking clients.
	if err := s.repo.WriteSpans(ctx, spans); err != nil {
		s.logger.Error("failed to write spans", "error", err)
		// Return an error to the client so it can potentially retry.
		return nil, status.Errorf(codes.Internal, "storage error: %v", err)
	}
	return &api_v2.PostSpansResponse{}, nil
}
This Go service is lean and focused. It demonstrates the use of pgx for high-performance writes, structured logging for observability, and graceful shutdown patterns. The key takeaway is the use of COPY FROM, which is typically much faster than row-by-row INSERTs for this kind of workload because it avoids a network round trip and statement overhead for every row.
The NumPy-Powered Analysis Worker
With data flowing into CockroachDB, the next step is analysis. The Python worker’s job is to periodically query for recent trace data, load it into NumPy arrays, and perform statistical calculations that would be inefficient or impossible in SQL.
import os
import time
import logging

import psycopg2
import numpy as np

# --- Configuration ---
# In a real app, use a proper config library (e.g., Pydantic, Dynaconf)
DB_CONN_STR = os.getenv("DATABASE_URL", "postgresql://root@localhost:26257/jaeger_analysis?sslmode=disable")
ANALYSIS_INTERVAL_SECONDS = 60
ANALYSIS_WINDOW_MINUTES = 5
TARGET_SERVICE = "trading-engine"
TARGET_OPERATION = "execute_trade"
Z_SCORE_THRESHOLD = 3.0  # Threshold for considering a latency an outlier

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def connect_db():
    """Establishes a connection to the CockroachDB cluster."""
    try:
        conn = psycopg2.connect(DB_CONN_STR)
        # Run each query in its own transaction. Without autocommit, psycopg2
        # keeps one transaction open across cycles, so now() and the data
        # snapshot would stay pinned to the first query.
        conn.autocommit = True
        logging.info("Successfully connected to CockroachDB.")
        return conn
    except psycopg2.OperationalError as e:
        logging.error(f"Could not connect to database: {e}")
        return None


def fetch_latency_data(conn, service_name, operation_name, window_minutes):
    """Fetches span durations for a specific service and operation within a time window."""
    query = """
        SELECT
            duration_micros
        FROM
            spans
        WHERE
            service_name = %(service_name)s
            AND operation_name = %(operation_name)s
            AND start_time >= now() - (%(window_minutes)s * INTERVAL '1 minute')
    """
    params = {
        "service_name": service_name,
        "operation_name": operation_name,
        "window_minutes": window_minutes,
    }
    with conn.cursor() as cur:
        try:
            cur.execute(query, params)
            rows = cur.fetchall()
            # The pitfall here is memory usage. If the number of spans is huge,
            # this fetch could exhaust memory. A production system would need to
            # process this in smaller chunks or streams.
            return [row[0] for row in rows]
        except Exception as e:
            logging.error(f"Error fetching data: {e}")
            return []


def analyze_latencies_with_numpy(durations_micros):
    """
    Performs statistical analysis on a list of latencies using NumPy.
    """
    if not durations_micros:
        logging.warning("No duration data to analyze.")
        return

    # The core of the performance gain is here: converting to a NumPy array.
    # Subsequent operations run as vectorized, compiled routines.
    latencies = np.array(durations_micros, dtype=np.float64)
    latencies_ms = latencies / 1000.0  # Convert to milliseconds for readability

    if len(latencies_ms) < 10:
        logging.warning(f"Not enough data points ({len(latencies_ms)}) for meaningful analysis.")
        return

    # --- Basic Statistics ---
    p50 = np.percentile(latencies_ms, 50)
    p90 = np.percentile(latencies_ms, 90)
    p99 = np.percentile(latencies_ms, 99)
    p99_9 = np.percentile(latencies_ms, 99.9)
    avg = np.mean(latencies_ms)
    std_dev = np.std(latencies_ms)

    logging.info(f"Analysis for '{TARGET_SERVICE}:{TARGET_OPERATION}'")
    logging.info(f"  Count: {len(latencies_ms)}")
    logging.info(f"  Avg: {avg:.2f}ms, StdDev: {std_dev:.2f}ms")
    logging.info(f"  Percentiles: p50={p50:.2f}ms, p90={p90:.2f}ms, p99={p99:.2f}ms, p99.9={p99_9:.2f}ms")

    # --- Advanced Analysis: Distribution Shape and Outliers ---
    # 1. Histogram to understand distribution shape (e.g., is it bimodal?)
    # A bimodal distribution could indicate a problem with a subset of your servers
    # or a specific code path being dramatically slower.
    hist, bin_edges = np.histogram(latencies_ms, bins=10)
    logging.info("  Latency Distribution Histogram:")
    for i in range(len(hist)):
        logging.info(f"    {bin_edges[i]:.2f}ms - {bin_edges[i+1]:.2f}ms: {hist[i]} spans")

    # 2. Outlier detection using Z-score
    # This can find individual requests that were abnormally slow compared to the norm.
    if std_dev > 0:
        z_scores = np.abs((latencies_ms - avg) / std_dev)
        outliers = latencies_ms[z_scores > Z_SCORE_THRESHOLD]
        if len(outliers) > 0:
            logging.warning(f"  Detected {len(outliers)} outliers (Z-score > {Z_SCORE_THRESHOLD}).")
            logging.warning(f"  Max outlier latency: {np.max(outliers):.2f}ms")
        else:
            logging.info("  No significant outliers detected.")


def main_loop():
    """The main execution loop for the analysis worker."""
    conn = connect_db()
    if not conn:
        return  # Exit if DB connection fails on startup

    while True:
        logging.info("--- Starting new analysis cycle ---")
        try:
            if conn is None or conn.closed:
                logging.warning("Database connection was closed. Reconnecting...")
                conn = connect_db()
                if not conn:
                    time.sleep(ANALYSIS_INTERVAL_SECONDS)
                    continue
            durations = fetch_latency_data(conn, TARGET_SERVICE, TARGET_OPERATION, ANALYSIS_WINDOW_MINUTES)
            analyze_latencies_with_numpy(durations)
        except Exception as e:
            logging.error(f"An unexpected error occurred in the main loop: {e}")
            if conn:
                conn.close()  # Force reconnection on next cycle
        time.sleep(ANALYSIS_INTERVAL_SECONDS)


if __name__ == "__main__":
    main_loop()
This script demonstrates the power of combining a SQL database with NumPy. The query fetches raw data efficiently, and then NumPy takes over to compute not just simple percentiles but also a histogram of the distribution, and to identify outliers using Z-scores. This type of analysis directly addresses our initial problem: it allows us to see the shape of the performance profile, which is far more revealing than a single percentile number.
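The worker's fetch comment warns that loading every row at once can exhaust memory. One way to process the data in chunks is to keep running statistics that are updated as each chunk arrives, so only the current chunk is ever held in memory. The sketch below uses Welford's online algorithm in plain Python; the `RunningStats` class is a hypothetical helper, not part of the worker above.

```python
import math


class RunningStats:
    """Count, mean and population standard deviation, updated chunk by chunk."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, chunk):
        """Fold one chunk of latency values into the running statistics."""
        for x in chunk:
            self.count += 1
            delta = x - self.mean
            self.mean += delta / self.count
            self._m2 += delta * (x - self.mean)

    @property
    def std_dev(self):
        """Population standard deviation, the same definition np.std uses by default."""
        return math.sqrt(self._m2 / self.count) if self.count else 0.0


# Usage: feed chunks as they arrive, for example from cursor.fetchmany().
stats = RunningStats()
for chunk in ([1.0, 2.0, 3.0], [4.0, 5.0]):
    stats.update(chunk)
print(stats.count, stats.mean, round(stats.std_dev, 4))  # 5 3.0 1.4142
```

Exact percentiles cannot be maintained this way, because they depend on the full sorted data. A histogram with fixed bin edges can be accumulated per chunk in the same manner and gives approximate percentiles.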
Architectural Overview and Deployment
To visualize how these components interact, we can use a simple diagram.
graph TD
    A[Application with Jaeger Client] -- gRPC --> B(Jaeger Agent)
    B -- gRPC --> C{Custom Go Collector}
    C -- pgx/COPY --> D[(CockroachDB Cluster)]
    E{Python/NumPy Worker} -- SQL Query --> D
    E -- Analysis Results --> F((Alerting/Dashboarding))
A sample docker-compose.yml file would be essential for local development and testing, allowing a developer to spin up the entire stack with a single command.
version: '3.8'

services:
  cockroachdb:
    image: cockroachdb/cockroach:v23.1.9
    command: start-single-node --insecure --store=type=mem,size=512MiB
    ports:
      - "26257:26257"
      - "8080:8080"
    healthcheck:
      test: ["CMD", "cockroach", "sql", "--insecure", "-e", "SELECT 1"]
      interval: 5s
      timeout: 2s
      retries: 5

  # A simple setup job to create our database and tables.
  db-init:
    image: cockroachdb/cockroach:v23.1.9
    depends_on:
      cockroachdb:
        condition: service_healthy
    command: >
      /bin/bash -c "
      cockroach sql --insecure --host=cockroachdb -e 'CREATE DATABASE IF NOT EXISTS jaeger_analysis;' &&
      cockroach sql --insecure --host=cockroachdb -d jaeger_analysis -f /schema.sql
      "
    volumes:
      - ./schema.sql:/schema.sql # Mount the SQL file with CREATE TABLE statements

  collector:
    build:
      context: ./collector # Path to the Go collector's source and Dockerfile
    ports:
      - "14250:14250"
    depends_on:
      db-init:
        condition: service_completed_successfully
    environment:
      - DATABASE_URL=postgresql://root@cockroachdb:26257/jaeger_analysis?sslmode=disable

  analysis-worker:
    build:
      context: ./analyzer # Path to the Python worker's source and Dockerfile
    depends_on:
      db-init:
        condition: service_completed_successfully
    environment:
      - DATABASE_URL=postgresql://root@cockroachdb:26257/jaeger_analysis?sslmode=disable
This setup provides a fully self-contained environment to validate the entire pipeline, from trace generation to analysis.
This architecture is not without its limitations. The analysis worker is currently polling the database, which is inefficient. A more robust solution would leverage CockroachDB’s Change Data Capture (CDC) feature to create a stream of new span data, allowing the analysis to be truly real-time. The outlier detection is rudimentary; more sophisticated models could be employed to reduce false positives and detect more subtle anomalies. Furthermore, data lifecycle management is a critical production concern not addressed here; a background job to archive or delete trace data from CockroachDB after a certain retention period would be necessary to control storage costs and maintain performance.
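As one possible step beyond the plain Z-score, a median-based score is less distorted by the very outliers it is trying to find. With a population standard deviation, no value among n samples can have a Z-score above sqrt(n - 1), so at the worker's minimum of ten samples a threshold of 3.0 can never fire. The sketch below compares the two approaches in plain Python; the function names and the sample latencies are made up, and the 0.6745 scale factor with a 3.5 cutoff is the commonly cited convention for the modified Z-score, not a value tuned for this system.

```python
import statistics


def z_score_outliers(values, threshold=3.0):
    """Classic Z-score: distance from the mean in population standard deviations."""
    mean = statistics.fmean(values)
    std_dev = statistics.pstdev(values)
    if std_dev == 0:
        return []
    return [v for v in values if abs(v - mean) / std_dev > threshold]


def mad_outliers(values, threshold=3.5):
    """Modified Z-score based on the median and the median absolute deviation (MAD)."""
    median = statistics.median(values)
    mad = statistics.median([abs(v - median) for v in values])
    if mad == 0:
        return []  # more than half the values are identical; the score is undefined
    return [v for v in values if 0.6745 * abs(v - median) / mad > threshold]


# Hypothetical latencies in milliseconds: nine normal requests and one stall.
latencies_ms = [10, 11, 9, 10, 12, 10, 11, 9, 10, 500]

print(z_score_outliers(latencies_ms))  # []
print(mad_outliers(latencies_ms))      # [500]
```

Latency distributions are naturally right-skewed, so a median-based score may flag more of the legitimate tail than intended. The threshold would need tuning against real traffic before it drives alerts.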