Implementing End-to-End Distributed Tracing Across Dart, APISIX, Echo, and Pulsar


The system’s behavior was becoming unpredictable. A user action, processed via an HTTP request through our APISIX gateway to an Echo backend, would trigger an asynchronous job by publishing a message to an Apache Pulsar topic. Days later, a support ticket would arrive reporting that the job never completed. The logs from the consumer service showed no record of the message, while the producer service logs confirmed it was sent. Between the gateway and the final database commit, the request’s context was lost in a sea of disconnected log entries and metric spikes. We were flying blind. The immediate need was not for more logging, but for a narrative thread connecting these disparate events. This led us to adopt OpenTelemetry for end-to-end distributed tracing. The real work was instrumenting our entire, heterogeneous stack: a Dart-based diagnostic tool, Apache APISIX, Go services using the Echo framework, and our Pulsar messaging backbone.

Our initial concept was to enforce a single trace context that would originate from the very first interaction, propagate through every component, and terminate only when the final piece of work was done. This meant the W3C traceparent header had to survive REST APIs, the APISIX proxy layer, and, most critically, the transition into and out of Pulsar’s message properties.
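Because every hop depends on the traceparent value arriving intact, it helps to be able to check one by hand. The following is a minimal, dependency-free sketch of a validator for the version-00 format (version, 32-hex trace ID, 16-hex parent span ID, 2-hex flags). The type and function names are illustrative and are not part of any OpenTelemetry package; real services should rely on the SDK's propagator.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// TraceParent holds the fields of a version-00 W3C traceparent header.
type TraceParent struct {
	TraceID string // 32 lowercase hex characters
	SpanID  string // 16 lowercase hex characters
	Flags   uint8  // bit 0 is the "sampled" flag
}

// Sampled reports whether the caller recorded this trace.
func (tp TraceParent) Sampled() bool { return tp.Flags&0x01 == 0x01 }

// isLowerHex reports whether s is exactly n lowercase hex characters.
func isLowerHex(s string, n int) bool {
	if len(s) != n {
		return false
	}
	for _, c := range s {
		if !(c >= '0' && c <= '9') && !(c >= 'a' && c <= 'f') {
			return false
		}
	}
	return true
}

// ParseTraceParent validates a header value and splits it into its fields.
func ParseTraceParent(header string) (TraceParent, error) {
	parts := strings.Split(strings.TrimSpace(header), "-")
	if len(parts) != 4 || parts[0] != "00" {
		return TraceParent{}, errors.New("traceparent: want version 00 with four fields")
	}
	traceID, spanID, flags := parts[1], parts[2], parts[3]
	// All-zero IDs are explicitly invalid in the specification.
	if !isLowerHex(traceID, 32) || strings.Trim(traceID, "0") == "" {
		return TraceParent{}, errors.New("traceparent: invalid trace-id")
	}
	if !isLowerHex(spanID, 16) || strings.Trim(spanID, "0") == "" {
		return TraceParent{}, errors.New("traceparent: invalid parent-id")
	}
	if !isLowerHex(flags, 2) {
		return TraceParent{}, errors.New("traceparent: invalid trace-flags")
	}
	f, err := strconv.ParseUint(flags, 16, 8)
	if err != nil {
		return TraceParent{}, fmt.Errorf("traceparent: %w", err)
	}
	return TraceParent{TraceID: traceID, SpanID: spanID, Flags: uint8(f)}, nil
}

func main() {
	tp, err := ParseTraceParent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		fmt.Println("invalid:", err)
		return
	}
	fmt.Println(tp.TraceID, tp.SpanID, tp.Sampled())
}
```

A check like this is handy when comparing the value logged by one component with the value received by the next: the trace ID must be identical at every hop, while the span ID changes each time a component starts its own span.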

Technology selection was dictated by our existing stack, but instrumentation choices required careful consideration.

  • APISIX: Its opentelemetry plugin seemed like a perfect fit. It promised to handle trace creation and propagation at the edge without code changes in the gateway itself. The reality, as we found, is that its configuration needed to be precise to avoid breaking traces or creating orphaned ones.
  • Echo (Go): The OpenTelemetry Go SDK is mature. The go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho package provided a middleware that handled most of the HTTP server-side instrumentation. The real challenge was on the client side—specifically, the Pulsar producer client.
  • Pulsar: This was the heart of the problem. Kafka has well-established instrumentation libraries; the Go Pulsar client’s OpenTelemetry integration was less mature at the time. We concluded we’d have to inject the trace context into the message properties on the producer side and extract it again on the consumer side by hand. This was a critical implementation detail that would make or break the entire effort.
  • Dart: We used a Dart command-line tool for internal testing and diagnostics. To validate our tracing setup, this tool needed to be the originator of the trace. We had to find a viable OpenTelemetry SDK for Dart and ensure its HTTP client could correctly inject the traceparent header.
  • GitLab CI/CD: We decided early on that manual verification of traces in a UI like Jaeger was insufficient. A real-world project requires automated validation. Our CI/CD pipeline had to become an orchestration engine that not only deployed the services but also ran an integration test that programmatically verified the integrity of the traces.

The implementation journey was a series of connected steps, each building upon the last to form a complete, observable data path.

Part 1: Configuring APISIX for Trace Propagation

The first step was ensuring APISIX didn’t just pass headers but actively participated in the trace. We configured it to create a span for the request it handled, linking it to the parent span from the Dart client and creating a new child span context for the upstream Echo service.

We defined a route that pointed to our producer service and attached the opentelemetry plugin. The key was configuring the sampler. For our diagnostic environment, we used always_on, but in production, this would be a probabilistic sampler such as trace_id_ratio, usually wrapped in parent_base. The additional_attributes list names APISIX variables whose values are attached to every gateway span, which let us tag spans originating from the gateway.

Here is the APISIX route configuration, managed declaratively in our Git repository.

# apisix/conf/routes.yaml
routes:
  - id: "1"
    uri: "/v1/process"
    methods: ["POST"]
    upstream:
      type: "roundrobin"
      nodes:
        "echo-producer:8080": 1
    plugins:
      opentelemetry:
        sampler:
          name: always_on
        # Entries are variable names resolved per request, not key=value pairs.
        additional_attributes:
          - "route_id"
          - "request_method"

The collector address is not a route-level setting. The plugin reads it from plugin_attr.opentelemetry.collector.address in config.yaml (jaeger:4318 for OTLP over HTTP in our setup), which is also where the plugin is enabled and where static resource attributes such as service.name are set. A common mistake here is misconfiguring that address, leading to silent failures where traces are simply dropped. We added specific health checks in our startup scripts to ensure APISIX could reach the Jaeger collector before accepting traffic.

Part 2: Instrumenting the Echo Producer Service

This Go service listens for HTTP requests from APISIX and publishes a message to Pulsar. Instrumentation involved three main tasks: setting up the global OpenTelemetry provider, adding the otelecho middleware, and manually injecting the trace context into the Pulsar message.

Here’s the core of the producer’s setup.

// cmd/producer/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)

const (
	serviceName    = "echo-producer"
	pulsarURL      = "pulsar://pulsar:6650"
	pulsarTopic    = "persistent://public/default/job-topic"
	jaegerEndpoint = "jaeger:4318"
)

// newTracerProvider creates a new trace provider instance and registers it globally.
func newTracerProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	// Using OTLP HTTP exporter
	exporter, err := otlptracehttp.New(ctx, otlptracehttp.WithEndpoint(jaegerEndpoint), otlptracehttp.WithInsecure())
	if err != nil {
		return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
	}

	res, err := resource.New(ctx,
		resource.WithAttributes(
			semconv.ServiceNameKey.String(serviceName),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create resource: %w", err)
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()), // Use sdktrace.ParentBased(...) in production
	)
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

	return tp, nil
}

// PulsarMessageCarrier adapts a Pulsar properties map to propagation.TextMapCarrier.
type PulsarMessageCarrier map[string]string

func (c PulsarMessageCarrier) Get(key string) string {
	return c[key]
}

func (c PulsarMessageCarrier) Set(key string, value string) {
	c[key] = value
}

func (c PulsarMessageCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	ctx := context.Background()

	// Setup OpenTelemetry
	tp, err := newTracerProvider(ctx)
	if err != nil {
		log.Fatalf("failed to initialize tracer provider: %v", err)
	}
	defer func() {
		if err := tp.Shutdown(ctx); err != nil {
			log.Printf("error shutting down tracer provider: %v", err)
		}
	}()

	// Pulsar Client
	pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               pulsarURL,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("could not instantiate Pulsar client: %v", err)
	}
	defer pulsarClient.Close()

	producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
		Topic: pulsarTopic,
	})
	if err != nil {
		log.Fatalf("could not create Pulsar producer: %v", err)
	}
	defer producer.Close()

	// Echo Server
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(otelecho.Middleware(serviceName)) // The crucial middleware

	// The handler where the magic happens
	e.POST("/v1/process", func(c echo.Context) error {
		// The otelecho middleware has already extracted the incoming context and
		// started the server span. It also ends that span, so we only read it here.
		span := trace.SpanFromContext(c.Request().Context())

		// 1. Log with trace and span IDs
		sc := span.SpanContext()
		log.Printf("Producer received request. TraceID: %s, SpanID: %s", sc.TraceID().String(), sc.SpanID().String())

		// 2. Business Logic
		var payload map[string]interface{}
		if err := c.Bind(&payload); err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, "invalid payload")
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "invalid json payload"})
		}
		span.SetAttributes(attribute.String("payload.id", fmt.Sprintf("%v", payload["id"])))

		// 3. Prepare to send to Pulsar. This is the critical part.
		// We create a new producer-kind span for the Pulsar publish operation.
		tracer := otel.Tracer("pulsar-producer-tracer")
		producerCtx, producerSpan := tracer.Start(c.Request().Context(), "PulsarProduce",
			trace.WithSpanKind(trace.SpanKindProducer))
		defer producerSpan.End()

		producerSpan.SetAttributes(
			semconv.MessagingSystemKey.String("pulsar"),
			semconv.MessagingDestinationNameKey.String(pulsarTopic),
		)

		// 4. Manual context injection into message properties
		// Create a carrier and inject the current context into it.
		carrier := make(PulsarMessageCarrier)
		propagator := otel.GetTextMapPropagator()
		propagator.Inject(producerCtx, carrier)

		// The properties now contain 'traceparent' (plus 'tracestate' and 'baggage' when set).
		msg := &pulsar.ProducerMessage{
			Payload:    []byte(fmt.Sprintf("data for %v", payload["id"])),
			Properties: carrier,
		}

		// 5. Send the message
		msgID, err := producer.Send(producerCtx, msg)
		if err != nil {
			producerSpan.RecordError(err)
			producerSpan.SetStatus(codes.Error, "failed to send to Pulsar")
			log.Printf("Failed to publish message: %v", err)
			return c.JSON(http.StatusInternalServerError, map[string]string{"error": "could not process message"})
		}

		producerSpan.SetAttributes(attribute.String("messaging.message_id", msgID.String()))
		log.Printf("Message sent to Pulsar with ID %s. TraceID: %s, SpanID: %s", msgID.String(), sc.TraceID().String(), producerSpan.SpanContext().SpanID().String())

		return c.JSON(http.StatusOK, map[string]string{"status": "processing", "messageId": msgID.String()})
	})

	if err := e.Start(":8080"); err != nil && err != http.ErrServerClosed {
		e.Logger.Fatal(err)
	}
}

The PulsarMessageCarrier type is the bridge between the two libraries. It implements the propagation.TextMapCarrier interface, giving the OpenTelemetry SDK a generic way to write key-value pairs (traceparent, tracestate) into a plain map[string]string, which we then assign to the Properties field of the Pulsar message. The SDK’s built-in propagation.MapCarrier has the same shape and would work equally well.
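One detail worth keeping in mind: assigning the carrier directly to Properties works because the message has no other properties. If a message also carried business properties, the two maps would need to be merged. A minimal sketch of such a merge follows; the helper name and the sample keys are hypothetical, and the choice to let propagation keys win on conflict is an assumption rather than something the Pulsar client enforces.

```go
package main

import "fmt"

// mergeProperties returns a new map holding the business properties plus the
// propagation keys. Propagation keys win on conflict, so a stale traceparent
// copied from an upstream message cannot override the current one.
func mergeProperties(business, propagation map[string]string) map[string]string {
	out := make(map[string]string, len(business)+len(propagation))
	for k, v := range business {
		out[k] = v
	}
	for k, v := range propagation {
		out[k] = v
	}
	return out
}

func main() {
	business := map[string]string{"tenant": "acme", "traceparent": "stale-value"}
	carrier := map[string]string{
		"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	}

	props := mergeProperties(business, carrier)
	fmt.Println(props["tenant"])
	fmt.Println(props["traceparent"])
}
```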

Part 3: Instrumenting the Pulsar Consumer Service

The consumer service is a background worker that listens for messages from the topic. Its main task is to extract the trace context from the message properties and start a new span that is correctly parented to the producer’s span. This visually stitches the asynchronous part of the workflow to the initial HTTP request in our tracing backend.

// cmd/consumer/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

const (
	serviceName    = "pulsar-consumer"
	pulsarURL      = "pulsar://pulsar:6650"
	pulsarTopic    = "persistent://public/default/job-topic"
	subscription   = "my-subscription"
	jaegerEndpoint = "jaeger:4318"
)

// newTracerProvider is identical to the producer's setup
func newTracerProvider(ctx context.Context) (*sdktrace.TracerProvider, error) {
	exporter, err := otlptracehttp.New(ctx, otlptracehttp.WithEndpoint(jaegerEndpoint), otlptracehttp.WithInsecure())
	if err != nil {
		return nil, fmt.Errorf("failed to create OTLP exporter: %w", err)
	}

	res, err := resource.New(ctx,
		resource.WithAttributes(
			semconv.ServiceNameKey.String(serviceName),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create resource: %w", err)
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(res),
		sdktrace.WithSampler(sdktrace.AlwaysSample()),
	)
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

	return tp, nil
}

// Re-using the same carrier implementation
type PulsarMessageCarrier map[string]string

func (c PulsarMessageCarrier) Get(key string) string {
	return c[key]
}

func (c PulsarMessageCarrier) Set(key string, value string) {
	c[key] = value
}

func (c PulsarMessageCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	ctx := context.Background()

	tp, err := newTracerProvider(ctx)
	if err != nil {
		log.Fatalf("failed to initialize tracer provider: %v", err)
	}
	defer func() {
		if err := tp.Shutdown(ctx); err != nil {
			log.Printf("error shutting down tracer provider: %v", err)
		}
	}()

	pulsarClient, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarURL})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer pulsarClient.Close()

	consumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
		Topic:            pulsarTopic,
		SubscriptionName: subscription,
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	tracer := otel.Tracer("pulsar-consumer-tracer")
	propagator := otel.GetTextMapPropagator()

	log.Println("Consumer started. Waiting for messages...")
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatalf("Consumer receive error: %v", err)
		}

		// 1. Extract context from message properties
		carrier := PulsarMessageCarrier(msg.Properties())
		parentCtx := propagator.Extract(context.Background(), carrier)

		// 2. Start a new span as a child of the producer's span.
		// The returned context is discarded because the simulated work below does not use it.
		_, span := tracer.Start(parentCtx, "PulsarConsume")
		span.SetAttributes(
			semconv.MessagingSystemKey.String("pulsar"),
			semconv.MessagingDestinationNameKey.String(pulsarTopic),
			attribute.String("messaging.message_id", msg.ID().String()),
		)
		
		sc := span.SpanContext()
		log.Printf("Consumer received message. TraceID: %s, SpanID: %s", sc.TraceID().String(), sc.SpanID().String())

		// 3. Process the message
		log.Printf("Processing payload: %s", string(msg.Payload()))
		// Simulate work
		time.Sleep(100 * time.Millisecond)

		// 4. Acknowledge and finish the span
		consumer.Ack(msg)
		span.End()
		log.Printf("Finished processing message. TraceID: %s", sc.TraceID().String())
	}
}

Part 4: The Dart Diagnostic Client

To initiate the entire flow, we built a simple Dart CLI tool. Its responsibility is to create the root span of the trace and inject the traceparent header into the initial HTTP POST request to APISIX.

// bin/run_test.dart
import 'dart:convert';
import 'dart:io';
import 'package:http/http.dart' as http;
import 'package:opentelemetry/api.dart' as api;
import 'package:opentelemetry/sdk.dart' as sdk;
import 'package:opentelemetry/exporter_otlp_http.dart' as otlp;
import 'package:opentelemetry/contrib_resource_detectors_process.dart' as resource_detectors;

// A simple setter for injecting headers into the http request.
class HttpCarrier implements api.TextMapSetter<http.Request> {
  @override
  void set(http.Request carrier, String key, String value) {
    carrier.headers[key] = value;
  }
}

Future<void> main(List<String> arguments) async {
  // 1. Configure OpenTelemetry SDK
  final resource = sdk.Resource([
    ...sdk.Resource.defaultResources,
    ...resource_detectors.processResource,
    api.Attribute.fromString(api.ResourceAttributes.serviceName, 'dart-diagnostic-client'),
  ]);

  final exporter = otlp.CollectorExporter(
    Uri.parse('http://localhost:4318/v1/traces'), // Assumes running locally
  );
  final processor = sdk.BatchSpanProcessor(exporter);
  final tracerProvider = sdk.TracerProvider(
    resource: resource,
    processors: [processor],
  );
  final tracer = tracerProvider.getTracer('my-app-tracer');

  // 2. Create the root span for our operation
  final span = tracer.startSpan('initiate-processing-job');

  try {
    // 3. Create the HTTP request and inject the trace context
    final url = Uri.parse('http://localhost:9080/v1/process');
    final request = http.Request('POST', url)
      ..headers[HttpHeaders.contentTypeHeader] = 'application/json'
      ..body = json.encode({'id': 'job-123', 'data': 'some-payload'});

    final propagator = api.W3CTraceContextPropagator();
    propagator.inject(api.Context.current.withSpan(span), request, HttpCarrier());
    
    final traceparent = request.headers['traceparent'];
    print('Sending request with traceparent header: $traceparent');
    span.setAttribute(api.Attribute.fromString('http.url', url.toString()));
    
    // 4. Send the request
    final client = http.Client();
    final response = await client.send(request);

    final responseBody = await response.stream.bytesToString();
    client.close(); // Release the connection once the body has been read.
    span.setAttribute(api.Attribute.fromInt('http.status_code', response.statusCode));
    print('Response status: ${response.statusCode}');
    print('Response body: ${responseBody}');

  } catch (e, s) {
    span.recordException(e, stackTrace: s);
    span.setStatus(api.StatusCode.error, description: 'Request failed');
    print('Error: $e');
  } finally {
    span.end();
    // 5. Shutdown the provider to ensure all spans are exported
    await tracerProvider.shutdown();
    print('Trace sent.');
  }
}

Part 5: The GitLab CI/CD Verification Pipeline

This is where everything comes together. The pipeline uses Docker Compose to set up a self-contained environment with Jaeger, Pulsar, APISIX, and our Go services. It then runs the Dart client to generate traffic and, finally, a script queries the Jaeger API to verify that a complete trace was generated.

A complete trace is defined as one that contains spans from dart-diagnostic-client, apisix, echo-producer, and pulsar-consumer, all sharing the same trace ID.
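That definition translates directly into a check over the JSON that Jaeger's query API returns. The sketch below shows the idea in Go and assumes the response shape the pipeline's jq filters rely on (a data array whose first element has a processes map with serviceName fields); the type and function names are made up for the example, and the sample payload is hand-written rather than captured from a real run.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"sort"
)

// jaegerResponse models only the fields needed for the completeness check.
type jaegerResponse struct {
	Data []struct {
		TraceID   string `json:"traceID"`
		Processes map[string]struct {
			ServiceName string `json:"serviceName"`
		} `json:"processes"`
	} `json:"data"`
}

// missingServices returns the expected services that do not appear in the
// first trace of the response, sorted alphabetically.
func missingServices(body []byte, expected []string) ([]string, error) {
	var resp jaegerResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, fmt.Errorf("decode jaeger response: %w", err)
	}
	if len(resp.Data) == 0 {
		return nil, errors.New("response contains no trace")
	}

	seen := make(map[string]bool)
	for _, p := range resp.Data[0].Processes {
		seen[p.ServiceName] = true
	}

	var missing []string
	for _, svc := range expected {
		if !seen[svc] {
			missing = append(missing, svc)
		}
	}
	sort.Strings(missing)
	return missing, nil
}

func main() {
	// Hand-written sample: the consumer span never arrived.
	body := []byte(`{"data":[{"traceID":"abc","processes":{
		"p1":{"serviceName":"dart-diagnostic-client"},
		"p2":{"serviceName":"apisix"},
		"p3":{"serviceName":"echo-producer"}}}]}`)

	expected := []string{"dart-diagnostic-client", "apisix", "echo-producer", "pulsar-consumer"}
	missing, err := missingServices(body, expected)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("missing services:", missing)
}
```

Matching on exact service names rather than substrings avoids a false pass when one service name happens to contain another.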

# .gitlab-ci.yml
stages:
  - verify

variables:
  # Using docker-in-docker for service management
  DOCKER_HOST: tcp://docker:2375
  DOCKER_TLS_CERTDIR: ""

# Service needed for Docker commands
default:
  services:
    - name: docker:dind
      alias: docker

# Every job gets a fresh docker:dind daemon, so the build, the test run, and the
# verification share one job; images and containers do not survive a job boundary.
verify_trace:
  stage: verify
  variables:
    JAEGER_API: "http://docker:16686/api/traces"
    SERVICE_NAME: "dart-diagnostic-client"
    LOOKBACK: "5m"
  script:
    - apk add --no-cache curl jq
    - echo "Building service images..."
    - docker build -t my-echo-producer ./producer
    - docker build -t my-pulsar-consumer ./consumer
    - docker build -t my-dart-client ./dart-client
    - echo "Starting services with Docker Compose..."
    # This docker-compose.yml defines jaeger, pulsar, apisix, and our services
    - docker-compose up -d
    - echo "Waiting for services to be healthy..."
    - sleep 30 # Simple wait; a real pipeline would have proper health checks
    - echo "Running Dart client to generate trace..."
    - docker-compose run --rm dart-client
    # Give the consumer and Jaeger a moment to process the trace
    - sleep 10
    - echo "Querying Jaeger for the trace..."
    # 1. Get the Trace ID from the root service
    - TRACE_ID=$(curl -s "$JAEGER_API?service=$SERVICE_NAME&lookback=$LOOKBACK&limit=1" | jq -r '.data[0].traceID')
    - |
      if [ -z "$TRACE_ID" ] || [ "$TRACE_ID" = "null" ]; then
        echo "Error: Could not find any trace for service $SERVICE_NAME"
        exit 1
      fi
    - 'echo "Found Trace ID: $TRACE_ID"'
    # 2. Get the service names of all spans in that trace
    - SPAN_SERVICES=$(curl -s "$JAEGER_API/$TRACE_ID" | jq -r '.data[0].processes | .[] | .serviceName')
    # 3. Assert that all expected services are present in the trace
    - |
      echo "Services found in trace:"
      echo "$SPAN_SERVICES"
      for svc in dart-diagnostic-client apisix echo-producer pulsar-consumer; do
        if ! echo "$SPAN_SERVICES" | grep -q "$svc"; then
          echo "Missing span from $svc"
          exit 1
        fi
      done
    - 'echo "Trace verification successful: All services are present."'
  after_script:
    - docker-compose logs > docker-compose.logs
    - echo "Cleaning up Docker Compose environment..."
    - docker-compose down -v
  artifacts:
    when: always
    paths:
      - docker-compose.logs
    expire_in: 1 day

This CI pipeline transformed observability from a passive debugging tool into a first-class, testable feature of our system. If a code change broke context propagation, the verify_trace job would fail, preventing the regression from ever reaching production.

sequenceDiagram
    participant Dart Client
    participant GitLab CI
    participant APISIX
    participant Echo Producer
    participant Pulsar
    participant Pulsar Consumer
    participant Jaeger

    GitLab CI->>Dart Client: Run test command
    Dart Client->>Jaeger: Start Trace (Span A)
    Dart Client->>APISIX: POST /v1/process (with traceparent)
    APISIX->>Jaeger: Create Span B (child of A)
    APISIX->>Echo Producer: Forward request (with new traceparent)
    Echo Producer->>Jaeger: Create Span C (child of B)
    Echo Producer->>Pulsar: Publish Message (with trace context in properties)
    Pulsar-->>Pulsar Consumer: Deliver Message
    Pulsar Consumer->>Jaeger: Create Span D (child of C, from message properties)
    Pulsar Consumer-->>Pulsar: Ack Message
    GitLab CI->>Jaeger: Query API for Trace
    Jaeger-->>GitLab CI: Return spans for trace
    GitLab CI->>GitLab CI: Assert Spans A, B, C, D exist

The solution, while effective, is not without its limitations. Our current implementation samples every single request, which is not feasible in a high-throughput production environment. The next logical step is a deliberate sampling strategy: most likely a parent-based ratio sampler in the SDKs (sdktrace.ParentBased in Go), combined with tail-based sampling in an OpenTelemetry Collector, which decides after a trace has completed and can therefore keep every trace that contains an error.

Furthermore, the manual context propagation within the Pulsar client code is a potential point of failure. A more robust approach would involve creating a middleware or wrapper for the Pulsar Go client that automatically handles trace injection and extraction, making the process transparent to the application developer. This would reduce boilerplate and the risk of implementation errors in new services.

