The technical pain point emerged from a real-time analytics dashboard. We needed to stream user interaction data from a complex React frontend to a backend processing service with sub-second latency. Initial attempts using batched HTTP POST requests were failing under load; latency spikes and head-of-line blocking made the “real-time” aspect a misnomer. More importantly, we were flying blind. When a data point was delayed or lost, debugging the journey from a browser click to a backend database was a nightmare of correlating timestamps across disparate logging systems. We had a clear mandate: build a high-throughput, low-latency pipeline, but it had to be fully observable from end to end.
Our initial concept was to create a fire-and-forget data stream. The frontend would emit events, and a backend service would consume them asynchronously. This immediately ruled out synchronous request-response patterns. The core challenge became technology selection for three distinct domains: frontend state management, the messaging transport, and the observability fabric tying it all together.
For frontend state, we settled on Jotai. In a complex application, its atomic, bottom-up approach was a significant advantage over monolithic state stores. We could isolate the specific pieces of state that needed to be streamed without triggering cascading updates or complex selectors. The key feature for us was the ability to subscribe directly to an atom’s changes, making it a natural emission point for our data pipeline.
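To make the subscription point concrete, here is a minimal sketch using Jotai's vanilla store API; the atom name and the emit call are hypothetical, purely to illustrate where a pipeline can hook in:
// Sketch: subscribing to an atom's changes outside of React (hypothetical names).
import { atom, createStore } from 'jotai';

const clickCountAtom = atom(0);
const store = createStore();

// store.sub fires on every write to clickCountAtom, which makes the atom
// itself a natural emission point for streaming state changes.
const unsubscribe = store.sub(clickCountAtom, () => {
  const value = store.get(clickCountAtom);
  console.log('clickCountAtom changed:', value);
  // e.g. enqueue { key: 'clickCount', value } for the pipeline here
});

// Writes go through the store as usual and trigger the subscriber.
store.set(clickCountAtom, 1);
unsubscribe();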
For the transport layer, we considered traditional message brokers like RabbitMQ or Kafka, but they felt like overkill. We didn’t need guaranteed delivery, persistent queues, or complex routing logic for this particular use case. The operational overhead and infrastructure cost were significant deterrents. This led us to ZeroMQ. It’s not a broker; it’s a high-performance messaging library. It allowed us to build a simple, brokerless PUB-SUB topology. A lightweight gateway service could accept WebSocket connections from the browser and publish messages onto a ZeroMQ topic, completely decoupling the web layer from the backend consumers. The trade-off was clear: we sacrificed broker-managed reliability for raw performance and architectural simplicity, a pragmatic choice for non-critical analytics data.
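As a point of reference, the pattern itself is only a few lines with the zeromq v6 Node bindings. The sketch below is a stripped-down, hypothetical version of that topology with the WebSocket and tracing concerns removed; addresses and topic names are illustrative:
// Sketch: brokerless PUB-SUB with the zeromq v6 bindings; no broker process involved.
import * as zmq from 'zeromq';

async function publisher() {
  const pub = new zmq.Publisher();
  await pub.bind('tcp://*:5555'); // in our architecture, the gateway owns this socket
  await pub.send(['atom-updates', JSON.stringify({ key: 'demo', value: 1 })]); // [topic, payload]
}

async function subscriber() {
  const sub = new zmq.Subscriber();
  sub.connect('tcp://localhost:5555'); // any number of consumers can connect
  sub.subscribe('atom-updates');       // prefix-matched topic filter
  for await (const [topic, payload] of sub) {
    console.log(topic.toString(), payload.toString());
  }
}

subscriber().catch(console.error);
// PUB-SUB has no handshake: anything published before a subscription is
// established is silently dropped, which is exactly the at-most-once trade-off.
setTimeout(() => publisher().catch(console.error), 500);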
The final, and most critical, piece was observability. How do you trace an event that starts as a Jotai state update in a browser, travels over a WebSocket, gets republished onto a ZeroMQ socket by a Node.js gateway, and is finally processed by a Python service? This is where OpenTelemetry became non-negotiable. It provided a unified standard for instrumentation across JavaScript (browser and Node.js) and Python. The real implementation challenge, however, was not just instrumenting each service in isolation but propagating the trace context across these wildly different transport boundaries.
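Before looking at the architecture, it is worth spelling out the propagation mechanic itself, since every hop repeats it: serialize the active span context into a W3C traceparent string, ship it inside the message, and rebuild a parent context on the other side. A minimal sketch with the OpenTelemetry JavaScript API (assuming an SDK with the default W3C propagator has been registered):
// Sketch: carrying trace context across an arbitrary transport boundary.
import { context, propagation, trace } from '@opentelemetry/api';

const tracer = trace.getTracer('propagation-sketch');

// Producer side: serialize the active context into a plain carrier object.
const producerSpan = tracer.startSpan('producer-work');
const carrier: Record<string, string> = {};
context.with(trace.setSpan(context.active(), producerSpan), () => {
  propagation.inject(context.active(), carrier);
  // carrier.traceparent now looks like: 00-<trace-id>-<span-id>-01
});
producerSpan.end();

// Consumer side (typically another process): rebuild the parent context
// from the carrier and start a child span under it.
const parentContext = propagation.extract(context.active(), carrier);
const consumerSpan = tracer.startSpan('consumer-work', {}, parentContext);
consumerSpan.end();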
The resulting architecture can be visualized as two parallel flows: the data flow and the trace flow.
sequenceDiagram
    participant ReactApp as React App (Jotai)
    participant Gateway as Node.js Gateway
    participant ZMQ as ZeroMQ Bus
    participant Consumer as Python Consumer
    participant OTelCollector as OTel Collector
    participant Jaeger
    ReactApp->>+Gateway: WebSocket: { data, traceparent }
    Note over ReactApp, Gateway: Data Flow
    Gateway->>+ZMQ: ZMQ PUB: { data, traceparent }
    ZMQ->>+Consumer: ZMQ SUB: { data, traceparent }
    Consumer->>-Consumer: Process Data
    Note over ReactApp, Jaeger: Trace Flow
    ReactApp->>OTelCollector: Export Span (Client)
    Gateway->>OTelCollector: Export Span (Gateway)
    Consumer->>OTelCollector: Export Span (Consumer)
    OTelCollector->>Jaeger: Export Traces
Frontend Instrumentation: Bridging Jotai and OpenTelemetry
The first step was to instrument the point of origin: the Jotai atom itself. A common mistake is to scatter instrumentation logic throughout the UI components. A much cleaner approach is to create a higher-order factory function that wraps atoms with tracing capabilities.
Here is atomWithTracing.ts, a utility that creates a primitive base atom and returns a derived atom wrapping it. The write function of the derived atom automatically starts an OpenTelemetry span.
// src/lib/atomWithTracing.ts
import { atom, WritableAtom } from 'jotai';
import { trace, context, propagation } from '@opentelemetry/api';
// This is a simplified WebSocket client for demonstration.
// A production implementation would handle reconnects, backpressure, etc.
import { webSocketClient } from './websocketClient';
const tracer = trace.getTracer('jotai-application-tracer');
type AtomUpdatePayload<T> = {
key: string;
value: T;
traceparent?: string;
};
/**
* A factory function that creates a Jotai atom with built-in OpenTelemetry tracing.
* On every write operation, it starts a new span, injects the trace context
* into the payload, and sends it over a WebSocket.
*
* @param key A unique identifier for this piece of state.
* @param initialValue The initial value of the atom.
* @returns A Jotai WritableAtom.
*/
export function atomWithTracing<T>(key: string, initialValue: T): WritableAtom<T, [T], void> {
const baseAtom = atom(initialValue);
const derivedAtom = atom(
(get) => get(baseAtom),
(get, set, update: T) => {
// 1. Start an OpenTelemetry Span. This represents the work of updating the state.
const span = tracer.startSpan(`Jotai State Update: ${key}`);
// 2. Execute the span's logic within an active context.
// This is crucial for context propagation.
context.with(trace.setSpan(context.active(), span), () => {
try {
// Update the local state
set(baseAtom, update);
span.setAttribute('atom.key', key);
span.setAttribute('atom.value', JSON.stringify(update));
// 3. Prepare the payload for the backend.
const payload: AtomUpdatePayload<T> = {
key,
value: update,
};
// 4. Inject the current trace context (traceparent header) into the payload.
// This is the magic that links the frontend span to the backend span.
const carrier = {};
propagation.inject(context.active(), carrier);
if (typeof (carrier as any).traceparent === 'string') {
payload.traceparent = (carrier as any).traceparent;
} else {
console.warn('Failed to inject traceparent into carrier.');
}
// 5. Send the payload over the WebSocket.
webSocketClient.send(JSON.stringify(payload));
span.addEvent('Payload sent to WebSocket gateway');
} catch (error) {
span.recordException(error as Error);
span.setStatus({ code: 2, message: (error as Error).message }); // 2 = ERROR
throw error;
} finally {
// 6. End the span.
span.end();
}
});
}
);
return derivedAtom;
}
This utility is then used to define state atoms within the application. Any component using useSetAtom on userPreferencesAtom will now automatically trigger a traced event.
// src/state/atoms.ts
import { atomWithTracing } from '../lib/atomWithTracing';
interface UserPreferences {
theme: 'dark' | 'light';
notifications: boolean;
}
export const userPreferencesAtom = atomWithTracing<UserPreferences>('userPreferences', {
theme: 'light',
notifications: true,
});
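To illustrate that components need no tracing code of their own, here is a hypothetical component; the write goes through the derived atom, so the span, the context injection, and the WebSocket emission all happen transparently:
// src/components/ThemeToggle.tsx (hypothetical usage example)
import { useAtomValue, useSetAtom } from 'jotai';
import { userPreferencesAtom } from '../state/atoms';

export function ThemeToggle() {
  const prefs = useAtomValue(userPreferencesAtom);
  const setPrefs = useSetAtom(userPreferencesAtom);
  // Each call to setPrefs runs the traced write function: a span is started,
  // the traceparent is injected, and the payload is sent to the gateway.
  return (
    <button onClick={() => setPrefs({ ...prefs, theme: prefs.theme === 'light' ? 'dark' : 'light' })}>
      Switch to {prefs.theme === 'light' ? 'dark' : 'light'} mode
    </button>
  );
}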
Setting up the OpenTelemetry provider in the main application entry point is the final piece of the frontend puzzle. This configures how and where traces are exported.
// src/instrumentation.ts
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { getWebAutoInstrumentations } from '@opentelemetry/auto-instrumentations-web';
const provider = new WebTracerProvider();
// For production, you'd use a BatchSpanProcessor.
// SimpleSpanProcessor is used here for immediate feedback during development.
provider.addSpanProcessor(new SimpleSpanProcessor(
new OTLPTraceExporter({
// URL of the OTel Collector
url: 'http://localhost:4318/v1/traces',
})
));
provider.register();
// Auto-instrument common web APIs like fetch, XHR, etc.
registerInstrumentations({
instrumentations: [getWebAutoInstrumentations()],
});
console.log('OpenTelemetry Web Tracer Provider registered.');
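One refinement worth noting: as written, the web provider has no explicit service name and uses the default context manager. The variant below is a sketch of a slightly more production-oriented setup, assuming the @opentelemetry/context-zone and @opentelemetry/resources packages are installed; the service name is arbitrary:
// src/instrumentation.ts (variant sketch: named service, zone-based context, batched export)
import { WebTracerProvider } from '@opentelemetry/sdk-trace-web';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { ZoneContextManager } from '@opentelemetry/context-zone';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

const provider = new WebTracerProvider({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'react-jotai-frontend',
  }),
});

// BatchSpanProcessor trades immediacy for fewer, larger export requests.
provider.addSpanProcessor(new BatchSpanProcessor(
  new OTLPTraceExporter({ url: 'http://localhost:4318/v1/traces' })
));

// ZoneContextManager keeps the active span available across async boundaries
// in the browser (timers, promises, event handlers).
provider.register({ contextManager: new ZoneContextManager() });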
The Gateway: Translating WebSocket to ZeroMQ
The gateway is a critical piece of infrastructure. It’s a simple Node.js service that terminates WebSocket connections from thousands of clients and multiplexes their messages onto a single, high-speed ZeroMQ PUB socket. Its secondary, but equally important, role is to continue the distributed trace.
The core logic involves:
- Setting up a WebSocket server.
- Configuring a ZeroMQ PUB socket.
- Upon receiving a message, extracting the traceparent from the payload.
- Using the traceparent to create a new span that is a child of the original browser span.
- Republishing the payload on the ZeroMQ topic.
Here’s the annotated implementation of the gateway server:
// gateway/server.js
const { WebSocketServer } = require('ws');
const zmq = require('zeromq');
const { trace, context, propagation, SpanKind, SpanStatusCode } = require('@opentelemetry/api');
// Initialize OpenTelemetry SDK for Node.js
// This should be in a separate file (e.g., tracing.js) and required at the very top.
const { tracer } = require('./tracing'); // Assume this file configures the Node tracer
const ZMQ_PUB_ADDRESS = 'tcp://*:5555';
const WEBSOCKET_PORT = 8080;
async function main() {
// 1. Configure the ZeroMQ publisher socket
const pubSocket = new zmq.Publisher();
await pubSocket.bind(ZMQ_PUB_ADDRESS);
console.log(`ZeroMQ publisher bound to ${ZMQ_PUB_ADDRESS}`);
// 2. Configure the WebSocket server
const wss = new WebSocketServer({ port: WEBSOCKET_PORT });
console.log(`WebSocket server listening on port ${WEBSOCKET_PORT}`);
wss.on('connection', ws => {
console.log('Client connected');
ws.on('message', (message) => {
let data;
try {
data = JSON.parse(message.toString());
} catch (e) {
console.error('Failed to parse incoming message:', message.toString());
return;
}
// 3. Extract the trace context from the incoming payload
const parentContext = propagation.extract(context.active(), { traceparent: data.traceparent });
const span = tracer.startSpan('gateway.process-message', {
kind: SpanKind.SERVER,
}, parentContext);
// 4. Run the processing logic within the new span's context
context.with(trace.setSpan(context.active(), span), async () => {
try {
span.setAttribute('messaging.system', 'zeromq');
span.setAttribute('messaging.destination', 'atom-updates');
span.setAttribute('atom.key', data.key);
console.log(`Received and processing update for: ${data.key}`);
// For simplicity, we forward the entire payload including the traceparent.
// A production system might strip it out or use a different wire format.
const forwardMessage = JSON.stringify(data);
// 5. Publish the message on the ZeroMQ topic
await pubSocket.send(['atom-updates', forwardMessage]);
span.addEvent('Message published to ZeroMQ');
span.setStatus({ code: SpanStatusCode.OK }); // note: the raw value 0 is UNSET, not OK
} catch (error) {
console.error('Error processing message in gateway:', error);
span.recordException(error);
span.setStatus({ code: SpanStatusCode.ERROR, message: error.message });
} finally {
span.end();
}
});
});
ws.on('close', () => console.log('Client disconnected'));
});
process.on('SIGINT', async () => {
console.log('Shutting down...');
await pubSocket.close();
wss.close();
process.exit(0);
});
}
main().catch(console.error);
The corresponding tracing.js for Node.js is similar in concept to the frontend one, but uses the Node SDK.
// gateway/tracing.js
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc');
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { HttpInstrumentation } = require('@opentelemetry/instrumentation-http');
const { WebSocketInstrumentation } = require('opentelemetry-instrumentation-ws');
const provider = new NodeTracerProvider({
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'websocket-zeromq-gateway',
}),
});
// Configure the gRPC exporter to send traces to the OTel Collector
const exporter = new OTLPTraceExporter({
// gRPC endpoint; prefer the collector address injected via docker-compose, fall back to localhost
url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT || 'http://localhost:4317',
});
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
provider.register();
registerInstrumentations({
tracerProvider: provider,
instrumentations: [
new HttpInstrumentation(),
new WebSocketInstrumentation(),
// Note: There is no standard OTel instrumentation for ZeroMQ.
// Our manual instrumentation in server.js fills this gap.
],
});
module.exports.tracer = provider.getTracer('gateway-tracer');
The Python Consumer: Closing the Loop
The final component is the Python consumer. It subscribes to the atom-updates topic on the ZeroMQ bus, receives messages, and performs some action. Crucially, it must also participate in the distributed trace.
The logic mirrors the gateway:
- Configure a ZeroMQ SUB socket.
- In a loop, receive messages.
- Parse the message and extract the traceparent.
- Create a final child span linked to the gateway’s span.
- Do the work, then end the span.
# consumer/main.py
import zmq
import json
import time
import os
from opentelemetry import trace, propagate
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPTraceExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
# 1. Configure OpenTelemetry
resource = Resource(attributes={
SERVICE_NAME: "python-zeromq-consumer"
})
provider = TracerProvider(resource=resource)
# Prefer the collector address from the environment (set in docker-compose), fall back to localhost
otlp_exporter = OTLPTraceExporter(
    endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
    insecure=True,
)
provider.add_span_processor(SimpleSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
def main():
    ZMQ_SUB_ADDRESS = os.environ.get("ZMQ_SUB_ADDRESS", "tcp://localhost:5555")
    context = zmq.Context()
    # 2. Configure ZeroMQ subscriber
    socket = context.socket(zmq.SUB)
    socket.connect(ZMQ_SUB_ADDRESS)
    socket.setsockopt_string(zmq.SUBSCRIBE, "atom-updates")
    print(f"ZeroMQ subscriber connected to {ZMQ_SUB_ADDRESS}")
    while True:
        try:
            # Receive a multipart message [topic, payload]
            [topic, message_raw] = socket.recv_multipart()
            data = json.loads(message_raw.decode('utf-8'))
            # 3. Extract trace context from the payload
            carrier = {'traceparent': data.get('traceparent')}
            parent_context = propagate.extract(carrier)
            # 4. Start a new span as a child of the gateway span
            with tracer.start_as_current_span(
                "consumer.process-message",
                context=parent_context,
                kind=trace.SpanKind.CONSUMER
            ) as span:
                span.set_attribute("messaging.system", "zeromq")
                span.set_attribute("messaging.destination", topic.decode('utf-8'))
                span.set_attribute("atom.key", data.get('key'))
                print(f"Processing message for key: {data.get('key')}")
                # Simulate some processing work
                time.sleep(0.05)
                span.add_event("Finished processing atom update.")
        except KeyboardInterrupt:
            print("Shutting down...")
            break
        except Exception as e:
            print(f"An error occurred: {e}")
            # In a real app, you would record the exception on the span
            # if a span was active.
            time.sleep(1)
    socket.close()
    context.term()

if __name__ == "__main__":
    main()
Orchestration and Verification
To run this entire stack, a docker-compose.yml is essential. It defines the OTel collector, a tracing backend like Jaeger, and our custom services.
# docker-compose.yml
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.35
    ports:
      - "16686:16686" # Jaeger UI
      - "14268:14268" # Jaeger Collector HTTP
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.62.1
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317" # OTLP gRPC receiver
      - "4318:4318" # OTLP HTTP receiver
    depends_on:
      - jaeger
  gateway:
    build: ./gateway
    ports:
      - "8080:8080"
      - "5555:5555"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - otel-collector
  consumer:
    build: ./consumer
    environment:
      - ZMQ_SUB_ADDRESS=tcp://gateway:5555
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - gateway
The OpenTelemetry collector configuration is the glue. It receives traces from all three sources (React App, Gateway, Consumer) and exports them to a single backend (Jaeger).
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
      http:
processors:
  batch:
exporters:
  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true
  logging:
    loglevel: debug
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [logging, jaeger]
After running docker-compose up and interacting with the React application, navigating to the Jaeger UI at http://localhost:16686 reveals the result. A single search yields a unified trace waterfall. The root span, Jotai State Update, originates from the browser. Nested beneath it is the gateway.process-message span, and finally, the consumer.process-message span. The entire lifecycle of a single state change, across process and language boundaries, is captured in one coherent view. This is the payoff. Debugging is no longer a matter of guesswork; it’s a matter of reading a trace.
The primary limitation of this specific architecture is its “at-most-once” delivery guarantee inherited from the ZeroMQ PUB-SUB pattern. If the consumer is down when a message is published, that message is lost forever. This was an acceptable trade-off for our analytics use case, but it would be unsuitable for critical transactional data. Future iterations could explore more resilient ZeroMQ patterns like PUSH-PULL for load balancing among consumers or a REQ-REP flow if acknowledgments are needed, though each would introduce complexity to the tracing context propagation. Furthermore, the gateway is a single point of failure; scaling it would require a load balancer and a more sophisticated ZeroMQ topology (e.g., using the XPUB/XSUB proxy) to correctly distribute messages from multiple publishers.
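As a rough sketch of what the PUSH-PULL variant mentioned above might look like with the same zeromq Node bindings (addresses and payload handling here are illustrative assumptions, and the traceparent would still travel inside the JSON payload):
// Sketch: PUSH-PULL instead of PUB-SUB, fair-queuing messages across workers.
import * as zmq from 'zeromq';

// Gateway side: a long-lived PUSH socket. Each sent message goes to exactly one
// connected worker (round-robin) instead of being broadcast to every subscriber.
async function runPusher() {
  const push = new zmq.Push();
  await push.bind('tcp://*:5556');
  // Inside the WebSocket 'message' handler, the gateway would then call:
  // await push.send(JSON.stringify(data)); // data still carries the traceparent
}

// Worker side: any number of PULL workers can connect; ZeroMQ load-balances between them.
async function runWorker() {
  const pull = new zmq.Pull();
  pull.connect('tcp://localhost:5556');
  for await (const [raw] of pull) {
    const data = JSON.parse(raw.toString());
    // Extract data.traceparent and start a child span, exactly as in the PUB-SUB consumer.
    console.log('Processing update for', data.key);
  }
}

runPusher().catch(console.error);
runWorker().catch(console.error);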