The data ingestion pipeline was a black box. On a good day, it processed tens of thousands of events per second. On a bad day, the RabbitMQ queues would start backing up, alerts would fire, and we’d begin the painful, manual process of diagnostics. The system is composed of a web-facing producer service, a RabbitMQ cluster for decoupling, a fleet of consumer services, a Memcached layer for data enrichment, and an HBase cluster for final persistence. When a slowdown occurred, the question was always the same: where is the bottleneck? Is it a network issue talking to RabbitMQ? Is the consumer logic itself slow? Is Memcached experiencing high latency? Or is HBase struggling with compactions, causing writes to stall? Grepping through logs across dozens of consumer instances to manually piece together the journey of a single event was inefficient and, under pressure, nearly impossible.
Our initial concept was to achieve true end-to-end visibility. We needed to be able to pick any single event and see its entire lifecycle as a single, continuous trace, from the initial API call at the producer, across the asynchronous boundary of RabbitMQ, through the consumer’s enrichment logic against Memcached, and finally to the put operation in HBase. We had already invested in SkyWalking for our synchronous RPC calls, and its agent-based approach, which requires very little manual instrumentation, was appealing. The critical unknown was how well it could handle context propagation across a message bus, which is the classic weak point for many tracing solutions. A common mistake is to assume that simply adding an APM agent to each service will magically connect asynchronous workflows; in reality, the trace context must be explicitly carried with the message.
The technology stack was already set in stone due to existing infrastructure investments. RabbitMQ provided the robust message queuing we needed. HBase was our choice for a write-intensive, massive-scale data store. Memcached served as a simple, high-speed lookup cache for metadata. The challenge wasn’t selecting new tools, but making the existing ones transparent. Our goal was to leverage SkyWalking’s automatic instrumentation plugins as much as possible to avoid littering our business logic with tracing-specific code. The primary focus would be on validating and, if necessary, customizing the trace context propagation through RabbitMQ’s AMQP protocol.
Let’s start with a baseline project structure that represents our pipeline. We’ll use Spring Boot for simplicity, which integrates well with all the components involved.
The pom.xml lays the foundation, pulling in the necessary dependencies. In a real-world project, you’d manage these versions carefully.
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/>
</parent>
<groupId>com.pipeline.diagnostics</groupId>
<artifactId>observable-pipeline</artifactId>
<version>1.0.0</version>
<name>observable-pipeline</name>
<description>Demonstrating end-to-end tracing</description>
<properties>
<java.version>11</java.version>
<phoenix.version>5.1.2</phoenix.version>
<spymemcached.version>2.12.3</spymemcached.version>
<guava.version>31.1-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- HBase communication via Phoenix JDBC driver -->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- Memcached client -->
<dependency>
    <groupId>net.spy</groupId>
    <artifactId>spymemcached</artifactId>
    <version>${spymemcached.version}</version>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Our initial, untraced producer service has a simple REST endpoint to simulate receiving an event.
ProducerController.java:
package com.pipeline.diagnostics.producer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
@Slf4j
@AllArgsConstructor
public class ProducerController {
private final RabbitTemplate rabbitTemplate;
private static final String EXCHANGE_NAME = "events.exchange";
private static final String ROUTING_KEY = "events.key";
@PostMapping("/event")
public String createEvent(@RequestBody String eventPayload) {
String eventId = UUID.randomUUID().toString();
String message = eventId + ":" + eventPayload;
// In the untraced state, no context is passed.
// SkyWalking will see this as the end of a trace.
log.info("Publishing event: {}", eventId);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
return "Event " + eventId + " published.";
}
}
The consumer service listens to the queue, performs a couple of I/O operations, and is where the real work happens.
EventConsumer.java:
package com.pipeline.diagnostics.consumer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@AllArgsConstructor
public class EventConsumer {
private final EnrichmentService enrichmentService;
private final PersistenceService persistenceService;
private static final String QUEUE_NAME = "events.queue";
@RabbitListener(queues = QUEUE_NAME)
public void handleEvent(String message) {
String[] parts = message.split(":", 2);
String eventId = parts[0];
String payload = parts.length > 1 ? parts[1] : "";
// SkyWalking will start a *new* trace here, disconnected from the producer.
log.info("Received event for processing: {}", eventId);
try {
// Step 1: Enrich data from a cache
String enrichedData = enrichmentService.fetchMetadata(eventId);
String finalPayload = payload + "::" + enrichedData;
// Step 2: Persist to a long-term store
persistenceService.saveToHBase(eventId, finalPayload);
log.info("Successfully processed event: {}", eventId);
} catch (Exception e) {
log.error("Failed to process event {}: {}", eventId, e.getMessage());
// In a real system, this would go to a dead-letter queue.
}
}
}
The supporting services simulate calls to Memcached and HBase.
EnrichmentService.java (using spymemcached as the Memcached client):
package com.pipeline.diagnostics.consumer;

import lombok.extern.slf4j.Slf4j;
import net.spy.memcached.MemcachedClient;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.net.InetSocketAddress;

@Service
@Slf4j
public class EnrichmentService {
    // A production setup would expose the MemcachedClient as a Spring bean and close it on shutdown.
    private final MemcachedClient memcachedClient;

    public EnrichmentService() throws IOException {
        this.memcachedClient = new MemcachedClient(new InetSocketAddress("localhost", 11211));
    }

    public String fetchMetadata(String key) {
        // This network call is a potential bottleneck.
        Object cachedValue = memcachedClient.get("metadata:" + key);
        if (cachedValue != null) {
            log.info("Cache hit for key: {}", key);
            return cachedValue.toString();
        } else {
            log.warn("Cache miss for key: {}", key);
            // Simulate fetching from a slower source and caching it.
            String fetchedValue = "metadata_from_source_" + System.currentTimeMillis();
            memcachedClient.set("metadata:" + key, 300, fetchedValue); // Cache for 5 minutes
            return fetchedValue;
        }
    }
}
PersistenceService.java (using Phoenix JDBC for HBase):
package com.pipeline.diagnostics.consumer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@AllArgsConstructor
public class PersistenceService {
private final JdbcTemplate jdbcTemplate;
// The table must exist in HBase:
// CREATE TABLE IF NOT EXISTS events (
// event_id VARCHAR PRIMARY KEY,
// payload VARCHAR
// );
private static final String UPSERT_SQL = "UPSERT INTO events (event_id, payload) VALUES (?, ?)";
public void saveToHBase(String eventId, String payload) {
log.info("Persisting event {} to HBase", eventId);
// This is another critical I/O operation to monitor.
int rows = jdbcTemplate.update(UPSERT_SQL, eventId, payload);
if (rows == 0) {
log.warn("HBase upsert did not affect any rows for eventId: {}", eventId);
}
}
}
Without SkyWalking, running this system gives us isolated logs. If a consumer is slow, we can’t tell if the time is going into fetchMetadata or saveToHBase without adding manual timing logic, which is clutter.
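For contrast, the kind of ad-hoc timing we would otherwise have to sprinkle inside handleEvent looks roughly like this (a sketch of the clutter, not code from the pipeline):

// Manual timing around each I/O call -- exactly the noise that tracing lets us delete.
long start = System.nanoTime();
String enrichedData = enrichmentService.fetchMetadata(eventId);
log.info("fetchMetadata for {} took {} ms", eventId, (System.nanoTime() - start) / 1_000_000);

start = System.nanoTime();
persistenceService.saveToHBase(eventId, payload + "::" + enrichedData);
log.info("saveToHBase for {} took {} ms", eventId, (System.nanoTime() - start) / 1_000_000);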
When we attach the SkyWalking agent to both the producer and consumer JVMs using the -javaagent flag, we see an immediate improvement.
# Example of running the consumer with the agent
java -javaagent:/path/to/skywalking-agent/skywalking-agent.jar \
-Dskywalking.agent.service_name=pipeline-consumer \
-Dskywalking.collector.backend_service=127.0.0.1:11800 \
-jar consumer-app.jar
In the SkyWalking UI, we would now see two separate traces:

- A trace starting at the /event endpoint in the pipeline-producer service. This trace ends when the message is sent to RabbitMQ.
- A second, unrelated trace starting in the pipeline-consumer service when @RabbitListener picks up the message. This trace would correctly show child spans for the Memcached lookup and the JDBC call to HBase, as the plugins for those client libraries are quite mature.
The gap, the broken context, is the RabbitMQ message transfer. This is the core problem we set out to solve.
The pitfall here is trying to solve this manually. One could use TraceContext.traceId() to get the trace ID, add it to the message headers, and then attempt to reconstruct the context on the consumer side. This is brittle, ignores span hierarchy, and fights against the agent’s instrumentation.
The correct approach is to rely on SkyWalking’s apm-rabbitmq-5.x-plugin. This plugin is designed to handle context propagation automatically. On the producer side, it intercepts the publish calls made via Spring’s RabbitTemplate and injects a special header, sw8, into the message properties. This header carries the full distributed tracing context: trace ID, parent span ID, service identifiers, and so on.
On the consumer side, the plugin wraps the message listener. Before our handleEvent method is invoked, the plugin inspects the incoming message headers for sw8. If the header is found, it restores the tracing context into the consumer’s thread, linking the new consumer-side spans to the original producer-side trace and creating a single, cohesive timeline.
The most pragmatic aspect of this solution is that our application code does not need to change at all. The producer and consumer code shown earlier is already compatible. The magic happens entirely within the agent.
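A quick way to convince yourself the propagation is actually happening, useful during development but not something to keep in production, is to have a listener dump the incoming AMQP headers; with both agents attached, an sw8 entry should typically be visible. A sketch using Spring’s @Headers binding (the class name and the events.debug.queue queue are ours, purely illustrative):

package com.pipeline.diagnostics.consumer;

import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class HeaderDumpingListener {

    // Development-only listener on a separate queue; it just logs the headers it receives.
    @RabbitListener(queues = "events.debug.queue")
    public void inspect(String message, @Headers Map<String, Object> headers) {
        // With producer and consumer agents attached, an 'sw8' key should appear here.
        log.info("Incoming AMQP headers: {}", headers);
    }
}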
After confirming the agents are attached to both services, a new request to the /event endpoint generates a complete trace. A visual representation of this trace would look like this in Mermaid.js syntax:
sequenceDiagram
    participant Client
    participant Producer Service
    participant RabbitMQ
    participant Consumer Service
    participant Memcached
    participant HBase
    Client->>+Producer Service: POST /event
    Producer Service->>+RabbitMQ: convertAndSend(message)
    Note over Producer Service,RabbitMQ: SkyWalking agent injects 'sw8' header
    RabbitMQ-->>-Producer Service: Ack
    Producer Service-->>-Client: 200 OK
    RabbitMQ->>+Consumer Service: Deliver Message
    Note over Consumer Service: Agent reads 'sw8' header, continues trace
    Consumer Service->>+Memcached: GET metadata:key
    Memcached-->>-Consumer Service: metadata
    Consumer Service->>+HBase: UPSERT INTO events
    HBase-->>-Consumer Service: Success
    Consumer Service-->>-RabbitMQ: Ack
In the SkyWalking UI, we would now see a single trace. The root span is the POST /event request, with a child exit span of type RabbitMQProducer. Critically, the RabbitMQConsumer span in the consumer service appears as a child of the producer’s span, correctly showing the asynchronous link. The subsequent calls to Memcached (via SkyWalking’s memcached client plugin) and HBase (via the Phoenix JDBC driver, recognized as a JDBC exit span) are nested under the consumer span.
Now we have actionable data. A slow HBase put (in our case, a Phoenix UPSERT) shows up as an abnormally long JDBC span. A cache miss in Memcached is visible. We can drive alerts from the response times SkyWalking records for the producer endpoint and the consumer. We finally broke open the black box.
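As an illustration, an OAP alarm rule in alarm-settings.yml along these lines would page us when the consumer degrades; the metric name follows SkyWalking’s built-in service response-time metric, but the thresholds and rule name are examples rather than values from our deployment:

rules:
  consumer_resp_time_rule:
    # Fires when average response time exceeds 1000 ms in at least
    # 3 of the last 10 one-minute periods.
    metrics-name: service_resp_time
    include-names:
      - pipeline-consumer
    op: ">"
    threshold: 1000
    period: 10
    count: 3
    silence-period: 5
    message: Response time of pipeline-consumer is degrading.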
For a production-grade implementation, a few more details are critical.
First, robust configuration management for both the application and the SkyWalking agent is necessary.
Consumer application.yml:
spring:
  application:
    name: pipeline-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  # Phoenix/HBase Datasource Configuration
  datasource:
    url: jdbc:phoenix:zk-node1,zk-node2:2181:/hbase
    driver-class-name: org.apache.phoenix.jdbc.PhoenixDriver
    # Connection pool settings are vital for performance
    hikari:
      connection-timeout: 30000
      maximum-pool-size: 20

# RabbitMQ declarations used by the application
# In a real app, declare these with a @Configuration class (see the sketch below)
rabbitmq:
  queue: events.queue
  exchange: events.exchange
  routingkey: events.key
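As the comment above notes, the queue, exchange, and binding should be declared in code so the broker topology exists before messages flow. A minimal sketch using Spring AMQP (the class name is ours; the resource names mirror the constants used earlier):

package com.pipeline.diagnostics.consumer;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Declares the queue, exchange, and binding so producer and consumer agree on the topology.
@Configuration
public class RabbitTopologyConfig {

    @Bean
    public Queue eventsQueue() {
        return new Queue("events.queue", true); // durable
    }

    @Bean
    public TopicExchange eventsExchange() {
        return new TopicExchange("events.exchange");
    }

    @Bean
    public Binding eventsBinding(Queue eventsQueue, TopicExchange eventsExchange) {
        return BindingBuilder.bind(eventsQueue).to(eventsExchange).with("events.key");
    }
}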
Second, proper error handling within the consumer becomes observable. Let’s modify the consumer to demonstrate how a failure appears in the trace.
Revised EventConsumer.java with Error Handling:
package com.pipeline.diagnostics.consumer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.toolkit.trace.ActiveSpan;
import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@AllArgsConstructor
public class EventConsumer {
private final EnrichmentService enrichmentService;
private final PersistenceService persistenceService;
private static final String QUEUE_NAME = "events.queue";
@RabbitListener(queues = QUEUE_NAME)
public void handleEvent(String message) {
// We can now programmatically access the trace ID for logging correlation.
String traceId = TraceContext.traceId();
String[] parts = message.split(":", 2);
String eventId = parts[0];
String payload = parts.length > 1 ? parts[1] : "";
log.info("[TID: {}] Received event for processing: {}", traceId, eventId);
try {
String enrichedData = enrichmentService.fetchMetadata(eventId);
String finalPayload = payload + "::" + enrichedData;
// Let's simulate a failure condition.
if (payload.contains("fail_hbase")) {
throw new IllegalStateException("Simulated HBase connection failure.");
}
persistenceService.saveToHBase(eventId, finalPayload);
log.info("[TID: {}] Successfully processed event: {}", traceId, eventId);
} catch (Exception e) {
// Flag the active span as errored and attach the exception, which makes
// failed traces easy to search for in SkyWalking (requires apm-toolkit-trace).
ActiveSpan.error(e);
log.error("[TID: {}] Failed to process event {}: {}", traceId, eventId, e.getMessage());
// Important: Re-throw the exception to trigger Spring AMQP's retry/DLQ mechanism.
// If you catch and swallow it, the message will be considered 'acked'.
throw new RuntimeException("Processing failed for event " + eventId, e);
}
}
}
In this revised version, if a message payload contains "fail_hbase", an exception is thrown and the SkyWalking agent automatically marks the span as failed. By using the apm-toolkit-trace dependency, we can also explicitly flag the span as errored and attach the exception, providing even richer diagnostic information directly within the SkyWalking UI. The log message now also includes the trace ID, allowing a seamless jump from a log entry to the full distributed trace.
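The toolkit is an ordinary Maven dependency; the version shown here is illustrative and should match the agent release you actually deploy:

<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <!-- keep this aligned with the version of the SkyWalking agent in use -->
    <version>8.12.0</version>
</dependency>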
Testing this kind of instrumented system requires a focus on integration testing. Unit tests can verify the business logic of EnrichmentService or PersistenceService using mocks, but to validate the pipeline flow, a test environment with live RabbitMQ, Memcached, and HBase instances (or test containers) is necessary. You would then trigger the producer’s endpoint and assert that the data appears correctly in HBase, verifying the end-to-end functionality. Verifying the trace itself is typically done manually in the SkyWalking UI during development or in a dedicated QA environment.
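A sketch of such an end-to-end check, assuming the producer endpoint and consumer listener run in the same Spring Boot application for the test, the backing services from application.yml are reachable, and spring-boot-starter-test plus AssertJ are on the test classpath; the class and method names are illustrative:

package com.pipeline.diagnostics;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.jdbc.core.JdbcTemplate;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class PipelineFlowIT {

    @Autowired
    private TestRestTemplate restTemplate;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Test
    void eventFlowsFromProducerToHBase() throws Exception {
        String response = restTemplate.postForObject("/event", "integration-test-payload", String.class);
        String eventId = response.split(" ")[1]; // response format: "Event <id> published."

        // Poll HBase (via Phoenix) until the consumer has persisted the row, or give up.
        int count = 0;
        for (int attempt = 0; attempt < 30 && count == 0; attempt++) {
            Thread.sleep(1000);
            count = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM events WHERE event_id = ?", Integer.class, eventId);
        }
        assertThat(count).isEqualTo(1);
    }
}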
The solution relies heavily on the quality and coverage of SkyWalking’s plugins. If our pipeline were to include a component for which no automatic instrumentation plugin exists (for example, a custom binary RPC protocol or an esoteric database), we would be forced to fall back to manual context propagation using SkyWalking’s lower-level tracing APIs. This would involve extracting the context (ContextCarrier) on the client side, serializing it, passing it with the request, and then de-serializing and continuing the context (ContextManager.createEntrySpan) on the server side. This adds significant code complexity and maintenance overhead, which we successfully avoided here.
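To give a sense of that overhead, the client side of such manual propagation looks roughly like the following. This is only a sketch: ContextCarrier, ContextManager, and CarrierItem live in apm-agent-core and are normally used when writing agent plugins rather than application code, exact signatures vary by agent version, and the CustomRpcClient class, header map, and peer address are hypothetical.

package com.pipeline.diagnostics.rpc;

import java.util.HashMap;
import java.util.Map;

import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;

public class CustomRpcClient {

    public void send(String operation, byte[] body) {
        Map<String, String> headers = new HashMap<>();
        ContextCarrier carrier = new ContextCarrier();
        // Open an exit span and serialize the current trace context into the carrier.
        ContextManager.createExitSpan(operation, carrier, "custom-rpc-host:9090");
        CarrierItem item = carrier.items();
        while (item.hasNext()) {
            item = item.next();
            headers.put(item.getHeadKey(), item.getHeadValue()); // shipped alongside the request
        }
        try {
            // ... perform the actual network call, passing 'headers' and 'body' ...
        } finally {
            ContextManager.stopSpan();
        }
    }
}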
Furthermore, the volume of trace data generated in a high-throughput system is non-trivial. The SkyWalking agent’s default sampling rate might be too high for production, creating unnecessary load on both the application network and the SkyWalking OAP backend. A critical next step for a real-world deployment is to analyze the performance impact of the agent and tune the sampling rate (agent.sample_n_per_3_secs) to a level that provides sufficient diagnostic data during incidents without imposing a constant performance tax. This trade-off between visibility and overhead is a fundamental challenge in any large-scale observability implementation.
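For example (the value of 1000 is illustrative, not a recommendation), the rate can be set either in the agent’s agent.config file or as a JVM property alongside the flags shown earlier:

# agent.config: keep at most 1000 trace segments per 3-second window per JVM
agent.sample_n_per_3_secs=1000

# or as a JVM property when launching the service
java -javaagent:/path/to/skywalking-agent/skywalking-agent.jar \
  -Dskywalking.agent.sample_n_per_3_secs=1000 \
  -Dskywalking.agent.service_name=pipeline-consumer \
  -Dskywalking.collector.backend_service=127.0.0.1:11800 \
  -jar consumer-app.jar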