The system’s behavior was becoming unpredictable. A user action, processed via an HTTP request through our APISIX gateway to an Echo backend, would trigger an asynchronous job by publishing a message to an Apache Pulsar topic. Days later, a support ticket would arrive reporting that the job never completed. The logs from the consumer service showed no record of the message, while the producer service logs confirmed it was sent. Between the gateway and the final database commit, the request’s context was lost in a sea of disconnected log entries and metric spikes. We were flying blind. The immediate need was not for more logging, but for a narrative thread connecting these disparate events. This led us to adopt OpenTelemetry for end-to-end distributed tracing. The real work was instrumenting our entire, heterogeneous stack: a Dart-based diagnostic tool, Apache APISIX, Go services using the Echo framework, and our Pulsar messaging backbone.
Our initial concept was to enforce a single trace context that would originate from the very first interaction, propagate through every component, and terminate only when the final piece of work was done. This meant the W3C traceparent header had to survive REST APIs, the APISIX proxy layer, and, most critically, the transition into and out of Pulsar’s message properties.
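For concreteness, the context travels as a single traceparent header of the form 00-<trace-id>-<parent-id>-<trace-flags>. Here is a minimal, self-contained Go sketch (illustrative, not code from our services) showing how the OpenTelemetry propagator writes that header onto an outgoing request:

// traceparent_demo.go (illustrative; not part of our services)
package main

import (
	"context"
	"fmt"
	"net/http"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// A tracer provider with no exporter is enough to mint a valid span context.
	otel.SetTracerProvider(sdktrace.NewTracerProvider())
	otel.SetTextMapPropagator(propagation.TraceContext{})

	// Start a root span, the way the Dart client does at the very beginning of the flow.
	ctx, span := otel.Tracer("demo").Start(context.Background(), "initiate-processing-job")
	defer span.End()

	// Inject the active span into the outgoing request headers.
	req, _ := http.NewRequestWithContext(ctx, "POST", "http://localhost:9080/v1/process", nil)
	otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))

	// Prints something like: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
	fmt.Println(req.Header.Get("traceparent"))
}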
Technology selection was dictated by our existing stack, but instrumentation choices required careful consideration.
- APISIX: Its opentelemetry plugin seemed like a perfect fit. It promised to handle trace creation and propagation at the edge without code changes in the gateway itself. The reality, as we found, is that its configuration needed to be precise to avoid breaking traces or creating orphaned ones.
- Echo (Go): The OpenTelemetry Go SDK is mature. The go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho package provided a middleware that handled most of the HTTP server-side instrumentation. The real challenge was on the client side, specifically the Pulsar producer client.
- Pulsar: This was the heart of the problem. Unlike Kafka, which has well-established instrumentation libraries, the Go Pulsar client’s OpenTelemetry integration was less mature at the time. We concluded we’d have to manually inject and extract the trace context into and from the message properties. This was a critical implementation detail that would make or break the entire effort.
- Dart: We used a Dart command-line tool for internal testing and diagnostics. To validate our tracing setup, this tool needed to be the originator of the trace. We had to find a viable OpenTelemetry SDK for Dart and ensure its HTTP client could correctly inject the traceparent header.
- GitLab CI/CD: We decided early on that manual verification of traces in a UI like Jaeger was insufficient. A real-world project requires automated validation. Our CI/CD pipeline had to become an orchestration engine that not only deployed the services but also ran an integration test that programmatically verified the integrity of the traces.
The implementation journey was a series of connected steps, each building upon the last to form a complete, observable data path.
Part 1: Configuring APISIX for Trace Propagation
The first step was ensuring APISIX didn’t just pass headers but actively participated in the trace. We configured it to create a span for the request it handled, linking it to the parent span from the Dart client and creating a new child span context for the upstream Echo service.
We defined a route that pointed to our producer service and attached the opentelemetry plugin. The key was configuring the sampler. For our diagnostic environment, we used always_on, but in production, this would be a probabilistic sampler. The additional_attributes allowed us to tag spans originating from the gateway.
Here is the APISIX route configuration, managed declaratively in our Git repository.
# apisix/conf/routes.yaml
routes:
  - id: "1"
    uri: "/v1/process"
    methods: ["POST"]
    upstream:
      type: "roundrobin"
      nodes:
        "echo-producer:8080": 1
    plugins:
      opentelemetry:
        sampler:
          name: always_on
        endpoint: "http://jaeger:4318/v1/traces" # OTLP HTTP endpoint
        protocol: "http"
        additional_attributes:
          - "service.instance.id=apisix-ingress-1"
          - "peer.service=echo-producer"
A common mistake here is misconfiguring the OTLP endpoint or protocol, leading to silent failures where traces are simply dropped. We added specific health checks in our startup scripts to ensure APISIX could reach the Jaeger collector before accepting traffic.
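Our checks lived in shell startup scripts, but the idea is simple enough to sketch as a small Go probe (the hostname and timeouts here are assumptions that mirror the Compose setup used later):

// waitforcollector.go (illustrative readiness probe; our real check was a startup script)
package main

import (
	"fmt"
	"net"
	"os"
	"time"
)

func main() {
	const collector = "jaeger:4318" // OTLP/HTTP port of the Jaeger collector
	deadline := time.Now().Add(60 * time.Second)

	for time.Now().Before(deadline) {
		conn, err := net.DialTimeout("tcp", collector, 2*time.Second)
		if err == nil {
			conn.Close()
			fmt.Println("collector reachable at", collector)
			return // safe to start accepting traffic
		}
		time.Sleep(2 * time.Second)
	}
	fmt.Fprintln(os.Stderr, "collector not reachable; refusing to start")
	os.Exit(1)
}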
Part 2: Instrumenting the Echo Producer Service
This Go service listens for HTTP requests from APISIX and publishes a message to Pulsar. Instrumentation involved three main tasks: setting up the global OpenTelemetry provider, adding the otelecho middleware, and manually injecting the trace context into the Pulsar message.
Here’s the core of the producer’s setup.
// cmd/producer/main.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
const (
serviceName = "echo-producer"
pulsarURL = "pulsar://pulsar:6650"
pulsarTopic = "persistent://public/default/job-topic"
jaegerEndpoint = "jaeger:4318"
)
// newTracerProvider creates a new trace provider instance and registers it globally.
func newTracerProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
// Using OTLP HTTP exporter
exporter, err := otlptracehttp.New(ctx, otlptracehttp.WithEndpoint(jaegerEndpoint), otlptracehttp.WithInsecure())
if err != nil {
return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
}
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()), // Use ParentBasedSampler in production
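// A production alternative (not used in this diagnostic setup) would be parent-based
// ratio sampling, e.g.:
//   sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.1))),
// so the sampling decision made at the edge (APISIX) is honored downstream.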
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
type PulsarMessageCarrier map[string]string
func (c PulsarMessageCarrier) Get(key string) string {
return c[key]
}
func (c PulsarMessageCarrier) Set(key string, value string) {
c[key] = value
}
func (c PulsarMessageCarrier) Keys() []string {
keys := make([]string, 0, len(c))
for k := range c {
keys = append(keys, k)
}
return keys
}
func main() {
ctx := context.Background()
// Setup OpenTelemetry
tp, err := newTracerProvider(ctx)
if err != nil {
log.Fatalf("failed to initialize tracer provider: %v", err)
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
log.Printf("error shutting down tracer provider: %v", err)
}
}()
// Pulsar Client
pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{
URL: pulsarURL,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("could not instantiate Pulsar client: %v", err)
}
defer pulsarClient.Close()
producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
Topic: pulsarTopic,
})
if err != nil {
log.Fatalf("could not create Pulsar producer: %v", err)
}
defer producer.Close()
// Echo Server
e := echo.New()
e.Use(middleware.Logger())
e.Use(otelecho.Middleware(serviceName)) // The crucial middleware
// The handler where the magic happens
e.POST("/v1/process", func(c echo.Context) error {
// The otelecho middleware starts a server span for this request and stores it in the
// request context. The middleware also ends that span when the handler returns, so we
// only read it here and never call End() on it ourselves.
span := trace.SpanFromContext(c.Request().Context())
// 1. Log with trace and span IDs
sc := span.SpanContext()
log.Printf("Producer received request. TraceID: %s, SpanID: %s", sc.TraceID().String(), sc.SpanID().String())
// 2. Business Logic
var payload map[string]interface{}
if err := c.Bind(&payload); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Invalid payload")
return c.JSON(http.StatusBadRequest, map[string]string{"error": "invalid json payload"})
}
span.SetAttributes(attribute.String("payload.id", fmt.Sprintf("%v", payload["id"])))
// 3. Prepare to send to Pulsar. This is the critical part.
// We create a new span for the Pulsar publish operation.
tracer := otel.Tracer("pulsar-producer-tracer")
producerCtx, producerSpan := tracer.Start(c.Request().Context(), "PulsarProduce")
defer producerSpan.End()
producerSpan.SetAttributes(
semconv.MessagingSystemKey.String("pulsar"),
semconv.MessagingDestinationNameKey.String(pulsarTopic),
)
// 4. Manual context injection into message properties
// Create a carrier and inject the current context into it.
carrier := make(PulsarMessageCarrier)
propagator := otel.GetTextMapPropagator()
propagator.Inject(producerCtx, carrier)
// The properties of the message will now contain 'traceparent' and 'tracestate'.
msg := &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("data for %v", payload["id"])),
Properties: carrier,
}
// 5. Send the message
msgID, err := producer.Send(producerCtx, msg)
if err != nil {
producerSpan.RecordError(err)
producerSpan.SetStatus(codes.Error, "Failed to send to Pulsar")
log.Printf("Failed to publish message: %v", err)
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "could not process message"})
}
producerSpan.SetAttributes(attribute.String("messaging.message_id", msgID.String()))
log.Printf("Message sent to Pulsar with ID %s. TraceID: %s, SpanID: %s", msgID.String(), sc.TraceID().String(), producerSpan.SpanContext().SpanID().String())
return c.JSON(http.StatusOK, map[string]string{"status": "processing", "messageId": msgID.String()})
})
if err := e.Start(":8080"); err != nil && err != http.ErrServerClosed {
e.Logger.Fatal(err)
}
}
The PulsarMessageCarrier type is essential. It implements the propagation.TextMapCarrier interface, providing a generic way for the OpenTelemetry SDK to inject key-value pairs (traceparent, tracestate) into a simple map[string]string, which we then assign to the Properties field of the Pulsar message.
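Because this manual plumbing is easy to get subtly wrong, we found it worth pinning down with a test. A minimal sketch, assuming it lives in the same package as the carrier above (the test name is ours):

// propagation_roundtrip_test.go (illustrative)
package main

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"
)

func TestCarrierRoundTrip(t *testing.T) {
	prop := propagation.TraceContext{}

	// A provider with no exporter is enough to mint valid span contexts.
	tp := sdktrace.NewTracerProvider()
	ctx, span := tp.Tracer("test").Start(context.Background(), "producer-side")
	defer span.End()

	// Inject into the same map type we attach to pulsar.ProducerMessage.Properties.
	carrier := make(PulsarMessageCarrier)
	prop.Inject(ctx, carrier)
	if carrier["traceparent"] == "" {
		t.Fatal("expected a traceparent property to be injected")
	}

	// Extract on the "consumer side" and check that the trace ID survived.
	extracted := prop.Extract(context.Background(), carrier)
	want := trace.SpanContextFromContext(ctx).TraceID()
	got := trace.SpanContextFromContext(extracted).TraceID()
	if got != want {
		t.Fatalf("trace ID lost in round trip: got %s, want %s", got, want)
	}
}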
Part 3: Instrumenting the Pulsar Consumer Service
The consumer service is a background worker that listens for messages from the topic. Its main task is to extract the trace context from the message properties and start a new span that is correctly parented to the producer’s span. This visually stitches the asynchronous part of the workflow to the initial HTTP request in our tracing backend.
// cmd/consumer/main.go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
const (
serviceName = "pulsar-consumer"
pulsarURL = "pulsar://pulsar:6650"
pulsarTopic = "persistent://public/default/job-topic"
subscription = "my-subscription"
jaegerEndpoint = "jaeger:4318"
)
// newTracerProvider is identical to the producer's setup
func newTracerProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
exporter, err := otlptracehttp.New(ctx, otlptracehttp.WithEndpoint(jaegerEndpoint), otlptracehttp.WithInsecure())
if err != nil {
return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
}
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
// Re-using the same carrier implementation
type PulsarMessageCarrier map[string]string
func (c PulsarMessageCarrier) Get(key string) string {
return c[key]
}
func (c PulsarMessageCarrier) Set(key string, value string) {
c[key] = value
}
func (c PulsarMessageCarrier) Keys() []string {
keys := make([]string, 0, len(c))
for k := range c {
keys = append(keys, k)
}
return keys
}
func main() {
ctx := context.Background()
tp, err := newTracerProvider(ctx)
if err != nil {
log.Fatalf("failed to initialize tracer provider: %v", err)
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
log.Printf("error shutting down tracer provider: %v", err)
}
}()
pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarURL})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer pulsarClient.Close()
consumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: pulsarTopic,
SubscriptionName: subscription,
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
tracer := otel.Tracer("pulsar-consumer-tracer")
propagator := otel.GetTextMapPropagator()
log.Println("Consumer started. Waiting for messages...")
for {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatalf("Consumer receive error: %v", err)
}
// 1. Extract context from message properties
carrier := PulsarMessageCarrier(msg.Properties())
parentCtx := propagator.Extract(context.Background(), carrier)
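// If the message carries no traceparent property (e.g. it came from an
// uninstrumented producer), Extract returns a context without a remote span
// and the span started below simply becomes the root of a new trace.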
// 2. Start a new span as a child of the producer's span. The derived context is not
// needed beyond this block, so we discard it rather than leave an unused variable.
_, span := tracer.Start(parentCtx, "PulsarConsume")
span.SetAttributes(
semconv.MessagingSystemKey.String("pulsar"),
semconv.MessagingDestinationNameKey.String(pulsarTopic),
attribute.String("messaging.message_id", msg.ID().String()),
)
sc := span.SpanContext()
log.Printf("Consumer received message. TraceID: %s, SpanID: %s", sc.TraceID().String(), sc.SpanID().String())
// 3. Process the message
log.Printf("Processing payload: %s", string(msg.Payload()))
// Simulate work
time.Sleep(100 * time.Millisecond)
// 4. Acknowledge and finish the span
consumer.Ack(msg)
span.End()
log.Printf("Finished processing message. TraceID: %s", sc.TraceID().String())
}
}
Part 4: The Dart Diagnostic Client
To initiate the entire flow, we built a simple Dart CLI tool. Its responsibility is to create the root span of the trace and inject the traceparent header into the initial HTTP POST request to APISIX.
// bin/run_test.dart
import 'dart:convert';
import 'dart:io';
import 'package:http/http.dart' as http;
import 'package:opentelemetry/api.dart' as api;
import 'package:opentelemetry/sdk.dart' as sdk;
import 'package:opentelemetry/exporter_otlp_http.dart' as otlp;
import 'package:opentelemetry/contrib_resource_detectors_process.dart' as resource_detectors;
// A simple carrier for injecting headers into the http request.
class HttpCarrier implements api.TextMapSetter<http.Request> {
void set(http.Request carrier, String key, String value) {
carrier.headers[key] = value;
}
}
Future<void> main(List<String> arguments) async {
// 1. Configure OpenTelemetry SDK
final resource = sdk.Resource([
...sdk.Resource.defaultResources,
...resource_detectors.processResource,
api.Attribute.fromString(api.ResourceAttributes.serviceName, 'dart-diagnostic-client'),
]);
final exporter = otlp.CollectorExporter(
Uri.parse('http://localhost:4318/v1/traces'), // Assumes running locally
);
final processor = sdk.BatchSpanProcessor(exporter);
final tracerProvider = sdk.TracerProvider(
resource: resource,
processors: [processor],
);
final tracer = tracerProvider.getTracer('my-app-tracer');
// 2. Create the root span for our operation
final span = tracer.startSpan('initiate-processing-job');
try {
// 3. Create the HTTP request and inject the trace context
final url = Uri.parse('http://localhost:9080/v1/process');
final request = http.Request('POST', url)
..headers[HttpHeaders.contentTypeHeader] = 'application/json'
..body = json.encode({'id': 'job-123', 'data': 'some-payload'});
final propagator = api.W3CTraceContextPropagator();
propagator.inject(api.Context.current.withSpan(span), request, HttpCarrier());
final traceparent = request.headers['traceparent'];
print('Sending request with traceparent header: $traceparent');
span.setAttribute(api.Attribute.fromString('http.url', url.toString()));
// 4. Send the request
final client = http.Client();
final response = await client.send(request);
final responseBody = await response.stream.bytesToString();
span.setAttribute(api.Attribute.fromInt('http.status_code', response.statusCode));
print('Response status: ${response.statusCode}');
print('Response body: ${responseBody}');
} catch (e, s) {
span.recordException(e, stackTrace: s);
span.setStatus(api.StatusCode.error, description: 'Request failed');
print('Error: $e');
} finally {
span.end();
// 5. Shutdown the provider to ensure all spans are exported
await tracerProvider.shutdown();
print('Trace sent.');
}
}
Part 5: The GitLab CI/CD Verification Pipeline
This is where everything comes together. The pipeline uses Docker Compose to set up a self-contained environment with Jaeger, Pulsar, APISIX, and our Go services. It then runs the Dart client to generate traffic and, finally, a script queries the Jaeger API to verify that a complete trace was generated.
A complete trace is defined as one that contains spans from dart-diagnostic-client, apisix, echo-producer, and pulsar-consumer, all sharing the same trace ID.
# .gitlab-ci.yml
stages:
  - setup
  - test
  - verify
  - teardown

variables:
  # Using docker-in-docker for service management
  DOCKER_HOST: tcp://docker:2375
  DOCKER_TLS_CERTDIR: ""

# Service needed for Docker commands
default:
  services:
    - name: docker:dind
      alias: docker

build_images:
  stage: setup
  script:
    - echo "Building service images..."
    - docker build -t my-echo-producer ./producer
    - docker build -t my-pulsar-consumer ./consumer
    - docker build -t my-dart-client ./dart-client
    # In a real project, these would be pushed to a registry

run_integration_test:
  stage: test
  script:
    - echo "Starting services with Docker Compose..."
    # This docker-compose.yml defines jaeger, pulsar, apisix, and our services
    - docker-compose up -d
    - echo "Waiting for services to be healthy..."
    - sleep 30 # Simple wait; a real pipeline would have proper health checks
    - echo "Running Dart client to generate trace..."
    - docker-compose run --rm dart-client
  artifacts:
    when: always
    paths:
      - docker-compose.logs
    expire_in: 1 day
  after_script:
    - docker-compose logs > docker-compose.logs

verify_trace:
  stage: verify
  needs: ["run_integration_test"]
  script:
    - apk add --no-cache curl jq
    - echo "Querying Jaeger for the trace..."
    # Give Jaeger a moment to process the trace
    - sleep 10
    # A single literal block keeps the shell variables and comments together;
    # a folded block (>) would collapse these lines and the comments would
    # swallow the commands that follow them.
    - |
      JAEGER_API="http://docker:16686/api/traces"
      SERVICE_NAME="dart-diagnostic-client"
      LOOKBACK="5m"
      # 1. Get the Trace ID from the root service
      TRACE_ID=$(curl -s "$JAEGER_API?service=$SERVICE_NAME&lookback=$LOOKBACK&limit=1" | jq -r '.data[0].traceID')
      if [ -z "$TRACE_ID" ] || [ "$TRACE_ID" == "null" ]; then
        echo "Error: Could not find any trace for service $SERVICE_NAME"
        exit 1
      fi
      echo "Found Trace ID: $TRACE_ID"
      # 2. Get all spans for that Trace ID
      SPAN_SERVICES=$(curl -s "$JAEGER_API/$TRACE_ID" | jq -r '.data[0].processes | .[] | .serviceName')
      # 3. Assert that all expected services are present in the trace
      echo "Services found in trace:"
      echo "$SPAN_SERVICES"
      echo "$SPAN_SERVICES" | grep -q "dart-diagnostic-client" || { echo "Missing span from dart-diagnostic-client"; exit 1; }
      echo "$SPAN_SERVICES" | grep -q "apisix" || { echo "Missing span from apisix"; exit 1; }
      echo "$SPAN_SERVICES" | grep -q "echo-producer" || { echo "Missing span from echo-producer"; exit 1; }
      echo "$SPAN_SERVICES" | grep -q "pulsar-consumer" || { echo "Missing span from pulsar-consumer"; exit 1; }
      echo "Trace verification successful: All services are present."

cleanup:
  stage: teardown
  script:
    - echo "Cleaning up Docker Compose environment..."
    - docker-compose down -v
  when: always
This CI pipeline transformed observability from a passive debugging tool into a first-class, testable feature of our system. If a code change broke context propagation, the verify_trace job would fail, preventing the regression from ever reaching production.
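For teams that find shell-and-jq assertions hard to maintain, the same check can be expressed as a small Go program and run as the verification step. A sketch, assuming Jaeger's /api/traces responses have the shape the jq queries above already rely on (the program name and host are illustrative):

// cmd/verifytrace/main.go (illustrative alternative to the jq script)
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

type jaegerResponse struct {
	Data []struct {
		TraceID   string `json:"traceID"`
		Processes map[string]struct {
			ServiceName string `json:"serviceName"`
		} `json:"processes"`
	} `json:"data"`
}

func fetch(url string, out *jaegerResponse) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return json.NewDecoder(resp.Body).Decode(out)
}

func main() {
	api := "http://localhost:16686/api/traces" // adjust host for the CI network

	// 1. Find the most recent trace that starts at the Dart client.
	var latest jaegerResponse
	if err := fetch(api+"?service=dart-diagnostic-client&lookback=5m&limit=1", &latest); err != nil || len(latest.Data) == 0 {
		fmt.Println("no trace found for dart-diagnostic-client:", err)
		os.Exit(1)
	}
	traceID := latest.Data[0].TraceID

	// 2. Load the full trace and collect the services that contributed spans.
	var full jaegerResponse
	if err := fetch(api+"/"+traceID, &full); err != nil || len(full.Data) == 0 {
		fmt.Println("could not load trace", traceID, ":", err)
		os.Exit(1)
	}
	seen := map[string]bool{}
	for _, p := range full.Data[0].Processes {
		seen[p.ServiceName] = true
	}

	// 3. Assert every hop is present.
	for _, svc := range []string{"dart-diagnostic-client", "apisix", "echo-producer", "pulsar-consumer"} {
		if !seen[svc] {
			fmt.Println("missing span from", svc, "in trace", traceID)
			os.Exit(1)
		}
	}
	fmt.Println("trace", traceID, "contains all expected services")
}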
sequenceDiagram
    participant Dart Client
    participant GitLab CI
    participant APISIX
    participant Echo Producer
    participant Pulsar
    participant Echo Consumer
    participant Jaeger
    GitLab CI->>Dart Client: Run test command
    Dart Client->>Jaeger: Start Trace (Span A)
    Dart Client->>APISIX: POST /v1/process (with traceparent)
    APISIX->>Jaeger: Create Span B (child of A)
    APISIX->>Echo Producer: Forward request (with new traceparent)
    Echo Producer->>Jaeger: Create Span C (child of B)
    Echo Producer->>Pulsar: Publish Message (with trace context in properties)
    Pulsar-->>Echo Consumer: Deliver Message
    Echo Consumer->>Jaeger: Create Span D (child of C, from message properties)
    Echo Consumer-->>Pulsar: Ack Message
    GitLab CI->>Jaeger: Query API for Trace
    Jaeger-->>GitLab CI: Return spans for trace
    GitLab CI->>GitLab CI: Assert Spans A, B, C, D exist
The solution, while effective, is not without its limitations. Our current implementation traces every single request, which is not feasible in a high-throughput production environment. The next logical step is to implement a sophisticated sampling strategy, likely a ParentBasedSampler combined with a tail-based sampler to capture all traces associated with errors, regardless of the initial sampling decision. Furthermore, the manual context propagation within the Pulsar client code is a potential point of failure. A more robust approach would involve creating a middleware or wrapper for the Pulsar Go client that automatically handles trace injection and extraction, making the process transparent to the application developer. This would reduce boilerplate and the risk of implementation errors in new services.
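As a rough illustration of what such a wrapper could look like, here is a sketch built on the pulsar-client-go and OpenTelemetry APIs already used above; it is not an existing library, and the package and type names are ours:

// tracingproducer.go (illustrative wrapper, not an off-the-shelf library)
package messaging

import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// PulsarMessageCarrier is the same TextMapCarrier used by the producer above.
type PulsarMessageCarrier map[string]string

func (c PulsarMessageCarrier) Get(key string) string { return c[key] }
func (c PulsarMessageCarrier) Set(key, value string) { c[key] = value }
func (c PulsarMessageCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

// TracingProducer wraps a pulsar.Producer so every Send gets its own span and
// the trace context injected into the message properties automatically.
type TracingProducer struct {
	inner  pulsar.Producer
	tracer trace.Tracer
}

func NewTracingProducer(p pulsar.Producer) *TracingProducer {
	return &TracingProducer{inner: p, tracer: otel.Tracer("pulsar-producer")}
}

func (tp *TracingProducer) Send(ctx context.Context, msg *pulsar.ProducerMessage) (pulsar.MessageID, error) {
	ctx, span := tp.tracer.Start(ctx, "PulsarProduce")
	defer span.End()

	if msg.Properties == nil {
		msg.Properties = map[string]string{}
	}
	// Inject traceparent/tracestate into the message properties.
	otel.GetTextMapPropagator().Inject(ctx, PulsarMessageCarrier(msg.Properties))

	id, err := tp.inner.Send(ctx, msg)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "pulsar send failed")
	}
	return id, err
}

With something like this in place, handlers would call the wrapper's Send instead of the raw producer's, and context propagation would no longer depend on each service re-implementing the carrier logic correctly.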