Our SolrCloud deployment was a black box during performance spikes. JMX metrics showed high CPU or I/O wait, but couldn’t attribute it to specific queries. Logs were too high-level, providing only total query time, which obscured the root cause—was it GC, disk contention, or a CPU-bound facet calculation? Modifying the Solr source code to add detailed instrumentation was off the table due to maintenance overhead and vendor support constraints. We needed a zero-instrumentation solution to dissect query performance at the kernel level, in real-time, across the entire cluster.
The core challenge was to correlate low-level kernel activity, like file I/O system calls, back to a specific, high-level Solr query without altering the application. Our initial concept was to leverage eBPF to trace the syscalls of the Solr JVM process. The raw event data would be immense, so a lightweight, high-throughput messaging layer was necessary to ship it off-host for analysis. ZeroMQ, with its brokerless architecture, was a clear contender over heavier systems like Kafka for this raw telemetry transport.
The architecture crystallized into three components:
- An eBPF probe attached to each Solr node’s JVM, capturing syscalls such as read and writev.
- A local userspace agent on each node that reads data from the eBPF perf buffer and publishes it via a ZeroMQ PUSH socket.
- A central aggregator service that consumes data from all nodes via a ZeroMQ PULL socket, correlates events, enriches the data, and indexes it into a separate, dedicated Solr instance for analysis.
graph TD
    subgraph "Solr Node 1"
        A[Solr JVM] -->|syscalls| B(eBPF kprobes);
        B --> C{BPF Perf Buffer};
        D[Userspace Agent] -->|reads| C;
        D -->|ZMQ PUSH| E(ZMQ Socket);
    end
    subgraph "Solr Node 2"
        A2[Solr JVM] -->|syscalls| B2(eBPF kprobes);
        B2 --> C2{BPF Perf Buffer};
        D2[Userspace Agent] -->|reads| C2;
        D2 -->|ZMQ PUSH| E2(ZMQ Socket);
    end
    subgraph "Central Aggregator Host"
        F[Aggregator Service] -->|ZMQ PULL| E;
        F -->|ZMQ PULL| E2;
        F -->|Indexes Data| G[Analysis Solr Instance];
        H[Analyst/Dashboard] -->|Queries| G;
    end
A significant hurdle in a real-world project is that simply tracing syscalls is not enough; we need context. A read syscall event from the kernel only tells us the process ID (PID) and thread ID (TID), not which Solr query triggered it. Our plan was to trace the recvfrom and sendto syscalls to capture the entry and exit points of a request on a specific thread: by tracking the TID, we can tag all subsequent I/O events on that same thread as belonging to that request until the response is sent. This is a heuristic, but in practice Solr’s thread-per-request model makes the correlation reliable enough for actionable insights. The initial implementation approximates these request boundaries by windowing each thread’s I/O with an inactivity timeout in the aggregator; explicit boundary tracing is left for a later iteration (see the closing notes).
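To make the heuristic concrete, here is a minimal userspace sketch of the per-thread tagging logic. It assumes hypothetical boundary events for recvfrom ('B') and sendto ('E') alongside the read/writev events the probes below actually emit, so treat it as an illustration of the idea rather than deployed code.

from collections import defaultdict

def correlate_by_tid(events):
    """Group I/O events into per-request windows keyed by thread ID."""
    open_requests = {}   # tid -> stats for the request currently in flight on that thread
    completed = []       # finished per-request summaries

    for ev in events:    # ev: dict with 'tid', 'syscall', 'duration_ns', 'bytes'
        tid = ev["tid"]
        if ev["syscall"] == "B":            # recvfrom observed: a request starts on this thread
            open_requests[tid] = defaultdict(int)
        elif ev["syscall"] == "E":          # sendto observed: the response went out, close the window
            stats = open_requests.pop(tid, None)
            if stats is not None:
                completed.append({"tid": tid, **stats})
        elif tid in open_requests:          # a read/writev inside an open request window
            stats = open_requests[tid]
            stats["io_ns"] += ev["duration_ns"]
            stats["bytes"] += ev["bytes"]
            stats["ops"] += 1
    return completed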
Phase 1: The eBPF Kernel Probe
We used BCC (BPF Compiler Collection) to write the eBPF program in C. The program uses kprobes to trace the __x64_sys_read and __x64_sys_writev functions. A common mistake is to trace only the entry point; to measure duration, we need both entry (kprobe) and exit (kretprobe). We store the entry timestamp in a BPF hash map keyed by the thread ID. On exit, we retrieve the start time, calculate the duration, and push the result to the perf buffer for the userspace agent.
Here’s the core C code for the eBPF program. It’s not a “hello world” example; it handles potential errors and captures rich context.
solr_trace.c:
#include <uapi/linux/ptrace.h>
#include <linux/fs.h>
#include <linux/sched.h>
// Data structure to be sent to userspace
struct data_t {
u64 ts;
u64 duration_ns;
u32 pid;
u32 tid;
u64 bytes;
char comm[TASK_COMM_LEN];
char syscall_type; // 'R' for read, 'W' for write
};
// BPF map to store entry timestamps, keyed by thread ID
BPF_HASH(start, u32, u64);
// BPF perf buffer to send data to userspace
BPF_PERF_OUTPUT(events);
// Trace read entry
int trace_read_entry(struct pt_regs *ctx, int fd, void *buf, size_t count) {
u32 tid = bpf_get_current_pid_tgid();
u64 ts = bpf_ktime_get_ns();
start.update(&tid, &ts);
return 0;
}
// Trace read exit
int trace_read_exit(struct pt_regs *ctx) {
u32 tid = bpf_get_current_pid_tgid();
u64 *tsp = start.lookup(&tid);
if (tsp == 0) {
// Missed the entry probe, common if tracing was started mid-syscall.
// In a real-world project, we ignore these events to avoid bad data.
return 0;
}
struct data_t data = {};
data.ts = bpf_ktime_get_ns();
data.duration_ns = data.ts - *tsp;
data.pid = bpf_get_current_pid_tgid() >> 32;
data.tid = tid;
data.bytes = PT_REGS_RC(ctx);
data.syscall_type = 'R';
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
start.delete(&tid);
return 0;
}
// Trace writev entry
int trace_writev_entry(struct pt_regs *ctx, unsigned long fd, const struct iovec __user *vec, unsigned long vlen) {
u32 tid = bpf_get_current_pid_tgid();
u64 ts = bpf_ktime_get_ns();
start.update(&tid, &ts);
return 0;
}
// Trace writev exit
int trace_writev_exit(struct pt_regs *ctx) {
u32 tid = bpf_get_current_pid_tgid();
u64 *tsp = start.lookup(&tid);
if (tsp == 0) {
return 0; // Ignore if entry was missed
}
struct data_t data = {};
data.ts = bpf_ktime_get_ns();
data.duration_ns = data.ts - *tsp;
data.pid = bpf_get_current_pid_tgid() >> 32;
data.tid = tid;
data.bytes = PT_REGS_RC(ctx);
data.syscall_type = 'W';
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
start.delete(&tid);
return 0;
}
This code is focused and robust. It correctly uses bpf_ktime_get_ns for monotonic time, handles cases where an entry probe might be missed, and populates a well-defined struct to send to userspace. The comm field is critical for filtering events down to the java process running Solr.
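As a sketch of that comm-based filtering, the agent’s event callback could include a small guard like the one below; the expected process name "java" is an assumption about how the Solr JVM appears in the process table.

def is_solr_process(event, expected_comm=b"java"):
    """Return True if the BPF event came from the expected JVM process image."""
    # event.comm is the fixed-size comm buffer from the eBPF struct above.
    return event.comm.startswith(expected_comm)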
Phase 2: The Userspace Agent in Python
The agent’s role is to load the eBPF C code, attach the probes, listen on the perf buffer, and forward processed events to the central aggregator via ZeroMQ. We chose Python with the bcc library for rapid development.
A critical design choice here is what to send over the network. Shipping every single syscall event would create a massive traffic storm; the pitfall is network saturation. A better approach is to perform micro-batching and aggregation within the agent: buffer events for a short period (e.g., 200 ms) and send one summary per thread, which reduces network chatter significantly. A sketch of this follows; the full agent below sends events directly for clarity.
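This is a minimal sketch of what that micro-batching could look like, assuming the same event fields the agent forwards; the class name and flush interval are illustrative and not part of the agent shown below.

import time
from collections import defaultdict

class MicroBatcher:
    """Buffers syscall events and emits one summary per thread every flush interval."""

    def __init__(self, send_fn, interval_s=0.2):
        self.send_fn = send_fn            # e.g., a wrapper around the ZMQ send_json call
        self.interval_s = interval_s
        self.last_flush = time.monotonic()
        self.buckets = defaultdict(lambda: defaultdict(int))

    def add(self, event):
        """Fold a single syscall event into its thread's running summary."""
        b = self.buckets[event["tid"]]
        b["io_ns"] += event["duration_ns"]
        b["bytes"] += event["bytes"]
        b["ops"] += 1

    def maybe_flush(self, hostname, pid):
        """Send one summary message per thread once the interval has elapsed."""
        if time.monotonic() - self.last_flush < self.interval_s:
            return
        for tid, b in self.buckets.items():
            self.send_fn({"hostname": hostname, "pid": pid, "tid": tid, **b})
        self.buckets.clear()
        self.last_flush = time.monotonic()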
agent.py:
import zmq
import socket
import time
import logging
from bcc import BPF
# Configure logging for production use
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class SolrTraceAgent:
def __init__(self, target_pid, zmq_endpoint):
self.target_pid = target_pid
self.hostname = socket.gethostname()
# ZeroMQ setup
try:
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUSH)
self.socket.connect(zmq_endpoint)
logging.info(f"ZMQ PUSH socket connected to {zmq_endpoint}")
except zmq.ZMQError as e:
logging.error(f"Failed to initialize ZeroMQ socket: {e}")
raise
# Load and configure eBPF program
try:
with open('solr_trace.c', 'r') as f:
bpf_text = f.read()
self.bpf = BPF(text=bpf_text)
self.bpf.attach_kprobe(event="__x64_sys_read", fn_name="trace_read_entry")
self.bpf.attach_kretprobe(event="__x64_sys_read", fn_name="trace_read_exit")
self.bpf.attach_kprobe(event="__x64_sys_writev", fn_name="trace_writev_entry")
self.bpf.attach_kretprobe(event="__x64_sys_writev", fn_name="trace_writev_exit")
logging.info("eBPF probes attached successfully.")
except Exception as e:
logging.error(f"Failed to initialize eBPF probes: {e}")
raise
def process_event(self, cpu, data, size):
"""Callback for processing events from the BPF perf buffer."""
event = self.bpf["events"].event(data)
# Filter for the target Solr PID
if event.pid != self.target_pid:
return
# In a real-world scenario, you'd buffer these and aggregate before sending.
# For this example, we send them directly but structured.
        payload = {
            "hostname": self.hostname,
            "ts": event.ts,
            "pid": event.pid,
            "tid": event.tid,
            "comm": event.comm.decode('utf-8', 'replace'),
            # BCC may expose a single C char as a 1-byte bytes object; handle both forms.
            "syscall": (event.syscall_type.decode('ascii', 'replace')
                        if isinstance(event.syscall_type, bytes)
                        else chr(event.syscall_type)),
            "duration_ns": event.duration_ns,
            "bytes": event.bytes
        }
try:
self.socket.send_json(payload, zmq.NOBLOCK)
except zmq.Again:
# This happens if the high water mark is reached.
# A production agent should have a strategy for this: drop, or block.
# We'll log and drop for this fire-and-forget telemetry.
logging.warning("ZMQ high water mark reached, dropping event.")
except Exception as e:
logging.error(f"Error sending data via ZMQ: {e}")
def run(self):
"""Main loop to poll the perf buffer."""
logging.info(f"Starting agent for PID {self.target_pid}...")
self.bpf["events"].open_perf_buffer(self.process_event)
while True:
try:
self.bpf.perf_buffer_poll()
except KeyboardInterrupt:
logging.info("Shutting down agent.")
break
except Exception as e:
logging.error(f"Error during perf buffer poll: {e}")
# In production, you might want a backoff-retry mechanism here.
time.sleep(1)
# Cleanup
self.socket.close()
self.context.term()
if __name__ == "__main__":
# In a real system, the PID would be discovered dynamically.
# e.g., `pgrep -f "start.jar"`
SOLR_PID = 12345
ZMQ_AGGREGATOR_ENDPOINT = "tcp://10.0.0.100:5558"
if SOLR_PID == 12345:
logging.error("Please set the correct Solr PID in the script.")
else:
agent = SolrTraceAgent(target_pid=SOLR_PID, zmq_endpoint=ZMQ_AGGREGATOR_ENDPOINT)
agent.run()
This agent code includes basic production readiness: logging, error handling for ZMQ and BPF initialization, and a non-blocking send with awareness of backpressure (zmq.Again). The hardcoded PID is a placeholder; a production script would discover it dynamically.
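One possible approach to that discovery, assuming the Solr JVM is launched with the stock start.jar command line mentioned in the agent’s comments; adjust the pattern to match your installation.

import subprocess

def discover_solr_pid(pattern="start.jar"):
    """Return the first PID whose command line matches the pattern, or None."""
    try:
        out = subprocess.check_output(["pgrep", "-f", pattern], text=True)
        return int(out.split()[0])
    except (subprocess.CalledProcessError, ValueError, IndexError):
        return None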
Phase 3: The Central Aggregator and Solr Indexer
The aggregator is the brains of the operation. It listens on a ZeroMQ PULL socket, receiving JSON messages from all agents. Its primary job is to perform stateful correlation: it maintains an in-memory map keyed by thread ID (tid) to aggregate I/O statistics. A common mistake is to process events statelessly, which loses the opportunity to build a complete picture of a thread’s activity over its lifetime.
aggregator.py:
import zmq
import json
import logging
import time
from collections import defaultdict
import pysolr
# Configuration
ZMQ_BIND_ADDRESS = "tcp://*:5558"
SOLR_ANALYSIS_URL = "http://localhost:8984/solr/query_perf_metrics"
FLUSH_INTERVAL_SECONDS = 5
THREAD_TIMEOUT_SECONDS = 30 # Time after which we consider a thread's work complete
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Aggregator:
def __init__(self):
# ZMQ setup
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
self.socket.bind(ZMQ_BIND_ADDRESS)
logging.info(f"Aggregator listening on {ZMQ_BIND_ADDRESS}")
# Solr setup
try:
self.solr = pysolr.Solr(SOLR_ANALYSIS_URL, always_commit=False, timeout=10)
# A simple ping to check connection on startup
self.solr.ping()
logging.info(f"Connected to Solr at {SOLR_ANALYSIS_URL}")
except Exception as e:
logging.error(f"Failed to connect to Solr: {e}")
raise
# In-memory state for aggregation
# Key: (hostname, pid, tid)
# Value: { "total_io_ns": ..., "total_read_bytes": ..., "last_seen": ... }
self.thread_stats = defaultdict(lambda: defaultdict(int))
self.documents_to_flush = []
def process_message(self, msg):
try:
data = json.loads(msg)
key = (data['hostname'], data['pid'], data['tid'])
stats = self.thread_stats[key]
stats['last_seen'] = time.time()
stats['total_io_ns'] += data['duration_ns']
if data['syscall'] == 'R':
stats['total_read_ns'] += data['duration_ns']
stats['total_read_bytes'] += data['bytes']
stats['read_ops'] += 1
elif data['syscall'] == 'W':
stats['total_write_ns'] += data['duration_ns']
stats['total_write_bytes'] += data['bytes']
stats['write_ops'] += 1
except (json.JSONDecodeError, KeyError) as e:
logging.warning(f"Failed to process message: {msg}, error: {e}")
def flush_stale_threads(self):
"""Finds timed-out threads, converts their stats to Solr docs, and clears them."""
now = time.time()
stale_keys = []
for key, stats in self.thread_stats.items():
if now - stats['last_seen'] > THREAD_TIMEOUT_SECONDS:
stale_keys.append(key)
hostname, pid, tid = key
doc = {
"id": f"{hostname}-{pid}-{tid}-{stats['last_seen']}",
"hostname_s": hostname,
"pid_i": pid,
"tid_i": tid,
"total_io_duration_ms_f": stats['total_io_ns'] / 1e6,
"read_duration_ms_f": stats['total_read_ns'] / 1e6,
"write_duration_ms_f": stats['total_write_ns'] / 1e6,
"read_bytes_l": stats['total_read_bytes'],
"write_bytes_l": stats['total_write_bytes'],
"read_ops_i": stats['read_ops'],
"write_ops_i": stats['write_ops'],
"event_timestamp_dt": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(stats['last_seen']))
}
self.documents_to_flush.append(doc)
for key in stale_keys:
del self.thread_stats[key]
if self.documents_to_flush:
logging.info(f"Flushing {len(self.documents_to_flush)} documents to Solr.")
try:
self.solr.add(self.documents_to_flush)
self.solr.commit() # In production, a soft commit might be better
self.documents_to_flush.clear()
except Exception as e:
logging.error(f"Failed to flush documents to Solr: {e}")
def run(self):
last_flush_time = time.time()
while True:
try:
# Poll with a timeout so we can periodically run the flush logic
if self.socket.poll(timeout=1000): # 1 second timeout
msg = self.socket.recv_string()
self.process_message(msg)
if time.time() - last_flush_time > FLUSH_INTERVAL_SECONDS:
self.flush_stale_threads()
last_flush_time = time.time()
except KeyboardInterrupt:
logging.info("Shutting down aggregator.")
break
except Exception as e:
logging.error(f"An unexpected error occurred: {e}")
time.sleep(1)
self.socket.close()
self.context.term()
if __name__ == "__main__":
aggregator = Aggregator()
aggregator.run()
This aggregator implements a crucial pattern: stateful stream processing with time-based windowing. It aggregates metrics for each thread and only "flushes" a thread’s final statistics to Solr after a period of inactivity (THREAD_TIMEOUT_SECONDS). This converts a high-frequency stream of syscall events into a much lower-frequency stream of meaningful, aggregated documents representing the total I/O work done by a single thread for a single request.
The Solr schema for the query_perf_metrics core would look something like this (using Schema API syntax):
{
  "add-field": [
    {"name":"id", "type":"string", "indexed":true, "stored":true, "required":true, "multiValued":false},
    {"name":"hostname_s", "type":"string", "indexed":true, "stored":true},
    {"name":"pid_i", "type":"pint", "indexed":true, "stored":true},
    {"name":"tid_i", "type":"pint", "indexed":true, "stored":true},
    {"name":"total_io_duration_ms_f", "type":"pfloat", "indexed":true, "stored":true},
    {"name":"read_duration_ms_f", "type":"pfloat", "indexed":true, "stored":true},
    {"name":"write_duration_ms_f", "type":"pfloat", "indexed":true, "stored":true},
    {"name":"read_bytes_l", "type":"plong", "indexed":true, "stored":true},
    {"name":"write_bytes_l", "type":"plong", "indexed":true, "stored":true},
    {"name":"read_ops_i", "type":"pint", "indexed":true, "stored":true},
    {"name":"write_ops_i", "type":"pint", "indexed":true, "stored":true},
    {"name":"event_timestamp_dt", "type":"pdate", "indexed":true, "stored":true}
  ]
}
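If the analysis core is created from a default configset, these field definitions can be pushed through the Schema API. A minimal sketch using requests follows; the URL mirrors the aggregator’s SOLR_ANALYSIS_URL, and the payload filename is a placeholder.

import json
import requests

# Hypothetical file containing the add-field payload shown above.
SCHEMA_FILE = "query_perf_metrics_schema.json"
# Mirrors the aggregator's SOLR_ANALYSIS_URL; adjust host, port, and core as needed.
SCHEMA_URL = "http://localhost:8984/solr/query_perf_metrics/schema"

with open(SCHEMA_FILE) as f:
    payload = json.load(f)

resp = requests.post(SCHEMA_URL, json=payload, timeout=10)
resp.raise_for_status()
print("Schema update response:", resp.json().get("responseHeader", {}))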
With this data indexed, we can now ask powerful questions of our observability system, like:
- q=total_io_duration_ms_f:[1000 TO *] - Find all requests that spent more than 1 second in I/O.
- q=*:*&stats=true&stats.field=total_io_duration_ms_f&stats.facet=hostname_s - Get I/O duration statistics faceted by host to find noisy neighbors.
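The same questions can also be asked from a script. Here is a sketch using pysolr against the analysis core; the stats parameters are passed through as raw query params, and it assumes your pysolr version exposes the response’s stats section on the result object.

import pysolr

# Same analysis core the aggregator writes to; adjust host and port as needed.
solr = pysolr.Solr("http://localhost:8984/solr/query_perf_metrics", timeout=10)

# Requests that spent more than 1 second in I/O, slowest first.
slow = solr.search("total_io_duration_ms_f:[1000 TO *]",
                   sort="total_io_duration_ms_f desc", rows=20)
for doc in slow:
    print(doc["hostname_s"], doc["tid_i"], doc["total_io_duration_ms_f"])

# I/O duration statistics faceted by host to spot noisy neighbors.
results = solr.search("*:*", **{
    "stats": "true",
    "stats.field": "total_io_duration_ms_f",
    "stats.facet": "hostname_s",
    "rows": 0,
})
print(results.stats["stats_fields"]["total_io_duration_ms_f"])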
The current implementation successfully provides kernel-level I/O visibility per thread, which is a massive leap forward from generic JMX metrics. However, it operates on a significant heuristic: that a single thread handles one request from start to finish. This holds true for Solr’s Jetty container under most conditions but can break down with asynchronous request processing or complex thread pool hand-offs. Furthermore, this system is blind to other performance killers like CPU-bound loops or GC pauses, as it only traces I/O syscalls.
A future iteration could improve correlation by tracing network syscalls (recvfrom, sendto) to definitively mark the start and end of a request on a given thread, capturing the request parameters in the process. Integrating USDT (User-level Statically Defined Tracing) probes, if they were ever added to the JVM or Solr, would provide a far more robust way to get application-level context. For now, this eBPF, ZeroMQ, and Solr pipeline provides an 80% solution for a problem that was previously intractable, without ever touching the application code.