Implementing an End-to-End Exactly-Once WebSocket Delivery System for Apache Spark Streaming Data


The initial system was straightforward, almost naive. A real-time operations dashboard, designed to give stakeholders a live view of critical business metrics. The backend architecture was a standard Apache Spark Structured Streaming job reading from a Kafka topic, performing time-windowed aggregations, and writing the results to another Kafka topic. A Node.js service consumed these results and pushed them over WebSockets to a React front-end. It worked, but only under ideal conditions. The first time a network partition occurred between a client and the server, or the Node.js service pod was restarted for a routine deployment, the illusion of reliability shattered. We were faced with either missing data points on our graphs or, worse, duplicated ones, which led to incorrect alerting and a loss of trust in the system.

The core of the problem was a misunderstanding of what “real-time” guarantees. At-least-once delivery, which Kafka and a simple WebSocket push provide, results in duplicates upon retries. At-most-once is even worse, leading to data loss. For our requirements, neither was acceptable. We needed to guarantee that every single aggregation result from our Spark job was processed and rendered by the client application exactly one time, regardless of network failures, client disconnections, or server restarts. This marked the beginning of a deep re-architecture, moving from a simple fire-and-forget pipeline to one with robust, verifiable exactly-once semantics.

Reframing the Spark Job for Idempotent Processing

The foundation for any exactly-once system is an idempotent data producer. The source of our data, the Spark job, had to be the ultimate source of truth. The common mistake is to assume Spark’s checkpointing alone solves this. Checkpointing ensures Spark doesn’t lose its own state or re-process source data incorrectly on failure, but it does nothing to help downstream consumers handle the redelivered outputs that occur during a recovery.

Our solution was to embed a unique, deterministic identifier into every single output record from Spark. We leveraged a combination of the window’s end time and a unique key from the data itself.

Here is the revised core logic of our Spark Structured Streaming job in Scala. The critical change is the addition of a processingId.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

object RealtimeAggregator {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("RealtimeMetricsAggregator")
      .config("spark.sql.shuffle.partitions", "8") // Production config
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val kafkaBootstrapServers = "kafka-broker-1:9092,kafka-broker-2:9092"
    val inputTopic = "raw_metrics"
    val outputTopic = "aggregated_metrics"
    val checkpointLocation = "hdfs:///spark/checkpoints/realtime-aggregator"

    val schema = new StructType()
      .add("timestamp", TimestampType)
      .add("metricName", StringType)
      .add("value", DoubleType)
      .add("sourceId", StringType)

    val inputStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", inputTopic)
      .load()
      .selectExpr("CAST(value AS STRING) as json")
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*")

    // The core of the aggregation logic with a 1-minute tumbling window
    val aggregatedStream = inputStream
      .withWatermark("timestamp", "2 minutes")
      .groupBy(
        window(col("timestamp"), "1 minute"),
        col("metricName")
      )
      .agg(
        avg("value").as("avgValue"),
        count("value").as("eventCount")
      )
      .select(
        col("window.end").as("windowEnd"),
        col("metricName"),
        col("avgValue"),
        col("eventCount")
      )
      // CRITICAL: Create a deterministic processing ID for idempotency
      // This ID is unique for each window and metric combination.
      .withColumn("processingId",
        sha1(concat(col("windowEnd").cast(StringType), lit("-"), col("metricName")))
      )

    // Write the structured data to the output topic as JSON
    val query = aggregatedStream
      .select(to_json(struct("*")).as("value")) // Serialize to JSON for Kafka
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("1 minute")) // Set processing trigger
      .start()

    query.awaitTermination()
  }
}

The key piece is withColumn("processingId", ...). This creates a SHA-1 hash from the window’s end timestamp and the metric’s name. This ID is now deterministic; for the same time window and metric, the ID will always be the same. If Spark fails and has to re-process a micro-batch, it will produce records with the exact same processingIds. This shifts the responsibility of handling duplicates from Spark to the downstream consumers, which are now equipped with a way to identify them.

Architecting an Idempotent WebSocket Server

With an idempotent producer, the next weak link was our Node.js WebSocket server. The original implementation simply forwarded any message it received from Kafka. The new implementation had to become stateful. It needed to track the processingId of every message it sent to each connected client to prevent sending duplicates.

The contract is as follows:

  1. The server receives a message from Kafka.
  2. It broadcasts this message, including its processingId, to all connected clients.
  3. The server maintains a short-term, per-client cache of recently sent processingIds.
  4. When a client disconnects and reconnects, it must present the processingId of the last message it successfully processed.
  5. The server must then “replay” any messages the client missed during the disconnection, drawn from a central buffer.

Here is the core implementation of the stateful WebSocket server using Node.js and the ws library. In a real-world project, the state management (message buffer and processed IDs) would be externalized to a system like Redis to allow the WebSocket server to be stateless and horizontally scalable. For clarity here, we’ll use in-memory stores.

import { WebSocketServer, WebSocket } from 'ws';
import { Kafka } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';
import winston from 'winston';

// Production-grade logging setup
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    new winston.transports.Console({
      format: winston.format.simple(),
    }),
  ],
});

const KAFKA_BROKERS = ['kafka-broker-1:9092'];
const KAFKA_TOPIC = 'aggregated_metrics';
const PORT = 8080;

// In-memory buffer for message replay. In production, use Redis Streams or similar.
const MESSAGE_BUFFER_SIZE = 200;
const messageBuffer = []; // Stores { processingId, payload, timestamp }

// State for each connected client
// Map<WebSocket, { lastAckId: string | null, clientUuid: string }>
const clientState = new Map();

// --- Kafka Consumer Setup ---
const kafka = new Kafka({
  clientId: 'websocket-gateway',
  brokers: KAFKA_BROKERS,
});
const consumer = kafka.consumer({ groupId: 'websocket-gateway-group' });

const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: KAFKA_TOPIC, fromBeginning: false });
  logger.info('Kafka consumer connected and subscribed.');

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const data = JSON.parse(message.value.toString());
        const { processingId } = data;

        if (!processingId) {
          logger.warn('Received message without processingId, skipping.');
          return;
        }

        // Add to replay buffer and trim old messages
        messageBuffer.push({
          processingId,
          payload: data,
          timestamp: Date.now(),
        });
        if (messageBuffer.length > MESSAGE_BUFFER_SIZE) {
          messageBuffer.shift();
        }

        // Broadcast to all connected clients
        wss.clients.forEach((client) => {
          if (client.readyState === WebSocket.OPEN) {
            client.send(JSON.stringify(data));
          }
        });
      } catch (err) {
        logger.error(`Failed to process Kafka message: ${err.message}`);
      }
    },
  });
};

// --- WebSocket Server Setup ---
const wss = new WebSocketServer({ port: PORT });

wss.on('connection', (ws, req) => {
  const clientUuid = uuidv4();
  clientState.set(ws, { lastAckId: null, clientUuid });
  logger.info(`Client ${clientUuid} connected.`);

  ws.on('message', (message) => {
    try {
      const parsedMessage = JSON.parse(message.toString());

      // Client sends a message on reconnect to get missed data
      if (parsedMessage.type === 'RECONNECT' && parsedMessage.lastSeenId) {
        const { lastSeenId } = parsedMessage;
        logger.info(`Client ${clientUuid} reconnected, last seen ID: ${lastSeenId}`);
        clientState.get(ws).lastAckId = lastSeenId;
        
        // Find where in the buffer the client left off
        const lastSeenIndex = messageBuffer.findIndex(
          (msg) => msg.processingId === lastSeenId
        );

        if (lastSeenIndex !== -1) {
          const messagesToReplay = messageBuffer.slice(lastSeenIndex + 1);
          if (messagesToReplay.length > 0) {
            logger.info(`Replaying ${messagesToReplay.length} messages to client ${clientUuid}.`);
            messagesToReplay.forEach((msg) => {
              ws.send(JSON.stringify(msg.payload));
            });
          }
        } else {
            // If the last seen ID is too old and not in the buffer, we can't replay.
            // The client would need to fetch a full state snapshot via a separate REST API.
            logger.warn(`Client ${clientUuid} requested replay from ID not in buffer. Full sync needed.`);
            ws.send(JSON.stringify({type: 'ERROR', code: 'SYNC_REQUIRED'}));
        }
      }
      
      // A simple ACK mechanism can be implemented here if needed,
      // but for this model, we rely on the client's reconnect request.

    } catch (err) {
      logger.error(`Error processing message from client ${clientUuid}: ${err.message}`);
    }
  });

  ws.on('close', () => {
    const state = clientState.get(ws);
    logger.info(`Client ${state ? state.clientUuid : 'unknown'} disconnected.`);
    clientState.delete(ws);
  });

  ws.on('error', (err) => {
    const state = clientState.get(ws);
    logger.error(`WebSocket error for client ${state ? state.clientUuid : 'unknown'}: ${err.message}`);
  });
});

logger.info(`WebSocket server started on port ${PORT}`);
runConsumer().catch((err) =>
  logger.error(`Kafka consumer error: ${err.message}`, { stack: err.stack })
);

Client-Side Resilience and Headless UI Integration

The client application must also uphold its end of the contract. It cannot simply render every message it receives. It must perform its own duplicate detection and manage reconnection logic. This is where using a Headless UI component library like Radix or Headless UI by Tailwind Labs became a strategic advantage. A common pitfall with feature-rich charting libraries is that they have their own opaque internal state management, making it difficult to inject custom logic for idempotency. With headless components, we own the state and the rendering logic completely.

We built our dashboard components around a custom React hook that managed the WebSocket connection and message deduplication.

// hooks/useIdempotentSocket.ts
import { useState, useEffect, useRef, useCallback } from 'react';

const WEBSOCKET_URL = 'ws://localhost:8080';
const RECONNECT_INTERVAL_MS = 5000;

// This would be persisted to sessionStorage to survive page reloads.
let lastProcessedId: string | null = sessionStorage.getItem('lastProcessedId');

export interface MetricData {
  windowEnd: string;
  metricName: string;
  avgValue: number;
  eventCount: number;
  processingId: string;
}

export const useIdempotentSocket = () => {
  const [metrics, setMetrics] = useState<Record<string, MetricData>>({});
  const [isConnected, setIsConnected] = useState(false);
  const socketRef = useRef<WebSocket | null>(null);
  const processedIds = useRef(new Set<string>(lastProcessedId ? [lastProcessedId] : []));

  const connect = useCallback(() => {
    if (socketRef.current && socketRef.current.readyState === WebSocket.OPEN) {
      return;
    }

    const socket = new WebSocket(WEBSOCKET_URL);
    socketRef.current = socket;

    socket.onopen = () => {
      console.log('WebSocket connected.');
      setIsConnected(true);
      
      // On successful reconnect, tell the server the last ID we processed.
      if (lastProcessedId) {
        socket.send(JSON.stringify({
          type: 'RECONNECT',
          lastSeenId: lastProcessedId,
        }));
      }
    };

    socket.onmessage = (event) => {
      try {
        const data: MetricData | { type: string; code: string } = JSON.parse(event.data);
        
        if ('type' in data && data.type === 'ERROR') {
             console.error('Received server error:', data.code);
             // Trigger logic to fetch a full state snapshot via REST API.
             return;
        }

        const metric = data as MetricData;
        if (!metric.processingId) return;

        // The core idempotency check on the client-side.
        if (processedIds.current.has(metric.processingId)) {
          console.log(`Duplicate message received, ignoring ID: ${metric.processingId}`);
          return;
        }

        processedIds.current.add(metric.processingId);
        lastProcessedId = metric.processingId;
        sessionStorage.setItem('lastProcessedId', lastProcessedId);

        setMetrics(prevMetrics => ({
          ...prevMetrics,
          [metric.metricName]: metric,
        }));

      } catch (error) {
        console.error('Failed to parse WebSocket message:', error);
      }
    };

    socket.onclose = () => {
      console.log('WebSocket disconnected. Attempting to reconnect...');
      setIsConnected(false);
      setTimeout(connect, RECONNECT_INTERVAL_MS);
    };

    socket.onerror = (error) => {
      console.error('WebSocket error:', error);
      socket.close(); // This will trigger the onclose handler for reconnection logic
    };

  }, []);

  useEffect(() => {
    connect();
    return () => {
      socketRef.current?.close();
    };
  }, [connect]);

  return { metrics, isConnected };
};

This hook is then consumed by our display components. Because we are using a headless component pattern, the component itself is trivial. It knows nothing about WebSockets or state management; it just receives props and renders UI. This separation of concerns is critical for maintainability.

// components/MetricCard.tsx
// This component would be built using primitives from a Headless UI library.
// For this example, we'll use simple divs.

interface MetricCardProps {
  name: string;
  data: MetricData | undefined;
}

const MetricCard = ({ name, data }: MetricCardProps) => {
  if (!data) {
    return <div>Loading {name}...</div>;
  }
  return (
    <div style={{ border: '1px solid #ccc', padding: '16px', margin: '8px' }}>
      <h3>{data.metricName}</h3>
      <p>Average Value: {data.avgValue.toFixed(2)}</p>
      <p>Event Count: {data.eventCount}</p>
      <small>Window: {new Date(data.windowEnd).toLocaleTimeString()}</small>
    </div>
  );
};

Validating the Entire Pipeline with Jest

The most significant challenge was proving that this entire system actually worked. Unit tests are insufficient. We needed integration tests that could simulate the chaotic reality of network failures. We used Jest for its powerful ecosystem, including mocking capabilities for our WebSocket server tests.

First, we wrote integration tests for the WebSocket server itself. We used the mock-socket library to create a mock client and server environment, allowing us to simulate connection drops and message sequences without actual network I/O.

// __tests__/websocketServer.test.js
import { Server, WebSocket } from 'mock-socket';
import { jest } from '@jest/globals';

// We would need to refactor the server code slightly to be importable and testable.
// This is a conceptual test of the core logic. Assume we have a `MessageHandler` class.

describe('Idempotent WebSocket Message Handling', () => {
  let mockServer;
  const FAKE_URL = 'ws://localhost:8080';

  beforeEach(() => {
    mockServer = new Server(FAKE_URL);
  });

  afterEach(() => {
    mockServer.stop();
  });

  test('should broadcast a new message to a connected client', (done) => {
    const client = new WebSocket(FAKE_URL);
    
    mockServer.on('connection', socket => {
      // Simulate message from Kafka being broadcast
      socket.send(JSON.stringify({ processingId: 'id-1', value: 100 }));
    });
    
    client.onmessage = event => {
      const data = JSON.parse(event.data);
      expect(data.processingId).toBe('id-1');
      expect(data.value).toBe(100);
      client.close();
      done();
    };
  });

  test('should replay missed messages upon client reconnect', (done) => {
    const messageBuffer = [
      { processingId: 'id-1', payload: { processingId: 'id-1', value: 100 } },
      { processingId: 'id-2', payload: { processingId: 'id-2', value: 200 } },
      { processingId: 'id-3', payload: { processingId: 'id-3', value: 300 } },
    ];
    
    // This mocks the server's internal logic for handling reconnects
    mockServer.on('connection', socket => {
      socket.on('message', event => {
        const request = JSON.parse(event.data);
        if (request.type === 'RECONNECT' && request.lastSeenId === 'id-1') {
          // Replay messages 2 and 3
          socket.send(JSON.stringify(messageBuffer[1].payload));
          socket.send(JSON.stringify(messageBuffer[2].payload));
        }
      });
    });

    const client = new WebSocket(FAKE_URL);
    const receivedMessages = [];

    client.onopen = () => {
      // Simulate reconnecting and telling the server the last ID we saw
      client.send(JSON.stringify({ type: 'RECONNECT', lastSeenId: 'id-1' }));
    };
    
    client.onmessage = event => {
      receivedMessages.push(JSON.parse(event.data));
      if (receivedMessages.length === 2) {
        expect(receivedMessages.map(m => m.processingId)).toEqual(['id-2', 'id-3']);
        client.close();
        done();
      }
    };
  });
});

The final layer of testing involved a full end-to-end test using Jest and Playwright. This test would launch a browser, navigate to our application, and then we would programmatically kill and restart the WebSocket server process. The test would then assert that after the client reconnected, the data displayed on the page was correct and contained no duplicates or gaps, validating the entire pipeline from Spark to the rendered pixel.

graph TD
    A[Apache Spark Job] -- "Generates {processingId, payload}" --> B(Kafka Topic: aggregated_metrics);
    B --> C{Stateful WebSocket Server};
    C -- "Tracks lastAckId per client" --> D[Client Browser];
    C -- "Stores messages in Replay Buffer" --> E[In-Memory/Redis Buffer];
    D -- "On disconnect/reconnect" --> F(Send RECONNECT with lastSeenId);
    F --> C;
    C -- "Finds messages in buffer" --> E;
    E -- "Returns messages > lastSeenId" --> C;
    C -- "Replays missed messages" --> D;
    D -- "Deduplicates based on processingId" --> G[Headless UI Components];
    G -- "Renders final state" --> H(User View);

The primary lingering issue in this architecture is the scalability of the WebSocket server tier. While the logic is sound, holding the replay buffer and client state in the memory of a single Node.js process creates a single point of failure and a scaling bottleneck. The immediate next step for a production system would be to externalize this state to a distributed, highly-available store like Redis. Using Redis Streams for the message buffer and a Redis Hash for client session data would allow us to run multiple, stateless WebSocket server instances behind a load balancer. Furthermore, the client’s recovery process for a buffer overflow—where its lastSeenId is older than the entire replay buffer—is currently a hard failure. A more robust implementation would involve the client initiating a full data fetch from a REST API to rebuild its state before resuming its WebSocket subscription.


  TOC