Implementing a Real-Time Oracle-to-Dgraph Synchronization Pipeline Using CDC and WebSockets


The core technical challenge was to propagate data changes from a legacy Oracle monolith, representing a complex network of physical assets, to a modern, real-time graph visualization interface. The existing system relied on nightly batch jobs, rendering it useless for operational monitoring. Direct queries against the Oracle database for graph-style traversals were prohibitively slow, often involving dozens of JOINs and causing significant load on a critical production system. Polling for changes was dismissed immediately due to scalability concerns and the high cost of unnecessary database hits.

Our initial concept was to materialize a graph view of the Oracle data into a database built for such queries. Dgraph was selected for its native graph model and GraphQL API, which would simplify front-end development. The main problem remained: how to keep this Dgraph instance synchronized with Oracle in near real-time. Database triggers were considered but rejected due to their invasive nature and the operational risk of adding complex logic directly into the monolith’s database schema.

The final architecture hinged on Change Data Capture (CDC). We decided to leverage Debezium’s Oracle connector to stream every committed row-level change into a Kafka topic. A backend service, acting as a consumer, would then be responsible for transforming this relational stream into graph mutations and applying them to Dgraph. Finally, this service would push lightweight notifications over WebSockets to subscribed clients, prompting them to refetch updated graph data. To ensure the integrity of this entire asynchronous pipeline, we committed to building an end-to-end testing strategy using Cypress, capable of validating the flow from an Oracle transaction all the way to a DOM update in the browser.

Phase 1: Preparing the Oracle Source

A fundamental prerequisite for CDC is enabling the database’s transaction logging capabilities. For Oracle, this means running in ARCHIVELOG mode and enabling supplemental logging. In a real-world project, this is a non-trivial change that requires careful planning with a DBA team, as it impacts storage and performance.

First, verify the current logging status:

SELECT log_mode FROM v$database;

If the result is NOARCHIVELOG, the database must be shut down and restarted in mount mode to make the change.

-- This is a DBA-level operation and requires a maintenance window.
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

With ARCHIVELOG mode enabled, we must enable supplemental logging. This instructs Oracle to write additional information to the redo log for tools like Debezium to reconstruct row changes.

-- Enable minimal supplemental logging at the database level.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- For the specific tables we want to capture, we need to enable full supplemental logging.
-- Let's assume two tables: ASSETS (nodes) and ASSET_RELATIONSHIPS (edges).
ALTER TABLE C##MYUSER.ASSETS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE C##MYUSER.ASSET_RELATIONSHIPS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

The pitfall here is underestimating the impact. Enabling (ALL) COLUMNS logging increases the volume of redo log generation. It’s crucial to monitor disk space and I/O performance on the Oracle server after this change. A common mistake is to forget this step, leading to cryptic errors from the Debezium connector.

Phase 2: The CDC Pipeline with Debezium and Kafka

We used Docker Compose to stand up the core infrastructure: Zookeeper, Kafka, and Kafka Connect with the Debezium Oracle connector.

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  connect:
    image: confluentinc/cp-kafka-connect:7.3.0
    container_name: connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: cdc-group
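      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      # Single-broker setup: the internal topics cannot use the default replication factor of 3.
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1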
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    volumes:
      - ./debezium-connector-oracle:/usr/share/confluent-hub-components/debezium-connector-oracle

# Note: The debezium-connector-oracle directory must contain the extracted Debezium Oracle connector JARs.

The next critical piece is the connector configuration itself. This JSON object is POSTed to the Kafka Connect REST API (http://localhost:8083/connectors).

{
  "name": "oracle-asset-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "oracle.host.internal",
    "database.port": "1521",
    "database.user": "c##myuser",
    "database.password": "debezium",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.connection.adapter": "logminer",
    "log.mining.strategy": "online_catalog",
    "table.include.list": "C##MYUSER.ASSETS,C##MYUSER.ASSET_RELATIONSHIPS",
    "topic.prefix": "oracle-cdc",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schema-history.assets"
  }
}
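The POST itself can be scripted rather than issued by hand. The sketch below only builds the request; buildConnectorRequest and its types are hypothetical names introduced for illustration, and the base URL is the one quoted above.

```typescript
// Hypothetical helper: builds the HTTP request that registers a connector
// through the Kafka Connect REST API (POST /connectors).
interface ConnectorDefinition {
  name: string;
  config: Record<string, string>;
}

interface HttpRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

function buildConnectorRequest(connectBaseUrl: string, connector: ConnectorDefinition): HttpRequest {
  if (!connector.name) {
    throw new Error('Connector name is required');
  }
  // Drop trailing slashes so the final URL never contains "//connectors".
  const base = connectBaseUrl.replace(/\/+$/, '');
  return {
    url: `${base}/connectors`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Accept: 'application/json' },
      body: JSON.stringify(connector),
    },
  };
}

// Abbreviated config; in practice pass the full JSON object shown above.
const request = buildConnectorRequest('http://localhost:8083/', {
  name: 'oracle-asset-connector',
  config: {
    'connector.class': 'io.debezium.connector.oracle.OracleConnector',
    'tasks.max': '1',
  },
});
```

The result can be handed to fetch(request.url, request.init). Kafka Connect answers 201 Created on success and 409 Conflict when a connector with the same name already exists, so a deployment script should treat 409 as "already registered" rather than as a hard failure.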

A few key configurations to highlight:

  • database.connection.adapter: logminer is the standard, robust choice for interacting with Oracle’s redo logs.
  • table.include.list: Explicitly defines which tables we are monitoring. This is crucial for preventing noise from other tables in the monolith.
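  • snapshot.mode: initial tells Debezium to perform a full read of the included tables when it starts without previously stored offsets, ensuring the target system is fully seeded before streaming live changes. On a normal restart the connector resumes from its stored offsets and does not re-snapshot. A mode like schema_only_recovery is meant for a different situation: rebuilding a lost or corrupted schema history topic without re-reading the table data.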
  • value.converter.schemas.enable: Setting this to false provides a cleaner JSON payload without the verbose schema information, which is easier for our downstream consumer to parse.

With this configuration, any INSERT, UPDATE, or DELETE on ASSETS or ASSET_RELATIONSHIPS tables will produce a message in the corresponding Kafka topic (e.g., oracle-cdc.C__MYUSER.ASSETS).
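The topic name is worth a second look, because C##MYUSER shows up as C__MYUSER. The sketch below approximates how that name comes about, assuming the prefix.schema.table pattern and the replacement of every character outside letters, digits, '.', '_' and '-' with an underscore. debeziumTopicName is a hypothetical helper, not part of Debezium, so the real topic names should still be confirmed by listing topics on the broker.

```typescript
// Hypothetical helper that approximates Debezium's topic naming for a table:
// <topic.prefix>.<schema>.<table>, with characters that are not legal in a
// Kafka topic name replaced by underscores.
function debeziumTopicName(topicPrefix: string, schema: string, table: string): string {
  const raw = `${topicPrefix}.${schema}.${table}`;
  return raw.replace(/[^A-Za-z0-9._-]/g, '_');
}

const assetsTopic = debeziumTopicName('oracle-cdc', 'C##MYUSER', 'ASSETS');
const relationshipsTopic = debeziumTopicName('oracle-cdc', 'C##MYUSER', 'ASSET_RELATIONSHIPS');
// assetsTopic        === 'oracle-cdc.C__MYUSER.ASSETS'
// relationshipsTopic === 'oracle-cdc.C__MYUSER.ASSET_RELATIONSHIPS'
```

Deriving the names in one place keeps the consumer's subscription list in step with table.include.list instead of relying on hand-typed strings.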

Phase 3: The Backend Transformation Service

We built the consumer service in Node.js with TypeScript, using kafkajs for Kafka consumption, dgraph-js for database interaction, and the ws library for the WebSocket layer.

The overall data flow within the service is visualized below:

sequenceDiagram
    participant Kafka
    participant BackendService as Backend Service (Consumer)
    participant Dgraph
    participant WebSocketServer as WebSocket Server
    participant Client

    Kafka->>+BackendService: Pushes Debezium Message
    BackendService->>BackendService: Parse Message (op, before, after)
    BackendService->>BackendService: Transform to Dgraph Mutation
    alt is valid change
        BackendService->>+Dgraph: Execute Upsert Mutation
        Dgraph-->>-BackendService: Confirm Success
        BackendService->>+WebSocketServer: Publish Notification (e.g., asset_updated)
        WebSocketServer->>Client: Push message to subscribed clients
        Client->>Client: Trigger UI refetch/update
    else invalid change
        BackendService->>BackendService: Log error and acknowledge message
    end
    BackendService-->>-Kafka: Commit Offset

Kafka Consumer and Message Parsing

The consumer connects to the Kafka cluster and subscribes to the topics. The core logic resides in the message handler.

// src/services/kafkaConsumer.ts
import { Kafka, EachMessagePayload } from 'kafkajs';
import { handleAssetChange } from '../processors/assetProcessor';
import { handleRelationshipChange } from '../processors/relationshipProcessor';
import { logger } from '../utils/logger';

const kafka = new Kafka({
  clientId: 'graph-sync-service',
  brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'dgraph-transformer-group' });

// Debezium message structure for the 'after' payload
interface AssetPayload {
  ID: number;
  ASSET_NAME: string;
  ASSET_TYPE: string;
  LOCATION: string;
  LAST_MODIFIED: number;
}

interface RelationshipPayload {
    ID: number;
    SOURCE_ASSET_ID: number;
    TARGET_ASSET_ID: number;
    RELATIONSHIP_TYPE: string;
}

const handleMessage = async ({ topic, partition, message }: EachMessagePayload) => {
  if (!message.value) {
    logger.warn('Received message with null value');
    return;
  }

  try {
    const payload = JSON.parse(message.value.toString());
    const op = payload.op; // 'c' for create, 'u' for update, 'd' for delete, 'r' for read (snapshot)

    // A delete operation has a 'before' but no 'after'
    const data = op === 'd' ? payload.before : payload.after;
    
    if (!data) {
        logger.warn({ op, topic }, 'Message has no data payload.');
        return;
    }

    logger.info({ topic, op }, 'Processing CDC message');

    if (topic === 'oracle-cdc.C__MYUSER.ASSETS') {
      await handleAssetChange(op, data as AssetPayload);
    } else if (topic === 'oracle-cdc.C__MYUSER.ASSET_RELATIONSHIPS') {
      await handleRelationshipChange(op, data as RelationshipPayload);
    }

  } catch (error) {
    logger.error({ err: error, topic }, 'Error processing Kafka message');
    // In a production system, push to a DLQ instead of just logging.
    // For now, we will let the consumer group re-process, which might not be ideal.
    throw error;
  }
};

export const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topics: ['oracle-cdc.C__MYUSER.ASSETS', 'oracle-cdc.C__MYUSER.ASSET_RELATIONSHIPS'], fromBeginning: true });

  await consumer.run({
    eachMessage: handleMessage,
  });
};
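The parsing rules inside handleMessage can also be isolated into a pure function, which makes them easy to unit test without a broker. This is a minimal sketch under the same assumption as above (schemas disabled, so the message value is the bare Debezium envelope); parseEnvelope, CdcOp and ChangeEvent are hypothetical names.

```typescript
// Operation codes handled by the pipeline: create, update, delete, snapshot read.
type CdcOp = 'c' | 'u' | 'd' | 'r';

interface ChangeEvent<T> {
  op: CdcOp;
  row: T; // the 'before' image for deletes, the 'after' image otherwise
}

// Returns null for anything the pipeline should skip: tombstones (null value),
// unsupported operation codes, and envelopes without a usable row image.
function parseEnvelope<T>(raw: string | null): ChangeEvent<T> | null {
  if (raw === null) {
    return null;
  }
  const envelope = JSON.parse(raw);
  const op = envelope?.op;
  if (op !== 'c' && op !== 'u' && op !== 'd' && op !== 'r') {
    return null;
  }
  const row = op === 'd' ? envelope.before : envelope.after;
  if (row === null || row === undefined) {
    return null;
  }
  return { op, row: row as T };
}

const created = parseEnvelope<{ ID: number }>('{"op":"c","before":null,"after":{"ID":7}}');
const deleted = parseEnvelope<{ ID: number }>('{"op":"d","before":{"ID":7},"after":null}');
```

Malformed JSON still throws from JSON.parse, which matches the behavior of the handler above: the error propagates and the message is not silently dropped.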

Relational to Graph Transformation

This is the most critical piece of custom logic. We must map the flat structure from Oracle to a graph structure suitable for Dgraph. Our Dgraph schema looks like this:

# dgraph.schema
type Asset {
    oracleId: Int! @id
    name: String! @search(by: [term])
    type: String @search(by: [term])
    location: String
    lastModified: DateTime
    relationships: [Asset] @hasInverse(field: relationships)
}

The handleAssetChange function in assetProcessor.ts transforms an ASSET row change into a Dgraph mutation. We use an “upsert” pattern (a query to find the node, and a mutation to create or update it) to ensure idempotency. This is vital because the consumer only gets at-least-once delivery: a message can be processed again after a rebalance or after a crash that happens before its offset is committed.

// src/processors/assetProcessor.ts
import * as dgraph from 'dgraph-js';
import { getDgraphClient } from '../services/dgraphClient';
import { broadcastUpdate } from '../services/webSocketServer';
import { logger } from '../utils/logger';

interface AssetPayload {
  ID: number;
  ASSET_NAME: string;
  ASSET_TYPE: string;
  LOCATION: string;
  LAST_MODIFIED: number;
}

export const handleAssetChange = async (op: string, data: AssetPayload) => {
  const dgraphClient = getDgraphClient();
  const txn = dgraphClient.newTxn();
  try {
    // Coerce to a number so the value interpolated into the query is never arbitrary text.
    const oracleId = Number(data.ID);
    if (!Number.isInteger(oracleId)) {
      throw new Error(`Invalid asset ID in CDC payload: ${data.ID}`);
    }

    // Upsert block: the query binds `asset` to the existing node (if any) and the
    // mutation refers to it as uid(asset). Predicates use the Type.field names that
    // Dgraph derives from the GraphQL schema above (e.g. Asset.oracleId).
    const req = new dgraph.Request();
    req.setQuery(`query { asset as var(func: eq(Asset.oracleId, ${oracleId})) }`);

    const mu = new dgraph.Mutation();
    if (op === 'd') {
      // Remove every predicate of the matched node; nothing happens if no node matches.
      mu.setDelNquads('uid(asset) * * .');
    } else {
      // Create ('c'), snapshot read ('r') and update ('u') all map to the same upsert.
      // A JSON mutation avoids the quoting problems of hand-built N-Quad strings.
      mu.setSetJson({
        uid: 'uid(asset)',
        'dgraph.type': 'Asset',
        'Asset.oracleId': oracleId,
        'Asset.name': data.ASSET_NAME,
        'Asset.type': data.ASSET_TYPE,
        'Asset.location': data.LOCATION,
        // Assumes LAST_MODIFIED arrives as epoch milliseconds; Debezium emits
        // micro- or nanoseconds for higher-precision TIMESTAMP columns.
        'Asset.lastModified': new Date(data.LAST_MODIFIED).toISOString(),
      });
    }

    req.setMutationsList([mu]);
    req.setCommitNow(true); // query and mutation run and commit as one request
    await txn.doRequest(req);
    logger.info({ oracleId }, op === 'd' ? 'Deleted asset node from Dgraph' : 'Upserted asset node in Dgraph');

    // After successfully committing to Dgraph, notify clients.
    broadcastUpdate({
      type: op === 'd' ? 'ASSET_DELETED' : 'ASSET_UPDATED',
      payload: { oracleId },
    });
  } catch (e) {
    logger.error({ err: e }, 'Dgraph transaction failed for asset change');
    throw e; // Rethrow to let the Kafka consumer handle retry/failure
  } finally {
    // Safe to call after a commit; releases the transaction if the request failed.
    await txn.discard();
  }
};

Handling relationships is similar but involves connecting two existing nodes. The handleRelationshipChange processor looks up the UIDs of the source and target assets based on their oracleId and then creates or deletes the edge between them.
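The relationship processor is not listed here, so the following is only a sketch of how its upsert could be assembled, not the original implementation. It assumes the predicate names Dgraph derives from the GraphQL schema (Asset.oracleId, Asset.relationships); buildRelationshipUpsert and UpsertParts are hypothetical names. Because the relationships field is declared as its own inverse, the sketch writes the edge in both directions, which the GraphQL layer would otherwise do for us.

```typescript
interface RelationshipRow {
  SOURCE_ASSET_ID: number;
  TARGET_ASSET_ID: number;
}

interface UpsertParts {
  query: string;      // binds `src` and `dst` to the two asset nodes
  cond: string;       // guards the mutation until both nodes exist
  setNquads?: string; // present for create/update/snapshot events
  delNquads?: string; // present for delete events
}

// Assumed predicate names; adjust them if the DQL schema uses different ones.
const ID_PREDICATE = 'Asset.oracleId';
const EDGE_PREDICATE = 'Asset.relationships';

function buildRelationshipUpsert(op: string, row: RelationshipRow): UpsertParts {
  const src = Number(row.SOURCE_ASSET_ID);
  const dst = Number(row.TARGET_ASSET_ID);
  if (!Number.isInteger(src) || !Number.isInteger(dst)) {
    throw new Error('Relationship row has non-integer asset IDs');
  }
  const query = `query {
  src as var(func: eq(${ID_PREDICATE}, ${src}))
  dst as var(func: eq(${ID_PREDICATE}, ${dst}))
}`;
  // Only touch the graph when exactly one node matches on each side.
  const cond = '@if(eq(len(src), 1) AND eq(len(dst), 1))';
  const edges = [
    `uid(src) <${EDGE_PREDICATE}> uid(dst) .`,
    `uid(dst) <${EDGE_PREDICATE}> uid(src) .`,
  ].join('\n');
  return op === 'd' ? { query, cond, delNquads: edges } : { query, cond, setNquads: edges };
}

const linkParts = buildRelationshipUpsert('c', { SOURCE_ASSET_ID: 1, TARGET_ASSET_ID: 2 });
const unlinkParts = buildRelationshipUpsert('d', { SOURCE_ASSET_ID: 1, TARGET_ASSET_ID: 2 });
```

With dgraph-js these parts map onto Request.setQuery, Mutation.setCond, and Mutation.setSetNquads or Mutation.setDelNquads, sent through txn.doRequest. Two gaps are left open on purpose: an edge event that arrives before its asset nodes is skipped by the condition and would need a retry, and an update that changes the source or target would also have to remove the old edge using the 'before' image.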

Phase 4: Real-time Notification with WebSockets

The WebSocket server is straightforward. It maintains a set of connected clients. The broadcastUpdate function, called after a successful Dgraph transaction, simply iterates over all connected clients and sends the notification payload.

// src/services/webSocketServer.ts
import { WebSocketServer, WebSocket } from 'ws';
import { logger } from '../utils/logger';

let wss: WebSocketServer;
const clients: Set<WebSocket> = new Set();

export const startWebSocketServer = (port: number) => {
  wss = new WebSocketServer({ port });
  
  wss.on('connection', (ws) => {
    logger.info('WebSocket client connected');
    clients.add(ws);

    ws.on('close', () => {
      logger.info('WebSocket client disconnected');
      clients.delete(ws);
    });

    ws.on('error', (err) => {
        logger.error({err}, 'WebSocket error');
    });
  });

  logger.info(`WebSocket server started on port ${port}`);
};

export const broadcastUpdate = (update: { type: string; payload: any }) => {
  if (!wss) {
    logger.warn('WebSocket server not initialized, cannot broadcast update.');
    return;
  }
  
  const message = JSON.stringify(update);
  logger.info({ count: clients.size, message }, 'Broadcasting update to WebSocket clients');
  
  clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message);
    }
  });
};

In a production scenario, this simple in-memory Set of clients is a single point of failure and doesn’t scale horizontally. A common mistake is to deploy this simple implementation behind a load balancer. The solution is to use a message broker like Redis Pub/Sub to broadcast messages across all backend instances.
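The fan-out idea can be shown without Redis itself. In the sketch below, MessageBus is a hypothetical interface that a Redis-backed publisher/subscriber pair would implement, and InMemoryBus is a stand-in so the example runs on its own. Each InstanceBroadcaster represents one backend instance with its own local set of sockets.

```typescript
type Listener = (message: string) => void;

// Minimal pub/sub contract; a Redis Pub/Sub adapter would expose the same shape.
interface MessageBus {
  publish(channel: string, message: string): void;
  subscribe(channel: string, listener: Listener): void;
}

// In-memory stand-in for the shared broker, used only to keep the sketch runnable.
class InMemoryBus implements MessageBus {
  private listeners = new Map<string, Listener[]>();

  publish(channel: string, message: string): void {
    for (const listener of this.listeners.get(channel) ?? []) {
      listener(message);
    }
  }

  subscribe(channel: string, listener: Listener): void {
    const existing = this.listeners.get(channel) ?? [];
    this.listeners.set(channel, [...existing, listener]);
  }
}

interface SocketLike {
  send(data: string): void;
}

// One broadcaster per backend instance: updates go to the bus, and every
// instance relays whatever arrives on the bus to its own local sockets.
class InstanceBroadcaster {
  private sockets = new Set<SocketLike>();
  private bus: MessageBus;
  private channel: string;

  constructor(bus: MessageBus, channel: string = 'graph-updates') {
    this.bus = bus;
    this.channel = channel;
    bus.subscribe(channel, (message) => {
      this.sockets.forEach((socket) => socket.send(message));
    });
  }

  addSocket(socket: SocketLike): void {
    this.sockets.add(socket);
  }

  broadcast(update: { type: string; payload: unknown }): void {
    this.bus.publish(this.channel, JSON.stringify(update));
  }
}
```

The instance that consumed the Kafka message no longer writes to sockets directly. It publishes once, and clients connected to any instance behind the load balancer receive the notification.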

Phase 5: Verifying the Pipeline with Cypress

Testing an asynchronous, multi-system workflow is complex. Cypress gives us the ability to script user interactions and make assertions on the DOM, but we need a way to reliably trigger and observe the outcome of our CDC pipeline.

To achieve this, we added a dedicated, non-production API endpoint to our backend service: POST /_test/create-asset. This endpoint directly inserts a record into the Oracle database, thus initiating the CDC flow.
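The endpoint's handler is not shown, so the following is a hedged sketch of its core: turning the request body into a parameterized INSERT. The column names come from the AssetPayload interface earlier; buildCreateAssetStatement, the caller-supplied ID, and the production guard are assumptions made for illustration.

```typescript
interface CreateAssetBody {
  name: string;
  type: string;
  location: string;
}

interface BoundStatement {
  sql: string;
  binds: Record<string, string | number>;
}

// Hypothetical core of POST /_test/create-asset. How the ID is generated
// (sequence, identity column, ...) is not stated in the text, so it is passed in.
function buildCreateAssetStatement(
  body: CreateAssetBody,
  id: number,
  nodeEnv: string | undefined,
): BoundStatement {
  if (nodeEnv === 'production') {
    throw new Error('Test-only endpoint is disabled in production');
  }
  for (const field of ['name', 'type', 'location'] as const) {
    if (typeof body[field] !== 'string' || body[field].trim() === '') {
      throw new Error(`Missing field: ${field}`);
    }
  }
  return {
    // Named bind variables keep user input out of the SQL text.
    sql:
      'INSERT INTO C##MYUSER.ASSETS (ID, ASSET_NAME, ASSET_TYPE, LOCATION, LAST_MODIFIED) ' +
      'VALUES (:id, :name, :type, :location, SYSTIMESTAMP)',
    binds: { id, name: body.name, type: body.type, location: body.location },
  };
}

const insertStatement = buildCreateAssetStatement(
  { name: 'Cypress Test Asset', type: 'SENSOR', location: 'Test Lab' },
  1001,
  'test',
);
```

Whatever driver executes the statement must also commit it, because only committed changes reach the CDC stream. With node-oracledb, for example, autoCommit is off by default, so an uncommitted insert would leave the test waiting until it times out.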

Our Cypress test then becomes:

  1. Establish a WebSocket connection from the test runner to our backend to listen for the notification.
  2. Visit the web application page.
  3. Call the test-only API endpoint to create a new asset in Oracle.
  4. Wait for the WebSocket message confirming the update.
  5. Assert that the new asset appears in the UI.

// cypress/e2e/real-time-graph.cy.js
describe('Real-time Graph Synchronization', () => {
  it('should display a new asset in the UI after it is created in Oracle', () => {
    const assetId = `CY-${Date.now()}`;
    const assetName = `Cypress Test Asset ${assetId}`;
    let wsNotificationReceived = false;

    cy.visit('/', {
      onBeforeLoad(win) {
        // We could spy on the application's own WebSocket connection;
        // here we open a dedicated one to listen for the broadcast.
        const ws = new WebSocket('ws://localhost:8081'); // Assuming WS server is on 8081
        ws.onmessage = (event) => {
          const data = JSON.parse(event.data);
          // The notification payload only carries the oracleId, not the asset name,
          // so we match on the event type.
          if (data.type === 'ASSET_UPDATED') {
            wsNotificationReceived = true;
          }
        };
        // Expose a function to the window to check the flag
        win.hasReceivedWsNotification = () => wsNotificationReceived;
      },
    });

    // Trigger the change in the source database (step 3 in the list above)
    cy.request('POST', 'http://localhost:3000/_test/create-asset', {
      name: assetName,
      type: 'SENSOR',
      location: 'Test Lab',
    }).its('status').should('equal', 201);
    
    // Wait and assert (steps 4 and 5). The key is a generous timeout.
    // The pitfall is using short, fixed waits (cy.wait(1000)).
    // Cypress retries this assertion until it passes or the timeout elapses.
    cy.get('[data-cy=graph-container]', { timeout: 15000 })
      .should('contain.text', assetName);
      
    // Optional: Also check that our custom WebSocket listener received the message.
    cy.window().then(win => {
        cy.wrap(null).should(() => {
            expect(win.hasReceivedWsNotification()).to.be.true;
        });
    });
  });
});

This test provides immense value. It validates the entire chain: Oracle’s supplemental logging, the Debezium connector’s configuration, Kafka message delivery, our backend’s transformation logic, the Dgraph mutation, the WebSocket broadcast, and the front-end’s ability to react to the message. It’s the ultimate confidence check for a complex, distributed system.

The primary limitation of this current architecture is its single-instance backend. Scaling out the consumer service would require careful management of WebSocket state across instances, likely using Redis. The error handling is also naive; a proper Dead Letter Queue (DLQ) is necessary to handle messages that repeatedly fail transformation or persistence, preventing the entire consumer group from stalling. Furthermore, the process for handling Oracle schema evolution is manual and brittle. A future iteration would need to build a more automated process for synchronizing DDL changes from Oracle to the Dgraph schema and updating the transformation logic, potentially using a schema registry to enforce compatibility.
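As a starting point for the DLQ, the retry decision can be separated from the Kafka plumbing. This is a minimal sketch, assuming failures are counted per topic, partition and offset in memory; FailureTracker and the attempt limit are hypothetical, and the counts are lost on restart.

```typescript
type FailureAction = 'retry' | 'dead-letter';

// Counts consecutive failures per message and decides when to give up.
class FailureTracker {
  private attempts = new Map<string, number>();
  private maxAttempts: number;

  constructor(maxAttempts: number) {
    this.maxAttempts = maxAttempts;
  }

  private key(topic: string, partition: number, offset: string): string {
    return `${topic}/${partition}/${offset}`;
  }

  // Call from the catch block of the message handler.
  recordFailure(topic: string, partition: number, offset: string): FailureAction {
    const key = this.key(topic, partition, offset);
    const count = (this.attempts.get(key) ?? 0) + 1;
    if (count >= this.maxAttempts) {
      this.attempts.delete(key);
      return 'dead-letter';
    }
    this.attempts.set(key, count);
    return 'retry';
  }

  // Call after a message has been processed successfully.
  recordSuccess(topic: string, partition: number, offset: string): void {
    this.attempts.delete(this.key(topic, partition, offset));
  }
}

const tracker = new FailureTracker(3);
```

In the consumer's catch block, a 'retry' result would rethrow as today, while 'dead-letter' would produce the raw message to a separate topic and return normally so the offset advances and the partition is no longer blocked.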

