The core technical challenge was to propagate data changes from a legacy Oracle monolith, representing a complex network of physical assets, to a modern, real-time graph visualization interface. The existing system relied on nightly batch jobs, rendering it useless for operational monitoring. Direct queries against the Oracle database for graph-style traversals were prohibitively slow, often involving dozens of JOINs and causing significant load on a critical production system. Polling for changes was dismissed immediately due to scalability concerns and the high cost of unnecessary database hits.
Our initial concept was to materialize a graph view of the Oracle data into a database built for such queries. Dgraph was selected for its native graph model and GraphQL API, which would simplify front-end development. The main problem remained: how to keep this Dgraph instance synchronized with Oracle in near real-time. Database triggers were considered but rejected due to their invasive nature and the operational risk of adding complex logic directly into the monolith’s database schema.
The final architecture hinged on Change Data Capture (CDC). We decided to leverage Debezium’s Oracle connector to stream every committed row-level change into a Kafka topic. A backend service, acting as a consumer, would then be responsible for transforming this relational stream into graph mutations and applying them to Dgraph. Finally, this service would push lightweight notifications over WebSockets to subscribed clients, prompting them to refetch updated graph data. To ensure the integrity of this entire asynchronous pipeline, we committed to building an end-to-end testing strategy using Cypress, capable of validating the flow from an Oracle transaction all the way to a DOM update in the browser.
Phase 1: Preparing the Oracle Source
A fundamental prerequisite for CDC is enabling the database’s transaction logging capabilities. For Oracle, this means running in ARCHIVELOG mode and enabling supplemental logging. In a real-world project, this is a non-trivial change that requires careful planning with a DBA team, as it impacts storage and performance.
First, verify the current logging status:
SELECT log_mode FROM v$database;
If the result is NOARCHIVELOG, the database must be shut down and restarted in mount mode to make the change.
-- This is a DBA-level operation and requires a maintenance window.
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
With ARCHIVELOG mode enabled, we must enable supplemental logging. This instructs Oracle to write additional information to the redo log for tools like Debezium to reconstruct row changes.
-- Enable minimal supplemental logging at the database level.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- For the specific tables we want to capture, we need to enable full supplemental logging.
-- Let's assume two tables: ASSETS (nodes) and ASSET_RELATIONSHIPS (edges).
ALTER TABLE C##MYUSER.ASSETS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE C##MYUSER.ASSET_RELATIONSHIPS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
The pitfall here is underestimating the impact. Enabling (ALL) COLUMNS logging increases the volume of redo log generation, so it’s crucial to monitor disk space and I/O performance on the Oracle server after this change. A common mistake is to skip the table-level supplemental logging entirely, which leads to cryptic errors from the Debezium connector.
Phase 2: The CDC Pipeline with Debezium and Kafka
We used Docker Compose to stand up the core infrastructure: Zookeeper, Kafka, and Kafka Connect with the Debezium Oracle connector.
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
connect:
image: confluentinc/cp-kafka-connect:7.3.0
container_name: connect
depends_on:
- kafka
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: cdc-group
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      # With a single broker, the internal topics cannot use the default replication factor of 3.
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
volumes:
- ./debezium-connector-oracle:/usr/share/confluent-hub-components/debezium-connector-oracle
# Note: The debezium-connector-oracle directory must contain the extracted Debezium Oracle connector JARs.
The next critical piece is the connector configuration itself. This JSON object is POSTed to the Kafka Connect REST API (http://localhost:8083/connectors).
{
"name": "oracle-asset-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.server.name": "ORCLSRV",
"database.hostname": "oracle.host.internal",
"database.port": "1521",
"database.user": "c##myuser",
"database.password": "debezium",
"database.dbname": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.connection.adapter": "logminer",
"log.mining.strategy": "online_catalog",
"table.include.list": "C##MYUSER.ASSETS,C##MYUSER.ASSET_RELATIONSHIPS",
"topic.prefix": "oracle-cdc",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
"schema.history.internal.kafka.topic": "schema-history.assets"
}
}
A few key configurations to highlight:
- database.connection.adapter: logminer is the standard, robust choice for interacting with Oracle’s redo logs.
- table.include.list: Explicitly defines which tables we are monitoring. This is crucial for preventing noise from other tables in the monolith.
- snapshot.mode: initial tells Debezium to perform a full read of the included tables upon its first start, ensuring the target system is fully seeded before streaming live changes. In a production restart, a mode like schema_only_recovery might be more appropriate to avoid re-snapshotting.
- value.converter.schemas.enable: Setting this to false provides a cleaner JSON payload without the verbose schema information, which is easier for our downstream consumer to parse.
With this configuration, any INSERT, UPDATE, or DELETE on the ASSETS or ASSET_RELATIONSHIPS tables will produce a message in the corresponding Kafka topic (e.g., oracle-cdc.C__MYUSER.ASSETS).
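Registering the connector is then a single HTTP call against the Connect REST API. The sketch below is a small helper script, assuming Node 18+ (for the built-in fetch) and that the JSON above is saved as oracle-asset-connector.json next to the script:
// scripts/register-connector.ts -- convenience helper (a sketch; the file layout is assumed)
import { readFile } from 'node:fs/promises';

const CONNECT_URL = 'http://localhost:8083/connectors';

const registerConnector = async () => {
  // The connector configuration shown above, stored alongside this script.
  const body = await readFile('oracle-asset-connector.json', 'utf-8');

  const response = await fetch(CONNECT_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body,
  });

  if (!response.ok) {
    // Kafka Connect returns a JSON body describing validation failures.
    throw new Error(`Connector registration failed: ${response.status} ${await response.text()}`);
  }

  const created = await response.json();
  console.log(`Connector registered: ${created.name}`);
};

registerConnector().catch((err) => {
  console.error(err);
  process.exit(1);
});
A follow-up GET on /connectors/oracle-asset-connector/status should report both the connector and its single task as RUNNING before any changes are expected to flow.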
Phase 3: The Backend Transformation Service
We built the consumer service in Node.js with TypeScript, using kafkajs for Kafka consumption, dgraph-js for database interaction, and the ws library for the WebSocket layer.
The overall data flow within the service is visualized below:
sequenceDiagram
    participant Kafka
    participant BackendService as Backend Service (Consumer)
    participant Dgraph
    participant WebSocketServer as WebSocket Server
    participant Client
    Kafka->>+BackendService: Pushes Debezium Message
    BackendService->>BackendService: Parse Message (op, before, after)
    BackendService->>BackendService: Transform to Dgraph Mutation
    alt is valid change
        BackendService->>+Dgraph: Execute Upsert Mutation
        Dgraph-->>-BackendService: Confirm Success
        BackendService->>+WebSocketServer: Publish Notification (e.g., asset_updated)
        WebSocketServer->>Client: Push message to subscribed clients
        Client->>Client: Trigger UI refetch/update
    else invalid change
        BackendService->>BackendService: Log error and acknowledge message
    end
    BackendService-->>-Kafka: Commit Offset
Kafka Consumer and Message Parsing
The consumer connects to the Kafka cluster and subscribes to the topics. The core logic resides in the message handler.
// src/services/kafkaConsumer.ts
import { Kafka, EachMessagePayload } from 'kafkajs';
import { handleAssetChange } from '../processors/assetProcessor';
import { handleRelationshipChange } from '../processors/relationshipProcessor';
import { logger } from '../utils/logger';
const kafka = new Kafka({
clientId: 'graph-sync-service',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'dgraph-transformer-group' });
// Debezium message structure for the 'after' payload
interface AssetPayload {
ID: number;
ASSET_NAME: string;
ASSET_TYPE: string;
LOCATION: string;
LAST_MODIFIED: number;
}
interface RelationshipPayload {
ID: number;
SOURCE_ASSET_ID: number;
TARGET_ASSET_ID: number;
RELATIONSHIP_TYPE: string;
}
const handleMessage = async ({ topic, partition, message }: EachMessagePayload) => {
if (!message.value) {
logger.warn('Received message with null value');
return;
}
try {
const payload = JSON.parse(message.value.toString());
const op = payload.op; // 'c' for create, 'u' for update, 'd' for delete, 'r' for read (snapshot)
// A delete operation has a 'before' but no 'after'
const data = op === 'd' ? payload.before : payload.after;
if (!data) {
logger.warn({ op, topic }, 'Message has no data payload.');
return;
}
logger.info({ topic, op }, 'Processing CDC message');
if (topic === 'oracle-cdc.C__MYUSER.ASSETS') {
await handleAssetChange(op, data as AssetPayload);
} else if (topic === 'oracle-cdc.C__MYUSER.ASSET_RELATIONSHIPS') {
await handleRelationshipChange(op, data as RelationshipPayload);
}
} catch (error) {
logger.error({ err: error, topic }, 'Error processing Kafka message');
// In a production system, push to a DLQ instead of just logging.
// For now, we will let the consumer group re-process, which might not be ideal.
throw error;
}
};
export const runConsumer = async () => {
await consumer.connect();
await consumer.subscribe({ topics: ['oracle-cdc.C__MYUSER.ASSETS', 'oracle-cdc.C__MYUSER.ASSET_RELATIONSHIPS'], fromBeginning: true });
await consumer.run({
eachMessage: handleMessage,
});
};
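The DLQ hinted at in the error handler above does not have to be elaborate. One option, sketched here, is to produce the failing message to a side topic and return instead of rethrowing, so a single poison message cannot stall the whole consumer group. The topic naming convention and the producer wiring are assumptions; this is not something we built in this iteration.
// src/services/deadLetterQueue.ts -- hypothetical sketch of a minimal DLQ
import { Kafka, Producer } from 'kafkajs';

const kafka = new Kafka({ clientId: 'graph-sync-dlq', brokers: ['localhost:9092'] });
let producer: Producer;

export const initDlqProducer = async () => {
  producer = kafka.producer();
  await producer.connect();
};

// Called from the catch block in handleMessage instead of rethrowing: the original
// bytes are preserved verbatim, and the error is attached as headers for later triage.
export const sendToDlq = async (
  topic: string,
  message: { key: Buffer | null; value: Buffer | null },
  error: Error,
) => {
  await producer.send({
    topic: `${topic}.dlq`, // assumed naming convention
    messages: [
      {
        key: message.key,
        value: message.value,
        headers: {
          'x-original-topic': topic,
          'x-error-message': error.message,
        },
      },
    ],
  });
};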
Relational to Graph Transformation
This is the most critical piece of custom logic. We must map the flat structure from Oracle to a graph structure suitable for Dgraph. Our Dgraph schema looks like this:
# dgraph.schema
type Asset {
oracleId: Int! @id
name: String! @search(by: [term])
type: String @search(by: [term])
location: String
lastModified: DateTime
relationships: [Asset] @hasInverse(field: relationships)
}
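This schema is exposed through Dgraph’s auto-generated GraphQL API, which is what the front end queries when a notification tells it to refetch. A minimal sketch of such a refetch follows (the default Alpha endpoint http://localhost:8080/graphql and the field selection are assumptions). One caveat worth flagging: Dgraph’s GraphQL layer stores fields under Type.field predicate names by default, so data written with bare DQL predicates (as in the processors below) is only visible here if the schema maps them explicitly with @dgraph(pred: ...) directives or the mutations target the generated names.
// Front-end refetch against Dgraph's generated GraphQL API (sketch).
const DGRAPH_GRAPHQL_URL = 'http://localhost:8080/graphql'; // assumed default Alpha port

// For a type named Asset, Dgraph generates a queryAsset root field.
const REFETCH_QUERY = `
  query {
    queryAsset {
      oracleId
      name
      type
      location
      relationships {
        oracleId
        name
      }
    }
  }
`;

export const fetchAssets = async () => {
  const response = await fetch(DGRAPH_GRAPHQL_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query: REFETCH_QUERY }),
  });
  const { data, errors } = await response.json();
  if (errors) {
    throw new Error(`Dgraph GraphQL query failed: ${JSON.stringify(errors)}`);
  }
  return data.queryAsset;
};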
The handleAssetChange function in assetProcessor.ts transforms an ASSETS row change into a Dgraph mutation. We use an “upsert” pattern (a query to find the node, and a mutation to create or update it) to ensure idempotency. This is vital because Kafka can sometimes redeliver messages.
// src/processors/assetProcessor.ts
import { getDgraphClient } from '../services/dgraphClient';
import { broadcastUpdate } from '../services/webSocketServer';
import { logger } from '../utils/logger';
interface AssetPayload {
ID: number;
ASSET_NAME: string;
ASSET_TYPE: string;
LOCATION: string;
LAST_MODIFIED: number;
}
export const handleAssetChange = async (op: string, data: AssetPayload) => {
const dgraphClient = getDgraphClient();
const txn = dgraphClient.newTxn();
try {
const oracleId = data.ID;
    if (op === 'd') {
      // Handle deletion: find the node by its oracleId and drop all of its predicates.
      const deleteMutation = `
        upsert {
          query {
            v as var(func: eq(oracleId, "${oracleId}"))
          }
          mutation {
            delete {
              uid(v) * * .
            }
          }
        }
      `;
      await txn.mutate({ mutation: deleteMutation });
      logger.info({ oracleId }, 'Deleted asset node from Dgraph');
    } else {
      // Handle create/update as a single upsert block. When the query variable is
      // empty (the node does not exist yet), uid(v) allocates a new node; otherwise
      // the existing node is updated in place, which keeps redelivered messages idempotent.
      // NOTE: values are interpolated directly for brevity; production code should
      // escape them or build the mutation from JSON.
      const upsertMutation = `
        upsert {
          query {
            v as var(func: eq(oracleId, "${oracleId}"))
          }
          mutation {
            set {
              uid(v) <dgraph.type> "Asset" .
              uid(v) <oracleId> "${oracleId}" .
              uid(v) <name> "${data.ASSET_NAME}" .
              uid(v) <type> "${data.ASSET_TYPE}" .
              uid(v) <location> "${data.LOCATION}" .
              uid(v) <lastModified> "${new Date(data.LAST_MODIFIED).toISOString()}" .
            }
          }
        }
      `;
      await txn.mutate({ mutation: upsertMutation });
      logger.info({ oracleId }, 'Upserted asset node in Dgraph');
    }
await txn.commit();
// After successfully committing to Dgraph, notify clients.
broadcastUpdate({
type: op === 'd' ? 'ASSET_DELETED' : 'ASSET_UPDATED',
payload: { oracleId: oracleId }
});
} catch(e) {
logger.error({ err: e }, 'Dgraph transaction failed for asset change');
throw e; // Rethrow to let kafka consumer handle retry/failure
} finally {
await txn.discard();
}
};
Handling relationships is similar but involves connecting two existing nodes. The handleRelationshipChange processor looks up the UIDs of the source and target assets based on their oracleId and then creates or deletes the edge between them.
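The sketch below condenses that logic and mirrors the structure of assetProcessor.ts. The notification types, and the decision to ignore RELATIONSHIP_TYPE (the schema above has no attribute on the relationships edge), are choices of this sketch rather than code shown earlier.
// src/processors/relationshipProcessor.ts -- condensed sketch of the edge handling
import { getDgraphClient } from '../services/dgraphClient';
import { broadcastUpdate } from '../services/webSocketServer';
import { logger } from '../utils/logger';

interface RelationshipPayload {
  ID: number;
  SOURCE_ASSET_ID: number;
  TARGET_ASSET_ID: number;
  RELATIONSHIP_TYPE: string;
}

export const handleRelationshipChange = async (op: string, data: RelationshipPayload) => {
  const txn = getDgraphClient().newTxn();
  try {
    // Resolve both endpoints by oracleId, then add or remove the edge between them.
    // RELATIONSHIP_TYPE could be modelled as a facet on the edge; it is omitted here.
    const edgeAction = op === 'd' ? 'delete' : 'set';
    const mutation = `
      upsert {
        query {
          source as var(func: eq(oracleId, "${data.SOURCE_ASSET_ID}"))
          target as var(func: eq(oracleId, "${data.TARGET_ASSET_ID}"))
        }
        mutation {
          ${edgeAction} {
            uid(source) <relationships> uid(target) .
          }
        }
      }
    `;
    await txn.mutate({ mutation });
    await txn.commit();
    logger.info({ relationshipId: data.ID, op }, 'Applied relationship change in Dgraph');
    broadcastUpdate({
      type: op === 'd' ? 'RELATIONSHIP_DELETED' : 'RELATIONSHIP_UPDATED',
      payload: { sourceId: data.SOURCE_ASSET_ID, targetId: data.TARGET_ASSET_ID },
    });
  } catch (e) {
    logger.error({ err: e }, 'Dgraph transaction failed for relationship change');
    throw e; // Let the Kafka consumer decide on retry/DLQ, as with assets.
  } finally {
    await txn.discard();
  }
};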
Phase 4: Real-time Notification with WebSockets
The WebSocket server is straightforward. It maintains a set of connected clients. The broadcastUpdate function, called after a successful Dgraph transaction, simply iterates over all connected clients and sends the notification payload.
// src/services/webSocketServer.ts
import { WebSocketServer, WebSocket } from 'ws';
import { logger } from '../utils/logger';
let wss: WebSocketServer;
const clients: Set<WebSocket> = new Set();
export const startWebSocketServer = (port: number) => {
wss = new WebSocketServer({ port });
wss.on('connection', (ws) => {
logger.info('WebSocket client connected');
clients.add(ws);
ws.on('close', () => {
logger.info('WebSocket client disconnected');
clients.delete(ws);
});
ws.on('error', (err) => {
logger.error({err}, 'WebSocket error');
});
});
logger.info(`WebSocket server started on port ${port}`);
};
export const broadcastUpdate = (update: { type: string; payload: any }) => {
if (!wss) {
logger.warn('WebSocket server not initialized, cannot broadcast update.');
return;
}
const message = JSON.stringify(update);
logger.info({ count: clients.size, message }, 'Broadcasting update to WebSocket clients');
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
};
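On the browser side, the consuming counterpart is deliberately thin: a plain WebSocket connection that refetches the graph data whenever a notification arrives. A minimal sketch (the port and the refetchGraph callback are assumptions about the front end, which isn’t shown here):
// Front-end notification listener (sketch).
const connectToUpdates = (refetchGraph: () => void) => {
  const socket = new WebSocket('ws://localhost:8081'); // assumed WebSocket server port

  socket.onmessage = (event) => {
    const update = JSON.parse(event.data) as { type: string; payload: unknown };
    // Notifications are deliberately lightweight: they carry no graph data,
    // they only tell the client that its current view is stale.
    console.debug('Graph update received', update.type);
    refetchGraph();
  };

  socket.onclose = () => {
    // Naive reconnect; a production client would add backoff and jitter.
    setTimeout(() => connectToUpdates(refetchGraph), 2000);
  };
};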
In a production scenario, the server’s simple in-memory Set of clients is both a single point of failure and a barrier to horizontal scaling: each backend instance only knows about its own connections. A common mistake is to deploy this implementation behind a load balancer and expect every client to still receive every notification. The solution is to use a message broker like Redis Pub/Sub to fan the broadcasts out across all backend instances.
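That fan-out might look like the sketch below, using ioredis (an assumption; any broker with pub/sub semantics works): each backend instance publishes notifications to a shared channel and relays whatever it receives to its own local WebSocket clients.
// src/services/broadcastBus.ts -- hypothetical Redis-backed fan-out, not yet implemented
import Redis from 'ioredis';
import { broadcastUpdate } from './webSocketServer';

const CHANNEL = 'graph-updates'; // assumed channel name
const publisher = new Redis();   // defaults to localhost:6379
const subscriber = new Redis();  // a subscribing connection cannot also publish

export const startBroadcastBus = async () => {
  await subscriber.subscribe(CHANNEL);
  subscriber.on('message', (_channel, message) => {
    // Relay updates published by any instance to this instance's local clients.
    broadcastUpdate(JSON.parse(message));
  });
};

// The processors would call this instead of broadcastUpdate directly, so every
// backend instance (and therefore every connected client) sees the notification.
export const publishUpdate = async (update: { type: string; payload: any }) => {
  await publisher.publish(CHANNEL, JSON.stringify(update));
};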
Phase 5: Verifying the Pipeline with Cypress
Testing an asynchronous, multi-system workflow is complex. Cypress gives us the ability to script user interactions and make assertions on the DOM, but we need a way to reliably trigger and observe the outcome of our CDC pipeline.
To achieve this, we added a dedicated, non-production API endpoint to our backend service: POST /_test/create-asset. This endpoint directly inserts a record into the Oracle database, thus initiating the CDC flow.
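A sketch of that endpoint, assuming the backend also runs an Express app and uses node-oracledb for the insert (both assumptions; only the column names and connection details mirror what’s used elsewhere in this post):
// src/routes/testRoutes.ts -- test-only helper (sketch); never mount this in production
// Assumes app.use(express.json()) and app.use(testRouter) on the service's HTTP port.
import { Router } from 'express';
import oracledb from 'oracledb';

export const testRouter = Router();

testRouter.post('/_test/create-asset', async (req, res) => {
  const { name, type, location } = req.body;
  let connection: oracledb.Connection | undefined;
  try {
    connection = await oracledb.getConnection({
      user: 'c##myuser',
      password: process.env.ORACLE_PASSWORD,
      connectString: 'oracle.host.internal:1521/ORCLPDB1',
    });
    // The committed insert is what kicks off the CDC flow.
    // ID is assumed to be populated by an identity column or trigger.
    const result = await connection.execute(
      `INSERT INTO ASSETS (ASSET_NAME, ASSET_TYPE, LOCATION, LAST_MODIFIED)
       VALUES (:name, :type, :location, SYSTIMESTAMP)`,
      { name, type, location },
      { autoCommit: true },
    );
    res.status(201).json({ rowsAffected: result.rowsAffected });
  } catch (err) {
    res.status(500).json({ error: (err as Error).message });
  } finally {
    if (connection) {
      await connection.close();
    }
  }
});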
Our Cypress test then becomes:
- Establish a WebSocket connection from the test runner to our backend to listen for the notification.
- Visit the web application page.
- Call the test-only API endpoint to create a new asset in Oracle.
- Wait for the WebSocket message confirming the update.
- Assert that the new asset appears in the UI.
// cypress/e2e/real-time-graph.cy.js
describe('Real-time Graph Synchronization', () => {
it('should display a new asset in the UI after it is created in Oracle', () => {
const assetId = `CY-${Date.now()}`;
const assetName = `Cypress Test Asset ${assetId}`;
let wsNotificationReceived = false;
cy.visit('/', {
onBeforeLoad(win) {
// We can spy on the WebSocket connection from the application
// Or create our own to listen for the broadcast
const ws = new WebSocket('ws://localhost:8081'); // Assuming WS server is on 8081
        ws.onmessage = (event) => {
          const data = JSON.parse(event.data);
          // The backend broadcast only carries the oracleId (not the asset name),
          // so we simply record that an asset update notification arrived.
          if (data.type === 'ASSET_UPDATED') {
            wsNotificationReceived = true;
          }
        };
// Expose a function to the window to check the flag
win.hasReceivedWsNotification = () => wsNotificationReceived;
},
});
// Step 1: Trigger the change in the source database
cy.request('POST', 'http://localhost:3000/_test/create-asset', {
name: assetName,
type: 'SENSOR',
location: 'Test Lab',
}).its('status').should('equal', 201);
// Step 2: Wait and assert. The key is a generous timeout.
// The pitfall is using short, fixed waits (cy.wait(1000)).
// A better approach is to poll for a condition.
cy.get('[data-cy=graph-container]', { timeout: 15000 })
.should('contain.text', assetName);
// Optional: Also check that our custom WebSocket listener received the message.
cy.window().then(win => {
cy.wrap(null).should(() => {
expect(win.hasReceivedWsNotification()).to.be.true;
});
});
});
});
This test provides immense value. It validates the entire chain: Oracle’s supplemental logging, the Debezium connector’s configuration, Kafka message delivery, our backend’s transformation logic, the Dgraph mutation, the WebSocket broadcast, and the front-end’s ability to react to the message. It’s the ultimate confidence check for a complex, distributed system.
The primary limitation of this current architecture is its single-instance backend. Scaling out the consumer service would require careful management of WebSocket state across instances, likely using Redis. The error handling is also naive; a proper Dead Letter Queue (DLQ) is necessary to handle messages that repeatedly fail transformation or persistence, preventing the entire consumer group from stalling. Furthermore, the process for handling Oracle schema evolution is manual and brittle. A future iteration would need to build a more automated process for synchronizing DDL changes from Oracle to the Dgraph schema and updating the transformation logic, potentially using a schema registry to enforce compatibility.