Implementing End-to-End Distributed Tracing for a Jotai and ZeroMQ Real-Time Data Pipeline


The technical pain point emerged from a real-time analytics dashboard. We needed to stream user interaction data from a complex React frontend to a backend processing service with sub-second latency. Initial attempts using batched HTTP POST requests were failing under load; latency spikes and head-of-line blocking made the “real-time” aspect a misnomer. More importantly, we were flying blind. When a data point was delayed or lost, debugging the journey from a browser click to a backend database was a nightmare of correlating timestamps across disparate logging systems. We had a clear mandate: build a high-throughput, low-latency pipeline, but it had to be fully observable from end to end.

Our initial concept was to create a fire-and-forget data stream. The frontend would emit events, and a backend service would consume them asynchronously. This immediately ruled out synchronous request-response patterns. The core challenge became technology selection for three distinct domains: frontend state management, the messaging transport, and the observability fabric tying it all together.

For frontend state, we settled on Jotai. In a complex application, its atomic, bottom-up approach was a significant advantage over monolithic state stores. We could isolate the specific pieces of state that needed to be streamed without triggering cascading updates or complex selectors. The key feature for us was the ability to subscribe directly to an atom’s changes, making it a natural emission point for our data pipeline.

For the transport layer, we considered traditional message brokers like RabbitMQ or Kafka, but they felt like overkill. We didn’t need guaranteed delivery, persistent queues, or complex routing logic for this particular use case. The operational overhead and infrastructure cost were significant deterrents. This led us to ZeroMQ. It’s not a broker; it’s a high-performance messaging library. It allowed us to build a simple, brokerless PUB-SUB topology. A lightweight gateway service could accept WebSocket connections from the browser and publish messages onto a ZeroMQ topic, completely decoupling the web layer from the backend consumers. The trade-off was clear: we sacrificed broker-managed reliability for raw performance and architectural simplicity, a pragmatic choice for non-critical analytics data.

The final, and most critical, piece was observability. How do you trace an event that starts as a Jotai state update in a browser, travels over a WebSocket, gets republished onto a ZeroMQ socket by a Node.js gateway, and is finally processed by a Python service? This is where OpenTelemetry became non-negotiable. It provided a unified standard for instrumentation across JavaScript (browser and Node.js) and Python. The real implementation challenge, however, was not just instrumenting each service in isolation but propagating the trace context across these wildly different transport boundaries.
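
What actually crosses each boundary is small. With OpenTelemetry's default W3C Trace Context propagator, the carried value is a single traceparent string of four dash-separated hex fields: version, trace ID, parent span ID, and flags. The sketch below parses and validates such a string using only the Python standard library. The TraceParent type and parse_traceparent helper are illustrative names, not part of any OpenTelemetry package; the sample value is the example from the W3C specification, and only version 00 is handled.

```python
import re
from typing import NamedTuple, Optional

# Version "00": 32 hex chars of trace ID, 16 of parent span ID, 2 of flags.
_TRACEPARENT_RE = re.compile(r"00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


class TraceParent(NamedTuple):
    trace_id: str
    parent_span_id: str
    sampled: bool


def parse_traceparent(header: Optional[str]) -> Optional[TraceParent]:
    """Return the parsed fields, or None if the value is missing or malformed."""
    if not isinstance(header, str):
        return None
    match = _TRACEPARENT_RE.fullmatch(header.strip())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    # All-zero IDs are explicitly invalid in the W3C specification.
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    # The lowest bit of the flags byte is the "sampled" flag.
    return TraceParent(trace_id, span_id, bool(int(flags, 16) & 0x01))


example = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
```

Every hop in the pipeline only has to carry this one string intact; the payload's traceparent field exists for exactly that purpose.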

The resulting architecture can be visualized as two parallel flows: the data flow and the trace flow.

sequenceDiagram
    participant ReactApp as React App (Jotai)
    participant Gateway as Node.js Gateway
    participant ZMQ as ZeroMQ Bus
    participant Consumer as Python Consumer
    participant OTelCollector as OTel Collector
    participant Jaeger

    ReactApp->>+Gateway: WebSocket: { data, traceparent }
    Note over ReactApp, Gateway: Data Flow
    Gateway->>+ZMQ: ZMQ PUB: { data, traceparent }
    ZMQ->>+Consumer: ZMQ SUB: { data, traceparent }
    Consumer->>-Consumer: Process Data

    Note over ReactApp, Jaeger: Trace Flow
    ReactApp->>OTelCollector: Export Span (Client)
    Gateway->>OTelCollector: Export Span (Gateway)
    Consumer->>OTelCollector: Export Span (Consumer)
    OTelCollector->>Jaeger: Export Traces

Frontend Instrumentation: Bridging Jotai and OpenTelemetry

The first step was to instrument the point of origin: the Jotai atom itself. A common mistake is to scatter instrumentation logic throughout the UI components. A much cleaner approach is to create a higher-order factory function that wraps atoms with tracing capabilities.

Here is atomWithTracing.ts, a utility that creates a primitive atom from a key and an initial value and returns a derived atom wrapping it. The write function of this derived atom automatically starts an OpenTelemetry span.

// src/lib/atomWithTracing.ts

import { atom, WritableAtom } from 'jotai';
import { trace, context, propagation } from '@opentelemetry/api';

// This is a simplified WebSocket client for demonstration.
// A production implementation would handle reconnects, backpressure, etc.
import { webSocketClient } from './websocketClient';

const tracer = trace.getTracer('jotai-application-tracer');

type AtomUpdatePayload<T> = {
  key: string;
  value: T;
  traceparent?: string;
};

/**
 * A factory function that creates a Jotai atom with built-in OpenTelemetry tracing.
 * On every write operation, it starts a new span, injects the trace context
 * into the payload, and sends it over a WebSocket.
 *
 * @param key A unique identifier for this piece of state.
 * @param initialValue The initial value of the atom.
 * @returns A Jotai WritableAtom.
 */
export function atomWithTracing<T>(key: string, initialValue: T): WritableAtom<T, [T], void> {
  const baseAtom = atom(initialValue);

  const derivedAtom = atom(
    (get) => get(baseAtom),
    (get, set, update: T) => {
      // 1. Start an OpenTelemetry Span. This represents the work of updating the state.
      const span = tracer.startSpan(`Jotai State Update: ${key}`);
      
      // 2. Execute the span's logic within an active context.
      // This is crucial for context propagation.
      context.with(trace.setSpan(context.active(), span), () => {
        try {
          // Update the local state
          set(baseAtom, update);
          span.setAttribute('atom.key', key);
          span.setAttribute('atom.value', JSON.stringify(update));

          // 3. Prepare the payload for the backend.
          const payload: AtomUpdatePayload<T> = {
            key,
            value: update,
          };

          // 4. Inject the current trace context (traceparent header) into the payload.
          // This is the magic that links the frontend span to the backend span.
          const carrier = {};
          propagation.inject(context.active(), carrier);
          
          if (typeof (carrier as any).traceparent === 'string') {
            payload.traceparent = (carrier as any).traceparent;
          } else {
            console.warn('Failed to inject traceparent into carrier.');
          }

          // 5. Send the payload over the WebSocket.
          webSocketClient.send(JSON.stringify(payload));
          span.addEvent('Payload sent to WebSocket gateway');

        } catch (error) {
          span.recordException(error as Error);
          span.setStatus({ code: 2, message: (error as Error).message }); // 2 = ERROR
          throw error;
        } finally {
          // 6. End the span.
          span.end();
        }
      });
    }
  );

  return derivedAtom;
}

This utility is then used to define state atoms within the application. Any component using useSetAtom on userPreferencesAtom will now automatically trigger a traced event.

// src/state/atoms.ts
import { atomWithTracing } from '../lib/atomWithTracing';

interface UserPreferences {
  theme: 'dark' | 'light';
  notifications: boolean;
}

export const userPreferencesAtom = atomWithTracing<UserPreferences>('userPreferences', {
  theme: 'light',
  notifications: true,
});

Setting up the OpenTelemetry provider in the main application entry point is the final piece of the frontend puzzle. This configures how and where traces are exported.

// src/instrumentation.ts
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { getWebAutoInstrumentations } from '@opentelemetry/auto-instrumentations-web';

const provider = new WebTracerProvider();

// For production, you'd use a BatchSpanProcessor.
// SimpleSpanProcessor is used here for immediate feedback during development.
provider.addSpanProcessor(new SimpleSpanProcessor(
  new OTLPTraceExporter({
    // URL of the OTel Collector
    url: 'http://localhost:4318/v1/traces', 
  })
));

provider.register();

// Auto-instrument common web APIs like fetch, XHR, etc.
registerInstrumentations({
  instrumentations: [getWebAutoInstrumentations()],
});

console.log('OpenTelemetry Web Tracer Provider registered.');

The Gateway: Translating WebSocket to ZeroMQ

The gateway is a critical piece of infrastructure. It’s a simple Node.js service that terminates WebSocket connections from thousands of clients and multiplexes their messages onto a single, high-speed ZeroMQ PUB socket. Its secondary, but equally important, role is to continue the distributed trace.

The core logic involves:

  1. Setting up a WebSocket server.
  2. Configuring a ZeroMQ PUB socket.
  3. Upon receiving a message, extracting the traceparent from the payload.
  4. Using the traceparent to create a new span that is a child of the original browser span.
  5. Republishing the payload on the ZeroMQ topic.
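
Steps 3 to 5 hinge on one rule: a child span keeps the incoming trace ID and gets a fresh span ID, and whichever traceparent is forwarded downstream determines which span the next service treats as its parent. A minimal, library-free sketch of that rule in Python follows. The gateway itself is Node.js and relies on OpenTelemetry's propagation API; child_traceparent and forward_frames are hypothetical helpers written only for illustration, and the sketch assumes a well-formed incoming traceparent.

```python
import json
import secrets
from typing import List


def child_traceparent(incoming: str) -> str:
    """Derive a traceparent for a child span: same trace ID, new span ID."""
    version, trace_id, _parent_span_id, flags = incoming.split("-")
    new_span_id = secrets.token_hex(8)  # 8 random bytes -> 16 hex chars
    return f"{version}-{trace_id}-{new_span_id}-{flags}"


def forward_frames(raw_message: str, topic: str = "atom-updates") -> List[bytes]:
    """Build the [topic, payload] multipart frames for the PUB socket."""
    data = json.loads(raw_message)
    incoming = data.get("traceparent")
    if isinstance(incoming, str):
        # Forward the gateway's own span identity so the consumer's span
        # nests under the gateway span instead of the browser span.
        data["traceparent"] = child_traceparent(incoming)
    return [topic.encode("utf-8"), json.dumps(data).encode("utf-8")]


browser_msg = json.dumps({
    "key": "userPreferences",
    "value": {"theme": "dark", "notifications": True},
    "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
})
frames = forward_frames(browser_msg)
forwarded = json.loads(frames[1])["traceparent"]
```

If the payload were forwarded untouched, the consumer would extract the browser's span ID and attach its span there, next to the gateway span rather than beneath it.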

Here’s the annotated implementation of the gateway server:

// gateway/server.js

// Initialize the OpenTelemetry SDK for Node.js before loading 'ws', so that
// auto-instrumentation can patch modules as they are required.
const { tracer } = require('./tracing'); // configures the Node tracer (see tracing.js)

const { WebSocketServer } = require('ws');
const zmq = require('zeromq');
const { trace, context, propagation } = require('@opentelemetry/api');

const ZMQ_PUB_ADDRESS = 'tcp://*:5555';
const WEBSOCKET_PORT = 8080;

async function main() {
  // 1. Configure the ZeroMQ publisher socket
  const pubSocket = new zmq.Publisher();
  await pubSocket.bind(ZMQ_PUB_ADDRESS);
  console.log(`ZeroMQ publisher bound to ${ZMQ_PUB_ADDRESS}`);

  // 2. Configure the WebSocket server
  const wss = new WebSocketServer({ port: WEBSOCKET_PORT });
  console.log(`WebSocket server listening on port ${WEBSOCKET_PORT}`);

  wss.on('connection', ws => {
    console.log('Client connected');
    ws.on('message', (message) => {
      let data;
      try {
        data = JSON.parse(message.toString());
      } catch (e) {
        console.error('Failed to parse incoming message:', message.toString());
        return;
      }
      
      // 3. Extract the trace context from the incoming payload
      const parentContext = propagation.extract(context.active(), { traceparent: data.traceparent });

      const span = tracer.startSpan('gateway.process-message', {
        kind: 1, // SPAN_KIND_SERVER
      }, parentContext);
      
      // 4. Run the processing logic within the new span's context
      context.with(trace.setSpan(context.active(), span), async () => {
        try {
          span.setAttribute('messaging.system', 'zeromq');
          span.setAttribute('messaging.destination', 'atom-updates');
          span.setAttribute('atom.key', data.key);
          
          console.log(`Received and processing update for: ${data.key}`);

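          // Re-inject the active context so the forwarded traceparent identifies
          // the gateway span. Forwarding the browser's traceparent unchanged would
          // make the consumer span a sibling of this span, not its child.
          const carrier = {};
          propagation.inject(context.active(), carrier);
          const forwardMessage = JSON.stringify({
            ...data,
            traceparent: carrier.traceparent || data.traceparent,
          });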

          // 5. Publish the message on the ZeroMQ topic
          await pubSocket.send(['atom-updates', forwardMessage]);

          span.addEvent('Message published to ZeroMQ');
          span.setStatus({ code: 1 }); // 1 = OK (0 is UNSET)
        } catch (error) {
          console.error('Error processing message in gateway:', error);
          span.recordException(error);
          span.setStatus({ code: 2, message: error.message }); // 2 = ERROR
        } finally {
          span.end();
        }
      });
    });

    ws.on('close', () => console.log('Client disconnected'));
  });

  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    await pubSocket.close();
    wss.close();
    process.exit(0);
  });
}

main().catch(console.error);

The corresponding tracing.js for Node.js is similar in concept to the frontend one, but uses the Node SDK.

// gateway/tracing.js
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc');
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { HttpInstrumentation } = require('@opentelemetry/instrumentation-http');
const { WebSocketInstrumentation } = require('opentelemetry-instrumentation-ws');

const provider = new NodeTracerProvider({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'websocket-zeromq-gateway',
  }),
});

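// Configure the gRPC exporter to send traces to the OTel Collector.
// Honor OTEL_EXPORTER_OTLP_ENDPOINT (set in docker-compose.yml) so the
// localhost default does not override it inside a container.
const exporter = new OTLPTraceExporter({
  url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT || 'http://localhost:4317', // gRPC endpoint
});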
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

provider.register();

registerInstrumentations({
  tracerProvider: provider,
  instrumentations: [
    new HttpInstrumentation(),
    new WebSocketInstrumentation(),
    // Note: There is no standard OTel instrumentation for ZeroMQ.
    // Our manual instrumentation in server.js fills this gap.
  ],
});

module.exports.tracer = provider.getTracer('gateway-tracer');

The Python Consumer: Closing the Loop

The final component is the Python consumer. It subscribes to the atom-updates topic on the ZeroMQ bus, receives messages, and performs some action. Crucially, it must also participate in the distributed trace.

The logic mirrors the gateway:

  1. Configure a ZeroMQ SUB socket.
  2. In a loop, receive messages.
  3. Parse the message and extract the traceparent.
  4. Create a final child span linked to the gateway’s span.
  5. Do the work, then end the span.

# consumer/main.py

import zmq
import json
import time
import os

from opentelemetry import trace, propagate
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPTraceExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME

# 1. Configure OpenTelemetry
resource = Resource(attributes={
    SERVICE_NAME: "python-zeromq-consumer"
})
provider = TracerProvider(resource=resource)
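# Honor OTEL_EXPORTER_OTLP_ENDPOINT (set in docker-compose.yml); fall back to localhost.
otlp_exporter = OTLPTraceExporter(
    endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
    insecure=True,
)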
provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

def main():
    ZMQ_SUB_ADDRESS = os.environ.get("ZMQ_SUB_ADDRESS", "tcp://localhost:5555")
    context = zmq.Context()
    
    # 2. Configure ZeroMQ subscriber
    socket = context.socket(zmq.SUB)
    socket.connect(ZMQ_SUB_ADDRESS)
    socket.setsockopt_string(zmq.SUBSCRIBE, "atom-updates")
    print(f"ZeroMQ subscriber connected to {ZMQ_SUB_ADDRESS}")

    while True:
        try:
            # Receive a multipart message [topic, payload]
            [topic, message_raw] = socket.recv_multipart()
            
            data = json.loads(message_raw.decode('utf-8'))
            
            # 3. Extract trace context from the payload
            carrier = {'traceparent': data.get('traceparent')}
            parent_context = propagate.extract(carrier)
            
            # 4. Start a new span as a child of the gateway span
            with tracer.start_as_current_span(
                "consumer.process-message", 
                context=parent_context, 
                kind=trace.SpanKind.CONSUMER
            ) as span:
                span.set_attribute("messaging.system", "zeromq")
                span.set_attribute("messaging.destination", topic.decode('utf-8'))
                span.set_attribute("atom.key", data.get('key'))
                
                print(f"Processing message for key: {data.get('key')}")
                # Simulate some processing work
                time.sleep(0.05) 
                
                span.add_event("Finished processing atom update.")
                
        except KeyboardInterrupt:
            print("Shutting down...")
            break
        except Exception as e:
            print(f"An error occurred: {e}")
            # In a real app, you would record the exception on the span
            # if a span was active.
            time.sleep(1)

    socket.close()
    context.term()

if __name__ == "__main__":
    main()
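
One detail of zmq.SUBSCRIBE is easy to miss: the filter is a byte-prefix match against the first frame of each message, not an exact topic comparison. The sketch below mimics that matching rule in plain Python so it runs without a socket. would_deliver is a hypothetical helper, and the topic names other than atom-updates are made up for illustration.

```python
def would_deliver(subscription: bytes, first_frame: bytes) -> bool:
    """Mimic ZeroMQ SUB filtering: deliver if the first frame starts with the filter."""
    return first_frame.startswith(subscription)


subscription = b"atom-updates"
results = {
    topic: would_deliver(subscription, topic)
    for topic in (b"atom-updates", b"atom-updates-debug", b"audit-log")
}

# An empty subscription is a prefix of everything, so it receives all messages.
receives_everything = would_deliver(b"", b"audit-log")
```

If a second topic sharing the prefix is ever introduced, this consumer would start receiving it too. Comparing the decoded topic frame exactly after recv_multipart() is one way to guard against that.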

Orchestration and Verification

To run this entire stack, a docker-compose.yml is essential. It defines the OTel collector, a tracing backend like Jaeger, and our custom services.

# docker-compose.yml
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "16686:16686" # Jaeger UI
      - "14268:14268" # Jaeger Collector HTTP
  
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.62.1
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"   # OTLP gRPC receiver
      - "4318:4318"   # OTLP HTTP receiver
    depends_on:
      - jaeger

  gateway:
    build: ./gateway
    ports:
      - "8080:8080"
      - "5555:5555"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - otel-collector

  consumer:
    build: ./consumer
    environment:
      - ZMQ_SUB_ADDRESS=tcp://gateway:5555
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - gateway

The OpenTelemetry collector configuration is the glue. It receives traces from all three sources (React App, Gateway, Consumer) and exports them to a single backend (Jaeger).

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
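      http:
        # The browser exports spans cross-origin, so the receiver must allow it.
        # Adjust to the origin your React app is actually served from.
        cors:
          allowed_origins:
            - "http://localhost:*"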

processors:
  batch:

exporters:
  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true
  logging:
    loglevel: debug

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [logging, jaeger]

After running docker-compose up and interacting with the React application, navigating to the Jaeger UI at http://localhost:16686 reveals the result. A single search yields a unified trace waterfall. The root span, Jotai State Update, originates from the browser. Nested beneath it is the gateway.process-message span, and finally, the consumer.process-message span. The entire lifecycle of a single state change, across process and language boundaries, is captured in one coherent view. This is the payoff. Debugging is no longer a matter of guesswork; it’s a matter of reading a trace.

The primary limitation of this specific architecture is its “at-most-once” delivery guarantee inherited from the ZeroMQ PUB-SUB pattern. If the consumer is down when a message is published, that message is lost forever. This was an acceptable trade-off for our analytics use case, but it would be unsuitable for critical transactional data. Future iterations could explore more resilient ZeroMQ patterns like PUSH-PULL for load balancing among consumers or a REQ-REP flow if acknowledgments are needed, though each would introduce complexity to the tracing context propagation. Furthermore, the gateway is a single point of failure; scaling it would require a load balancer and a more sophisticated ZeroMQ topology (e.g., using the XPUB/XSUB proxy) to correctly distribute messages from multiple publishers.
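
The loss modes are worth making concrete. A PUB socket keeps a bounded send queue per connected subscriber and, once that queue reaches its high-water mark, drops further messages for that subscriber instead of blocking the publisher; a subscriber that is not connected has no queue at all. The toy model below imitates that behavior in plain Python. It is a simulation under simplified assumptions (one in-memory queue per subscriber, a hypothetical ToyPublisher class, an arbitrary high-water mark of 3), not ZeroMQ itself.

```python
from collections import deque
from typing import Deque, Dict


class ToyPublisher:
    """In-memory stand-in for PUB-side queuing; not a real ZeroMQ socket."""

    def __init__(self, high_water_mark: int) -> None:
        self.high_water_mark = high_water_mark
        self.queues: Dict[str, Deque[str]] = {}
        self.dropped = 0

    def connect(self, subscriber: str) -> None:
        self.queues[subscriber] = deque()

    def publish(self, message: str) -> None:
        if not self.queues:
            # Nobody is connected: the message is simply discarded.
            self.dropped += 1
            return
        for queue in self.queues.values():
            if len(queue) >= self.high_water_mark:
                self.dropped += 1  # queue full: drop rather than block
            else:
                queue.append(message)


pub = ToyPublisher(high_water_mark=3)
pub.publish("event-0")      # published before the consumer connects
pub.connect("consumer")
for i in range(1, 6):       # the consumer never drains its queue
    pub.publish(f"event-{i}")

delivered = list(pub.queues["consumer"])
```

In this run only event-1 through event-3 are queued: event-0 is lost because no subscriber existed yet, and event-4 and event-5 are lost because the queue was full. Neither loss is reported to the publisher, so a missing trace in Jaeger is the only evidence that an event never reached the consumer.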

