Building a Real-Time Data Reconciliation Pipeline with Koa, Delta Lake, and Redux


The core technical challenge was twofold: our operations team required a live, sub-second view of critical inventory table changes, while our analytics team needed a perfectly versioned, auditable historical record of these same changes for compliance and trend analysis. The initial architecture was bifurcated. A Redis cache-and-push mechanism served the real-time dashboard, while a nightly ETL job dumped data into a data warehouse. This inevitably led to consistency drift and a significant reconciliation burden. The operational view could be out-of-sync with the analytical source of truth for up to 24 hours. A unified pipeline was not a “nice-to-have”; it was a business necessity.

Our concept was to build a system around a single, immutable stream of change data capture (CDC) events that would hydrate both the real-time UI and the long-term analytical store simultaneously. This approach promised to eliminate the data drift problem entirely.

Technology selection was critical and, admittedly, unconventional.

  • For the analytical store, we needed more than just a data lake. ACID transactions, schema enforcement, and, most importantly, time-travel queries were non-negotiable for auditing. This immediately pointed to Delta Lake. Its ability to query the state of a table at any specific point in time was the killer feature.
  • For the frontend state, the high velocity of incoming updates demanded a predictable and robust state container. Redux, particularly with Redux Toolkit, provided the structured, testable framework to manage a complex, streaming dataset without causing chaotic UI re-renders.
  • The component bridging these two worlds was the most contentious choice: Koa.js. A JVM-based stream processor like Flink or Spark Streaming would be the textbook choice. However, our team’s deep expertise in Node.js, combined with the requirement for a highly responsive WebSocket layer, made Koa a pragmatic decision. We hypothesized that for this specific “fan-out” and light transformation logic, a lightweight, async I/O-focused Node.js application would be sufficient and far simpler to deploy and maintain than a full-fledged JVM cluster.

This post-mortem documents the architecture and implementation of that pipeline, focusing on the practical challenges and solutions encountered when orchestrating this specific combination of technologies.

The Foundation: Writing to Delta Lake from Node.js

The first and most significant hurdle was reliable interaction with Delta Lake from a Node.js environment. The primary Delta Lake libraries are built for the JVM and Python ecosystems. In a real-world project, introducing a new language stack just for a single integration point adds significant operational overhead.

The solution emerged from the Rust ecosystem: delta-rs. Its Node.js bindings, @delta-rs/delta-js, provided a high-performance, native interface to read and write Delta tables. This allowed us to keep our entire backend service within the Node.js runtime.

Here is the core DeltaWriter service. It’s designed to be stateful within the application’s lifecycle, managing the table instance and handling writes. The configuration points to an S3-compatible object store, a common setup for data lakes.

// src/services/deltaWriter.js

import { DeltaTable, DeltaWriter, rust } from '@delta-rs/delta-js';
import { RecordBatch, tableFromArrays, Utf8, Int32, TimestampMillisecond } from 'apache-arrow';
import logger from '../utils/logger.js';

const config = {
  s3: {
    region: process.env.AWS_REGION || 'us-east-1',
    endpoint: process.env.S3_ENDPOINT, // e.g., http://minio:9000
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
    bucket: process.env.S3_BUCKET || 'delta-lake-bucket',
    forcePathStyle: true,
  },
  tablePath: 'inventory_audits',
};

// Arrow schema defines the structure of our Delta table.
// A common mistake is not strictly defining this, leading to data quality issues.
const INVENTORY_SCHEMA = {
  id: new Utf8(),
  product_id: new Int32(),
  change_type: new Utf8(), // 'INSERT', 'UPDATE', 'DELETE'
  old_quantity: new Int32(),
  new_quantity: new Int32(),
  change_amount: new Int32(),
  timestamp_ms: new TimestampMillisecond(),
  source_tx_id: new Utf8(),
  ingestion_id: new Utf8(), // For idempotency
};

class DeltaWriterService {
  constructor() {
    this.tablePath = `s3a://${config.s3.bucket}/${config.tablePath}`;
    this.storageOptions = {
      'AWS_REGION': config.s3.region,
      'AWS_ENDPOINT_URL': config.s3.endpoint,
      'AWS_ACCESS_KEY_ID': config.s3.accessKeyId,
      'AWS_SECRET_ACCESS_KEY': config.s3.secretAccessKey,
      'AWS_S3_LOCKING_PROVIDER': 'dynamodb', // Critical for concurrent writes
      'AWS_STORAGE_ALLOW_HTTP': 'true',
      'AWS_S3_FORCE_PATH_STYLE': 'true',
    };
    this.table = null;
    this.isInitialized = false;
  }

  /**
   * Initializes the service by creating the Delta table if it does not exist.
   * This should be called at application startup.
   */
  async initialize() {
    try {
      logger.info(`Checking for Delta table at: ${this.tablePath}`);
      this.table = await DeltaTable.open(this.tablePath, this.storageOptions);
      logger.info('Delta table found. Version:', this.table.version());
    } catch (err) {
      // A common case is the table not existing yet.
      if (err.message.includes('Not a Delta table')) {
        logger.warn('Delta table not found. Attempting to create it.');
        await this.createTable();
      } else {
        logger.error({ err }, 'Failed to open Delta table.');
        throw err;
      }
    }
    this.isInitialized = true;
  }

  async createTable() {
    // Create an empty Arrow RecordBatch with the correct schema to initialize the table.
    const emptyBatch = new RecordBatch(INVENTORY_SCHEMA, 0);
    this.table = await rust.DeltaTable.create(
      this.tablePath,
      rust.arrow.Table.from(emptyBatch),
      'delta-rs',
      'Initial commit',
      this.storageOptions,
      {
        name: 'inventory_audits',
        description: 'Real-time audit log of inventory changes.',
        partition_columns: ['change_type'], // Partitioning is key for query performance
      },
    );
    logger.info('Successfully created new Delta table.');
  }

  /**
   * Writes a batch of processed CDC events to the Delta table.
   * @param {Array<object>} events - An array of event objects matching the INVENTORY_SCHEMA.
   */
  async writeBatch(events) {
    if (!this.isInitialized || !this.table) {
      throw new Error('DeltaWriterService is not initialized.');
    }
    if (events.length === 0) {
      return;
    }

    // A critical step is transforming the JS objects into an Arrow RecordBatch.
    // This is where performance can be won or lost.
    const batch = this.transformToArrow(events);

    try {
      const writer = await DeltaWriter.forTable(this.table);
      await writer.write(batch);
      const commitResult = await writer.commit();
      logger.info(`Successfully wrote batch of ${events.length} records. New table version: ${commitResult.version}`);
    } catch (err) {
      logger.error({ err, record_count: events.length }, 'Failed to write batch to Delta table.');
      // In a production system, failed batches should be sent to a dead-letter queue.
      throw err;
    }
  }

  /**
   * Converts an array of JS objects into a single Arrow RecordBatch.
   * @param {Array<object>} events
   * @returns {RecordBatch} Columnar batch ready for the Delta writer.
   */
  transformToArrow(events) {
    const columns = {};
    for (const key of Object.keys(INVENTORY_SCHEMA)) {
      columns[key] = events.map((e) => e[key]);
    }
    // `tableFromArrays` builds an Arrow Table directly from plain JS arrays,
    // inferring column types from the values; one RecordBatch covers the write.
    return tableFromArrays(columns).batches[0];
  }
}

// Singleton pattern for the service
const deltaWriterService = new DeltaWriterService();
export default deltaWriterService;

The pitfall here is concurrency control. Multiple instances of this service writing to the same Delta table will lead to commit conflicts. The solution is to use a locking provider. For S3, DynamoDB is the standard choice. This configuration (AWS_S3_LOCKING_PROVIDER) ensures that commits are atomic, preventing table corruption.
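
Even with the lock provider configured, concurrent writers can still lose the race for the next table version and surface a commit conflict rather than corrupt the table. A minimal retry sketch is shown below; the conflict check (matching on the error message) and the module path are assumptions, since the exact error shape depends on the delta-rs binding in use.

// src/services/writeWithRetry.js (illustrative sketch)
import deltaWriterService from './deltaWriter.js';
import logger from '../utils/logger.js';

const MAX_ATTEMPTS = 5;
const BASE_DELAY_MS = 200;

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

export async function writeBatchWithRetry(events) {
  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt += 1) {
    try {
      await deltaWriterService.writeBatch(events);
      return;
    } catch (err) {
      // Assumption: commit conflicts are identifiable from the error message.
      const looksLikeConflict = /conflict|concurrent/i.test(err.message || '');
      if (!looksLikeConflict || attempt === MAX_ATTEMPTS) {
        throw err;
      }
      const delay = BASE_DELAY_MS * 2 ** (attempt - 1);
      logger.warn({ attempt, delay }, 'Commit conflict suspected; retrying write.');
      await sleep(delay);
    }
  }
}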

The Pipeline Core: Koa as a Kafka Consumer and Event Fan-Out

With the Delta Lake writer in place, the next step was to build the Koa service that consumes from a Kafka topic (fed by Debezium CDC) and fans the data out to two destinations: the DeltaWriterService and connected WebSocket clients.

We used kafkajs for its robustness and clean async API. A common mistake in stream processing is to auto-commit Kafka offsets. This can lead to data loss if the application crashes after committing the offset but before successfully processing the message. We implemented manual offset management to guarantee at-least-once semantics.

// src/services/kafkaConsumer.js
import { Kafka, logLevel } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';
import logger from '../utils/logger.js';
import deltaWriterService from './deltaWriter.js';
import webSocketManager from './webSocketManager.js';

const kafka = new Kafka({
  clientId: 'inventory-pipeline-service',
  brokers: process.env.KAFKA_BROKERS.split(','),
  logLevel: logLevel.WARN,
});

const consumer = kafka.consumer({ groupId: 'delta-writer-group' });

// This function is the heart of the business logic.
// It transforms the raw Debezium event into our target schema.
function transformDebeziumEvent(message) {
  try {
    const event = JSON.parse(message.value.toString());
    // Debezium message format is verbose; we extract what we need.
    const payload = event.payload;
    if (!payload || !payload.op) return null;

    const before = payload.before || {};
    const after = payload.after || {};

    // 'r' marks rows read during Debezium's initial snapshot; we treat them as inserts.
    const changeTypeMap = { c: 'INSERT', u: 'UPDATE', d: 'DELETE', r: 'INSERT' };

    return {
      id: after.id ? after.id.toString() : before.id.toString(),
      product_id: after.product_id || before.product_id,
      change_type: changeTypeMap[payload.op],
      old_quantity: before.quantity || 0,
      new_quantity: after.quantity || 0,
      change_amount: (after.quantity || 0) - (before.quantity || 0),
      timestamp_ms: new Date(payload.ts_ms),
      source_tx_id: payload.source.txId.toString(),
      ingestion_id: message.key ? message.key.toString() : uuidv4(),
    };
  } catch (err) {
    logger.error({ err, rawMessage: message.value.toString() }, 'Failed to parse Debezium event.');
    return null;
  }
}


export const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'dbserver1.public.inventory', fromBeginning: true });
  logger.info('Kafka consumer connected and subscribed.');

  await consumer.run({
    eachBatchAutoResolve: false, // We control the batch processing lifecycle
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      const transformedEvents = [];
      const processedOffsets = [];

      for (const message of batch.messages) {
        if (!isRunning() || isStale()) break;

        const event = transformDebeziumEvent(message);
        if (event) {
          transformedEvents.push(event);
        }
        processedOffsets.push(message.offset);
      }

      if (transformedEvents.length > 0) {
        try {
          // First, attempt to write to the durable store. This is the critical step.
          await deltaWriterService.writeBatch(transformedEvents);

          // Only after a successful durable write do we push to transient clients.
          webSocketManager.broadcast(transformedEvents);

          logger.info(`Processed batch of ${batch.messages.length} messages.`);
        } catch (error) {
          logger.error({ error }, 'Failed to process batch. Offsets will not be resolved. Retrying on next poll.');
          // By not resolving (and therefore not committing) these offsets, Kafka will
          // re-deliver the batch. This is our at-least-once delivery guarantee.
          // A robust implementation would add a circuit breaker or dead-letter queue here.
          throw error; // Propagate to trigger re-delivery
        }
      }

      // Resolve offsets only after the durable write has succeeded; resolving earlier
      // would let kafkajs commit offsets for messages that never reached Delta Lake.
      for (const offset of processedOffsets) {
        resolveOffset(offset);
      }

      await heartbeat();
    },
  });
};

export const shutdownConsumer = async () => {
  logger.info('Shutting down Kafka consumer...');
  await consumer.disconnect();
};

This code is wrapped in a Koa application structure. The main app.js initializes the services and the WebSocket server.

// src/app.js
import Koa from 'koa';
import http from 'http';
import logger from './utils/logger.js';
import deltaWriterService from './services/deltaWriter.js';
import { runConsumer, shutdownConsumer } from './services/kafkaConsumer.js';
import { initializeWebSocketManager } from './services/webSocketManager.js';

const app = new Koa();
const server = http.createServer(app.callback());

app.use(async (ctx) => {
  ctx.body = { status: 'ok', version: deltaWriterService.table?.version() || 'initializing' };
});

const PORT = process.env.PORT || 3000;

const startServer = async () => {
  try {
    // Initialization must be sequential and blocking at startup.
    logger.info('Initializing Delta Writer Service...');
    await deltaWriterService.initialize();

    logger.info('Initializing WebSocket Manager...');
    initializeWebSocketManager(server); // Attaches WebSocket server to HTTP server

    logger.info('Starting Kafka Consumer...');
    await runConsumer();

    server.listen(PORT, () => {
      logger.info(`Server running on port ${PORT}`);
    });
  } catch (error) {
    logger.fatal({ error }, 'Failed to start server.');
    process.exit(1);
  }
};

const gracefulShutdown = async (signal) => {
  logger.warn(`Received ${signal}. Shutting down gracefully...`);
  // Stop consuming first so no in-flight batch is lost mid-broadcast,
  // then let the HTTP/WebSocket server drain.
  await shutdownConsumer();
  server.close(() => {
    logger.info('HTTP server closed.');
    process.exit(0);
  });
};

process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

startServer();

The WebSocket manager itself uses the ws library. A key optimization is to batch updates. Sending one WebSocket message per Kafka message would create excessive network chatter and churn on the client. Instead, we broadcast the entire processed batch in a single message.

// src/services/webSocketManager.js
import WebSocket, { WebSocketServer } from 'ws';
import logger from '../utils/logger.js';

class WebSocketManager {
  constructor() {
    this.wss = null;
  }

  initialize(server) {
    this.wss = new WebSocketServer({ server });
    this.wss.on('connection', (ws) => {
      logger.info('Client connected to WebSocket.');
      ws.on('close', () => {
        logger.info('Client disconnected.');
      });
      ws.on('error', (error) => {
        logger.error({ error }, 'WebSocket error.');
      });
    });
    logger.info('WebSocketManager initialized.');
  }

  /**
   * Broadcasts a batch of events to all connected clients.
   * @param {Array<object>} events 
   */
  broadcast(events) {
    if (!this.wss) return;

    const payload = JSON.stringify({
      type: 'INVENTORY_UPDATES',
      payload: events,
    });

    this.wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(payload);
      }
    });
  }
}

const manager = new WebSocketManager();

// Export as a singleton instance and an initializer function
export default manager;
export const initializeWebSocketManager = (server) => manager.initialize(server);

The Frontend: Redux for Real-Time State Reconciliation

The final piece is the frontend client that consumes this WebSocket stream. A naive implementation might simply store the array of inventory items and replace it on every update, triggering a full re-render of the displayed table. This would perform poorly with a high-velocity stream.

Redux Toolkit’s createEntityAdapter is purpose-built for this scenario. It provides a normalized state shape ({ ids: [], entities: {} }) and memoized selectors, which are critical for performance. Updates to a single entity do not cause components displaying other entities to re-render.

// src/client/features/inventory/inventorySlice.js
import { createSlice, createEntityAdapter, createSelector } from '@reduxjs/toolkit';

const inventoryAdapter = createEntityAdapter({
  // Assume `id` is the unique identifier for each inventory item.
  selectId: (item) => item.id,
  // Keep the collection sorted by product ID
  sortComparer: (a, b) => a.product_id - b.product_id,
});

const initialState = inventoryAdapter.getInitialState({
  status: 'idle', // 'idle', 'connected', 'disconnected'
});

const inventorySlice = createSlice({
  name: 'inventory',
  initialState,
  reducers: {
    // Action to handle a batch of updates from the WebSocket
    updatesReceived(state, action) {
      const updates = action.payload;
      const validUpdates = [];

      // Apply the latest state for each item. A common pitfall is applying stale
      // updates out of order; Debezium events arrive ordered per key, but we still
      // guard against regressions by comparing event timestamps.
      updates.forEach((update) => {
        if (update.change_type === 'DELETE') return;

        const existing = state.entities[update.id];
        if (existing && existing.last_updated > update.timestamp_ms) return;

        validUpdates.push({
          id: update.id,
          product_id: update.product_id,
          quantity: update.new_quantity,
          last_change_type: update.change_type,
          last_updated: update.timestamp_ms,
        });
      });

      // upsertMany adds new items or updates existing ones in a single pass.
      if (validUpdates.length > 0) {
        inventoryAdapter.upsertMany(state, validUpdates);
      }

      const deletedIds = updates.filter((u) => u.change_type === 'DELETE').map((u) => u.id);
      if (deletedIds.length > 0) {
        inventoryAdapter.removeMany(state, deletedIds);
      }
    },
    connectionStatusChanged(state, action) {
      state.status = action.payload;
    },
  },
});

export const { updatesReceived, connectionStatusChanged } = inventorySlice.actions;

export default inventorySlice.reducer;

// Export memoized selectors for performance
export const {
  selectAll: selectAllInventoryItems,
  selectById: selectInventoryItemById,
} = inventoryAdapter.getSelectors((state) => state.inventory);
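
For completeness, a consuming component only needs the memoized selector. A minimal sketch follows; the component name and markup are illustrative, not part of the actual codebase.

// src/client/features/inventory/InventoryTable.jsx (illustrative sketch)
import React from 'react';
import { useSelector } from 'react-redux';
import { selectAllInventoryItems } from './inventorySlice.js';

export default function InventoryTable() {
  // The memoized selector only recomputes when the normalized slice changes.
  const items = useSelector(selectAllInventoryItems);

  return (
    <table>
      <tbody>
        {items.map((item) => (
          <tr key={item.id}>
            <td>{item.product_id}</td>
            <td>{item.quantity}</td>
            <td>{item.last_change_type}</td>
          </tr>
        ))}
      </tbody>
    </table>
  );
}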

The WebSocket middleware for Redux connects to the backend and dispatches the updatesReceived action when a message arrives.

// src/client/middleware/websocketMiddleware.js
import { connectionStatusChanged, updatesReceived } from '../features/inventory/inventorySlice.js';

const websocketMiddleware = (store) => {
  let socket = null;

  const onOpen = (store) => () => {
    store.dispatch(connectionStatusChanged('connected'));
  };

  const onClose = (store) => () => {
    store.dispatch(connectionStatusChanged('disconnected'));
  };

  const onMessage = (store) => (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'INVENTORY_UPDATES') {
      store.dispatch(updatesReceived(data.payload));
    }
  };

  return (next) => (action) => {
    switch (action.type) {
      case 'WS_CONNECT':
        if (socket !== null) {
          socket.close();
        }
        socket = new WebSocket(action.payload.url);
        socket.onmessage = onMessage(store);
        socket.onclose = onClose(store);
        socket.onopen = onOpen(store);
        break;
      case 'WS_DISCONNECT':
        if (socket !== null) {
          socket.close();
        }
        socket = null;
        break;
      default:
        return next(action);
    }
  };
};

export default websocketMiddleware;
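
Wiring the middleware into the store and opening the connection then happens once at startup. The sketch below assumes a store module at src/client/store.js and a local WebSocket URL; adjust both for your setup.

// src/client/store.js (illustrative sketch)
import { configureStore } from '@reduxjs/toolkit';
import inventoryReducer from './features/inventory/inventorySlice.js';
import websocketMiddleware from './middleware/websocketMiddleware.js';

export const store = configureStore({
  reducer: {
    inventory: inventoryReducer,
  },
  middleware: (getDefaultMiddleware) => getDefaultMiddleware().concat(websocketMiddleware),
});

// Open the WebSocket connection once the store exists.
store.dispatch({ type: 'WS_CONNECT', payload: { url: 'ws://localhost:3000' } });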

This entire flow can be visualized as follows:

sequenceDiagram
    participant DB as PostgreSQL
    participant Debezium
    participant Kafka
    participant Koa as Koa Service
    participant Delta as Delta Lake (S3)
    participant Client as React/Redux UI

    DB->>Debezium: Transaction Log Change
    Debezium->>Kafka: Publishes CDC Event
    Koa->>Kafka: Consumes Batch of Events
    Note over Koa: Transforms Events
    Koa->>Delta: writeBatch(events)
    Delta-->>Koa: Commit Success
    Note over Koa: Only after commit...
    Koa->>Client: Broadcasts Batch via WebSocket
    Client->>Client: Redux dispatches `updatesReceived`
    Client->>Client: UI updates reactively

The final result is a system where the operational UI and the analytical datastore are hydrated from the exact same source of truth, eliminating drift. The analytics team can run time-travel queries against the Delta table (SELECT * FROM inventory_audits VERSION AS OF 5) to see the state of the system at a specific commit, providing unparalleled auditability.

Limitations and Future Iterations

This architecture, while effective, is not without its trade-offs and areas for improvement. The Koa.js service is currently a single point of failure and a potential performance bottleneck. The immediate next step is to containerize and deploy multiple instances of the service, leveraging Kafka’s consumer group protocol to distribute the partition load. This, however, complicates the WebSocket layer, as a client connected to instance A will not receive broadcasts from instance B. A shared backplane like Redis Pub/Sub would be required to synchronize broadcasts across all service instances.
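
A rough sketch of such a backplane, assuming ioredis and a single broadcast channel (both assumptions, as this is not yet implemented), might look like the following. Each instance would publish its processed batch instead of broadcasting directly, and every instance, including the publisher, relays what it receives on the channel to its own WebSocket clients.

// src/services/broadcastBackplane.js (illustrative sketch, not yet implemented)
import Redis from 'ioredis';
import webSocketManager from './webSocketManager.js';

const CHANNEL = 'inventory-broadcast';

// Separate connections: a Redis client in subscriber mode cannot publish.
const publisher = new Redis(process.env.REDIS_URL);
const subscriber = new Redis(process.env.REDIS_URL);

export async function initializeBackplane() {
  await subscriber.subscribe(CHANNEL);
  subscriber.on('message', (channel, message) => {
    if (channel !== CHANNEL) return;
    // Fan out events received from any instance to our local clients.
    webSocketManager.broadcast(JSON.parse(message));
  });
}

// Called by the Kafka consumer in place of webSocketManager.broadcast().
export function publishBatch(events) {
  return publisher.publish(CHANNEL, JSON.stringify(events));
}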

Secondly, the high frequency of small file writes to Delta Lake is a well-known pattern that can degrade read performance over time. A separate, scheduled maintenance job is required to run OPTIMIZE and Z-ORDER BY commands on the table, compacting small files into larger ones and improving data skipping. This is currently a manual process that must be automated.

Finally, the pipeline currently assumes a static data schema. Handling schema evolution gracefully is a significant challenge. A change in the source database table requires a coordinated update of the Delta table schema, the transformation logic in the Koa service, and potentially the frontend state structure. This is a complex problem space that our current implementation does not address.

