The initial architecture was deceptively simple: a Koa service endpoint received event data and performed a direct INSERT
into a ScyllaDB cluster. This worked under light load but failed catastrophically during traffic spikes. API requests would time out as the service waited for slow database writes, leading to dropped data and a cascading failure that propagated back to our clients. The database, designed for high throughput, became a bottleneck not because of its inherent limits, but because our application architecture lacked the necessary decoupling to handle contention and backpressure.
The first logical step was introducing a message queue to act as a buffer. We chose Kafka for its durability and high-throughput capabilities. The Koa service became a lightweight producer, its sole responsibility being to validate incoming data and publish it to a Kafka topic. This immediately solved the API timeout issue. Response times became fast and predictable. However, the problem didn’t disappear; it simply moved. Now, a new consumer service, responsible for pulling data from Kafka and writing to ScyllaDB, was the component under pressure. During database slowdowns, this consumer would either crash or fall so far behind that the Kafka topic’s retention policy would begin purging unprocessed data. Worse, during restarts or transient network errors, we discovered duplicate records in our database, corrupting our datasets. The challenge evolved from simple ingestion to building a truly resilient and idempotent pipeline.
This document details the build process, pitfalls, and final implementation of this pipeline, focusing on two critical aspects often overlooked in simple tutorials: write idempotency and consumer-side backpressure management.
The Foundational Components: Why Koa, Kafka, and ScyllaDB?
In a real-world project, technology selection is a series of trade-offs.
- Koa: For the API ingress, Node.js with Koa was a pragmatic choice. The task is I/O-bound (receiving an HTTP request, serializing data, sending to Kafka). Koa’s middleware-based architecture and native async/await support make for clean, efficient code for this purpose. It’s lightweight and has a minimal core, preventing unnecessary bloat.
- Kafka: We selected Kafka over other message brokers like RabbitMQ primarily for its log-based persistence and partitioning model. The ability to retain messages for an extended period is crucial for recovery scenarios. Furthermore, Kafka’s partitioning allows us to scale consumers horizontally and can be aligned with ScyllaDB’s sharding for optimized write patterns, though we won’t delve into partition-aware routing here.
- ScyllaDB: Chosen for its extreme write performance and Cassandra Query Language (CQL) compatibility. Its shard-per-core architecture minimizes internal contention, making it ideal as a sink for high-volume event streams. The availability of a mature Node.js driver for Cassandra was a key factor for our ecosystem.
Here is the environment setup we’ll use for development, managed via docker-compose. This ensures a repeatable environment for testing the entire pipeline.
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
scylladb:
image: scylladb/scylla:5.2.0
container_name: scylladb
ports:
- "9042:9042"
command: --smp 1 --memory 1G --developer-mode 1
volumes:
- scylla_data:/var/lib/scylla
volumes:
scylla_data:
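The compose file relies on KAFKA_AUTO_CREATE_TOPICS_ENABLE, but the topic can also be created explicitly, which makes the partition count deliberate rather than a broker default. A minimal sketch using the kafkajs admin client; the partition count of 3 is an assumption to be tuned to the number of consumer instances you plan to run.

// scripts/create-topic.js (illustrative)
const { Kafka } = require('kafkajs');

(async () => {
  const kafka = new Kafka({ clientId: 'admin', brokers: ['localhost:9092'] });
  const admin = kafka.admin();
  await admin.connect();
  // Creates the topic if it does not exist; returns false if it already does.
  await admin.createTopics({
    topics: [{ topic: 'user-events', numPartitions: 3, replicationFactor: 1 }],
  });
  await admin.disconnect();
})();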
Phase 1: The Producer and the First Pitfall - At-Least-Once Delivery
The Koa producer is straightforward. Its job is to accept an HTTP POST request, add a unique identifier for idempotency tracking, and send it to Kafka. A common mistake is to neglect producer configurations, leading to data loss on broker failure.
// producer/src/server.js
const Koa = require('koa');
const Router = require('@koa/router');
const bodyParser = require('koa-bodyparser');
const { Kafka, CompressionTypes } = require('kafkajs');
const { v4: uuidv4 } = require('uuid');
const pino = require('pino');
const logger = pino({ level: 'info' });
const app = new Koa();
const router = new Router();
// Kafka client configuration
const kafka = new Kafka({
clientId: 'event-producer',
brokers: ['localhost:9092'],
});
const producer = kafka.producer({
  // Retry on transient network errors.
  retry: {
    retries: 5,
    initialRetryTime: 300,
  }
});
const TOPIC_NAME = 'user-events';
// Middleware setup
app.use(bodyParser());
app.use(router.routes()).use(router.allowedMethods());
// The core ingestion endpoint
router.post('/events', async (ctx) => {
const eventPayload = ctx.request.body;
if (!eventPayload || typeof eventPayload.userId !== 'string' || typeof eventPayload.eventType !== 'string') {
ctx.status = 400;
ctx.body = { error: 'Invalid event payload' };
return;
}
// Critical step: Attach a unique idempotency key to every event *at the source*.
// This key will be used by the consumer to prevent duplicate processing.
const eventId = uuidv4();
const message = {
...eventPayload,
eventId, // Our idempotency key
timestamp: new Date().toISOString(),
};
try {
    await producer.send({
      topic: TOPIC_NAME,
      // acks: -1 ("all") waits until the leader and every in-sync replica have
      // acknowledged the write; this is the strongest durability guarantee
      // (and the kafkajs default).
      acks: -1,
      compression: CompressionTypes.GZIP,
      messages: [
        { key: eventPayload.userId, value: JSON.stringify(message) },
      ],
    });
logger.info({ eventId }, 'Successfully produced event');
ctx.status = 202; // Accepted for processing
ctx.body = { status: 'queued', eventId };
} catch (error) {
logger.error({ err: error, eventId }, 'Failed to produce event to Kafka');
ctx.status = 503; // Service Unavailable
ctx.body = { error: 'Failed to queue event for processing' };
}
});
const start = async () => {
try {
await producer.connect();
logger.info('Kafka producer connected');
app.listen(3000, () => {
logger.info('Producer service listening on port 3000');
});
} catch (error) {
logger.fatal({ err: error }, 'Failed to start producer service');
process.exit(1);
}
};
start();
process.on('SIGTERM', async () => {
logger.info('SIGTERM received, shutting down gracefully');
await producer.disconnect();
process.exit(0);
});
Setting acks: -1 on the send call (the kafkajs equivalent of acks=all, and also its default) is crucial for durability. It means the request is only considered complete when the leader replica and all its in-sync follower replicas have received the message. This prevents data loss if a broker goes down immediately after acknowledging a write but before replicating it.
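With the producer running, a quick smoke test confirms the happy path. This is purely an illustrative script, assuming Node 18+ (for the global fetch) and the service listening on localhost:3000.

// smoke-test.js (illustrative)
(async () => {
  const res = await fetch('http://localhost:3000/events', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ userId: 'user-123', eventType: 'page_view', path: '/home' }),
  });
  // Expect 202 and a body like { status: 'queued', eventId: '...' }
  console.log(res.status, await res.json());
})();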
Phase 2: The Idempotent Consumer
This is where the real complexity lies. A naive consumer might look like this: read a message, INSERT
into ScyllaDB, and let the Kafka client auto-commit the offset. This design is fragile. If the database write succeeds but the application crashes before the offset is committed, upon restart the consumer will re-process the same message, creating a duplicate record.
To solve this, we must implement idempotency. The eventId we added in the producer is our key. The strategy is to use a Lightweight Transaction (LWT) in ScyllaDB with IF NOT EXISTS. This ensures that an INSERT operation for a given eventId can only succeed once.
First, the ScyllaDB table schema:
// schema.cql
CREATE KEYSPACE IF NOT EXISTS event_store WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USE event_store;
-- The main table for storing events.
-- The clustering key includes timestamp and eventId to ensure uniqueness
-- and allow time-series queries for a user.
CREATE TABLE IF NOT EXISTS user_events (
user_id text,
event_timestamp timestamp,
event_id uuid,
event_type text,
payload text, // Using text for a generic JSON blob
PRIMARY KEY ((user_id), event_timestamp, event_id)
) WITH CLUSTERING ORDER BY (event_timestamp DESC);
Because event_id is already part of the primary key, a plain INSERT of a duplicate would simply overwrite the same row (CQL writes are upserts) rather than create a second copy. Still, a more robust and explicit pattern for idempotency is to use an LWT, which atomically checks for the existence of the record before inserting it and tells us whether the write was actually applied.
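To see the LWT behaviour in isolation, here is a small standalone sketch (not part of the pipeline) that writes the same event twice and inspects the [applied] column. It assumes the schema above has been applied and that ScyllaDB is reachable locally with authentication disabled, as in the docker-compose setup.

// lwt-demo.js (illustrative)
const { Client, types } = require('cassandra-driver');

const client = new Client({
  contactPoints: ['localhost:9042'],
  localDataCenter: 'datacenter1',
  keyspace: 'event_store',
});

const INSERT = `
  INSERT INTO user_events (user_id, event_timestamp, event_id, event_type, payload)
  VALUES (?, ?, ?, ?, ?) IF NOT EXISTS`;

(async () => {
  await client.connect();
  const params = ['user-123', new Date(), types.Uuid.random(), 'page_view', '{}'];

  const first = await client.execute(INSERT, params, { prepare: true });
  const second = await client.execute(INSERT, params, { prepare: true });

  console.log(first.rows[0]['[applied]']);  // true: the row was inserted
  console.log(second.rows[0]['[applied]']); // false: the duplicate was rejected
  await client.shutdown();
})();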
Here is the initial, more robust consumer implementation. Note the disabled auto-commit (a consumer.run() option in kafkajs) and the use of LWTs.
// consumer/src/scylla.js
const { Client, auth, types } = require('cassandra-driver');
const client = new Client({
contactPoints: ['localhost:9042'],
localDataCenter: 'datacenter1', // Default for ScyllaDB Docker
keyspace: 'event_store',
authProvider: new auth.PlainTextAuthProvider('cassandra', 'cassandra'), // Default credentials
queryOptions: { consistency: types.consistencies.localQuorum }
});
module.exports = client;
// consumer/src/consumer.js
const { Kafka } = require('kafkajs');
const pino = require('pino');
const scyllaClient = require('./scylla');
const logger = pino({ level: 'info' });
const TOPIC_NAME = 'user-events';
const GROUP_ID = 'scylla-writer-group';
const kafka = new Kafka({
clientId: 'event-consumer',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({
  groupId: GROUP_ID,
  // Increase session timeout to handle longer processing times without triggering rebalances.
  sessionTimeout: 60000, // 60 seconds
  heartbeatInterval: 10000, // 10 seconds
  // Note: auto-commit is not a constructor option in kafkajs; it is disabled
  // in consumer.run() below to take full control over offset management.
});
const INSERT_QUERY = `
INSERT INTO user_events (user_id, event_timestamp, event_id, event_type, payload)
VALUES (?, ?, ?, ?, ?)
IF NOT EXISTS;
`;
const processMessage = async (message) => {
try {
const event = JSON.parse(message.value.toString());
const { userId, timestamp, eventId, eventType, ...payload } = event;
if (!eventId) {
logger.warn({ offset: message.offset }, 'Message missing eventId, skipping');
return; // Acknowledge and skip poison pills
}
const params = [userId, new Date(timestamp), eventId, eventType, JSON.stringify(payload)];
const result = await scyllaClient.execute(INSERT_QUERY, params, { prepare: true });
// The result of an LWT query contains a '[applied]' column.
// If true, the insert was successful. If false, it means the record already existed.
const wasApplied = result.rows[0]['[applied]'];
if (wasApplied) {
logger.info({ eventId, offset: message.offset }, 'Successfully inserted new event');
} else {
logger.warn({ eventId, offset: message.offset }, 'Duplicate event detected and ignored');
}
} catch (error) {
logger.error({ err: error, offset: message.offset }, 'Error processing message');
// Rethrow to signal failure to the batch processor
throw error;
}
};
const run = async () => {
await scyllaClient.connect();
logger.info('ScyllaDB client connected');
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAME, fromBeginning: true });
logger.info(`Consumer subscribed to topic ${TOPIC_NAME}`);
  await consumer.run({
    // Disable auto-commit to take full control over offset management.
    // This is the cornerstone of reliable processing.
    autoCommit: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      let lastProcessedOffset = null;
      for (const message of batch.messages) {
        if (!isRunning() || isStale()) {
          // Stop processing if the consumer is being shut down or has been kicked from the group
          break;
        }
        try {
          await processMessage(message);
          // Only resolve the offset *after* the message has been successfully processed.
          resolveOffset(message.offset);
          lastProcessedOffset = message.offset;
        } catch (e) {
          // A real-world project would have more sophisticated error handling,
          // like moving the message to a Dead Letter Queue (DLQ) after several retries.
          // For now, we log the error and halt, forcing a restart to retry.
          logger.fatal({ offset: message.offset }, 'FATAL: Unrecoverable error processing message, consumer will crash.');
          // Crashing the process is a valid strategy in containerized environments.
          // The orchestrator (e.g., Kubernetes) will restart it, and it will retry the batch.
          process.exit(1);
        }
        await heartbeat();
      }
      if (lastProcessedOffset !== null) {
        // With auto-commit disabled we must commit explicitly. The committed offset
        // is the *next* offset to consume, hence the +1.
        await consumer.commitOffsets([{
          topic: batch.topic,
          partition: batch.partition,
          offset: String(Number(lastProcessedOffset) + 1),
        }]);
      }
    },
  });
};
run().catch(e => {
logger.error({ err: e }, 'Consumer service failed');
process.exit(1);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
logger.info('SIGTERM received, shutting down consumer');
await consumer.disconnect();
await scyllaClient.shutdown();
logger.info('Consumer shutdown complete');
process.exit(0);
});
This implementation solves the idempotency problem. However, it introduces a new one. The processing is sequential: eachBatch iterates through messages one by one. If ScyllaDB latency increases, the entire partition’s processing slows to a crawl. The consumer can’t keep up with the incoming message rate, and Kafka lag grows without bound.
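Consumer lag is the symptom to watch for here. A rough way to observe it during testing, sketched with the kafkajs admin client; this assumes kafkajs v2.x, where fetchOffsets takes a topics array.

// lag-check.js (illustrative)
const { Kafka } = require('kafkajs');

(async () => {
  const kafka = new Kafka({ clientId: 'lag-check', brokers: ['localhost:9092'] });
  const admin = kafka.admin();
  await admin.connect();

  // Latest offsets (high watermarks) per partition, and the group's committed offsets.
  const latest = await admin.fetchTopicOffsets('user-events');
  const [group] = await admin.fetchOffsets({ groupId: 'scylla-writer-group', topics: ['user-events'] });

  for (const { partition, offset: highWatermark } of latest) {
    const committed = group.partitions.find((p) => p.partition === partition);
    // '-1' means the group has not committed anything for this partition yet.
    const committedOffset = committed && committed.offset !== '-1' ? Number(committed.offset) : 0;
    console.log(`partition ${partition}: lag = ${Number(highWatermark) - committedOffset}`);
  }
  await admin.disconnect();
})();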
Phase 3: Implementing Backpressure and Concurrency
The final and most critical evolution of the consumer is to process messages concurrently while respecting the database’s limits. We need a mechanism to process a batch of messages in parallel but limit the number of “in-flight” requests to ScyllaDB. This is a form of manual backpressure—a bulkhead pattern.
The strategy is as follows:
- Fetch a batch of messages from Kafka.
- Fire off database writes for all messages in the batch concurrently, bounded by a concurrency limit (see the sketch after this list).
- Use Promise.allSettled (or Promise.all with per-promise error handling, as the code below does) to wait for all writes to complete.
- Analyze the results:
  - For successfully processed messages, track the highest offset.
  - For failed messages, implement a retry strategy or send them to a Dead-Letter Queue (DLQ).
- After the batch is handled, commit the highest successful consecutive offset to Kafka.
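For reference, this is what a generic in-flight limit looks like in isolation. The helper is illustrative only; the consumer further down achieves the same bound with simpler fixed-size chunks.

// Illustration only: cap the number of promises in flight at `limit`.
// Each "worker" pulls the next unprocessed item until the list is exhausted.
async function mapWithConcurrency(items, limit, fn) {
  const results = new Array(items.length);
  let next = 0;
  const worker = async () => {
    while (next < items.length) {
      const index = next++; // safe: no await between the check and the increment
      results[index] = await fn(items[index]).catch((error) => ({ status: 'rejected', error }));
    }
  };
  await Promise.all(Array.from({ length: Math.min(limit, items.length) }, worker));
  return results;
}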
The pitfall here is managing offsets correctly. If messages N and N+2 succeed but N+1 fails, you can only commit up to message N. Committing beyond that (for example, acknowledging N+2) would cause N+1 to be skipped forever, as the small helper below illustrates. Our implementation will be simpler for clarity: on any failure we halt processing, so the entire batch is retried after a restart.
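As a tiny illustration of that commit rule, consider a pure helper (not used in the consumer itself) that finds the highest offset safe to acknowledge.

// Illustration only: given per-message outcomes, find the highest offset that is
// safe to acknowledge (every earlier message in the batch also succeeded).
const highestCommittableOffset = (results) => {
  let last = null;
  for (const r of [...results].sort((a, b) => a.offset - b.offset)) {
    if (!r.ok) break; // first failure: nothing beyond this point may be committed
    last = r.offset;
  }
  return last; // commit `last + 1` to Kafka, or nothing if null
};

// N succeeds, N+1 fails, N+2 succeeds: only N (here, 10) is committable.
console.log(highestCommittableOffset([
  { offset: 10, ok: true },
  { offset: 11, ok: false },
  { offset: 12, ok: true },
])); // -> 10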
graph TD
  subgraph Kafka Consumer
    A[Fetch Batch] --> B{Process Batch Concurrently};
    B -->|Limit: MAX_IN_FLIGHT| C[Issue N ScyllaDB Writes];
    C --> D[Promise.allSettled];
  end
  subgraph ScyllaDB
    S[ScyllaDB Cluster]
  end
  C --> S;
  S --> D;
  subgraph Offset Management
    D --> E{Analyze Results};
    E -->|All Succeeded| F[Commit Highest Offset in Batch];
    E -->|Any Failed| G[Handle Failure: Retry/DLQ/Crash];
  end
  F --> A;
  G --> A;
Here’s the advanced consumer code that implements this pattern.
// consumer/src/advanced-consumer.js
const { Kafka, KafkaJSNonRetriableError } = require('kafkajs');
const pino = require('pino');
const scyllaClient = require('./scylla');
const logger = pino({ level: 'info' });
const TOPIC_NAME = 'user-events';
const GROUP_ID = 'scylla-writer-group-advanced';
// This is our bulkhead. We will only allow this many concurrent writes to ScyllaDB.
// This number should be tuned based on database capacity and consumer resources.
const MAX_CONCURRENT_WRITES = 100;
const kafka = new Kafka({
clientId: 'event-consumer-advanced',
brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({
  groupId: GROUP_ID,
  // Auto-commit is disabled in consumer.run() below, not here.
  sessionTimeout: 60000,
  heartbeatInterval: 10000,
  maxInFlightRequests: 1, // Limit concurrent in-flight requests from this consumer to the brokers
});
const INSERT_QUERY = `
INSERT INTO user_events (user_id, event_timestamp, event_id, event_type, payload)
VALUES (?, ?, ?, ?, ?)
IF NOT EXISTS;
`;
const processSingleMessage = async (message) => {
const event = JSON.parse(message.value.toString());
const { userId, timestamp, eventId, eventType, ...payload } = event;
if (!eventId) {
logger.warn({ offset: message.offset }, 'Message missing eventId, skipping');
return { offset: message.offset, status: 'skipped' };
}
const params = [userId, new Date(timestamp), eventId, eventType, JSON.stringify(payload)];
const result = await scyllaClient.execute(INSERT_QUERY, params, { prepare: true });
const wasApplied = result.rows[0]['[applied]'];
if (!wasApplied) {
logger.warn({ eventId, offset: message.offset }, 'Duplicate event detected and ignored');
}
return { offset: message.offset, status: 'fulfilled' };
};
const run = async () => {
await scyllaClient.connect();
logger.info('ScyllaDB client connected');
await consumer.connect();
await consumer.subscribe({ topic: TOPIC_NAME, fromBeginning: true });
logger.info(`Consumer subscribed to topic ${TOPIC_NAME}`);
  await consumer.run({
    // We disable auto-commit and use eachBatch to get fine-grained control.
    autoCommit: false,
    eachBatch: async ({ batch, heartbeat, isRunning, isStale }) => {
      // Process the batch in fixed-size chunks so that no more than
      // MAX_CONCURRENT_WRITES requests are in flight against ScyllaDB at once.
      // This is the bulkhead that protects the database during slowdowns.
      const results = [];
      for (let i = 0; i < batch.messages.length; i += MAX_CONCURRENT_WRITES) {
        const chunk = batch.messages.slice(i, i + MAX_CONCURRENT_WRITES);
        const chunkResults = await Promise.all(chunk.map(message =>
          processSingleMessage(message).catch(error => ({
            offset: message.offset,
            status: 'rejected',
            error,
          }))
        ));
        results.push(...chunkResults);
        await heartbeat();
      }
      // A common mistake is to commit the highest offset regardless of gaps.
      // That leads to data loss: a failure in the middle of the batch would never
      // be retried. We only commit up to the last *consecutive* success.
      // Note: offsets within a batch are not guaranteed to be consecutive (e.g.,
      // with log compaction), so we walk the batch's messages in order instead of
      // doing offset arithmetic.
      let highestSuccessfulOffset = null;
      for (const message of batch.messages) {
        if (!isRunning() || isStale()) {
          logger.warn('Consumer is shutting down or stale, aborting batch processing.');
          return;
        }
        const result = results.find(r => r.offset === message.offset);
        if (result && result.status !== 'rejected') {
          highestSuccessfulOffset = message.offset;
        } else {
          // Found a failure. Nothing past this point may be committed.
          logger.error({
            offset: message.offset,
            err: result && result.error
          }, 'Failure in batch, will not commit past last success. Crashing for retry.');
          // In a production system, this is where you'd trigger a DLQ process.
          // Forcing a crash is a simple, effective retry mechanism in orchestrated environments.
          process.exit(1);
        }
      }
      if (highestSuccessfulOffset !== null) {
        // The committed offset is the *next* offset to consume, hence the +1.
        const commitPoint = Number(highestSuccessfulOffset) + 1;
        logger.info(`Batch processed. Committing offset ${commitPoint}`);
        await consumer.commitOffsets([{ topic: batch.topic, partition: batch.partition, offset: String(commitPoint) }]);
      }
await heartbeat();
},
    // This tells kafkajs how many partitions to process concurrently.
    // Since we handle concurrency inside `eachBatch`, we keep this at 1.
    partitionsConsumedConcurrently: 1,
});
};
run().catch(e => {
logger.fatal({ err: e }, 'Consumer service failed unexpectedly');
if (e instanceof KafkaJSNonRetriableError) {
// e.g., broker is not available
process.exit(1);
}
});
process.on('SIGTERM', async () => {
logger.info('SIGTERM received, shutting down advanced consumer');
await consumer.disconnect();
await scyllaClient.shutdown();
logger.info('Consumer shutdown complete');
process.exit(0);
});
This final version provides a resilient consumer that can handle high throughput, guarantees idempotency, and protects the downstream database from being overwhelmed. By managing concurrency and offsets manually, we gain precise control over the data flow and failure modes.
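To watch the pipeline absorb a spike end to end, a simple load generator is enough. The volume and concurrency below are arbitrary values for illustration, assuming Node 18+ and the producer on localhost:3000.

// load-test.js (illustrative)
(async () => {
  const TOTAL = 5000;
  const CONCURRENCY = 50;
  let sent = 0;

  const worker = async () => {
    while (sent < TOTAL) {
      sent++;
      await fetch('http://localhost:3000/events', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ userId: `user-${sent % 100}`, eventType: 'page_view' }),
      }).catch(() => {}); // Ignore individual failures; we only care about aggregate behaviour here.
    }
  };

  console.time('load');
  await Promise.all(Array.from({ length: CONCURRENCY }, worker));
  console.timeEnd('load');
})();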
The use of LWTs (IF NOT EXISTS) for idempotency is a correct and safe pattern, but it’s not free. LWTs incur a performance penalty in ScyllaDB (and Cassandra) because they require a Paxos consensus round among replicas. For write paths that are extremely performance-sensitive, an alternative is to track event IDs outside the main write path, for example in Redis or in a dedicated processed_ids table, offloading the existence check from the primary data path. However, this introduces its own complexity in terms of atomicity and system management.
Furthermore, the error handling presented here—crashing the process—is a simple and often effective strategy in environments like Kubernetes, which will automatically restart the pod. A more mature implementation would incorporate an exponential backoff retry mechanism for transient database errors and a Dead-Letter Queue (DLQ) for messages that are consistently failing (so-called “poison pills”), preventing them from halting an entire partition’s progress.
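As a rough sketch of that direction, transient errors could be retried with exponential backoff before the message is parked on a DLQ topic. The topic name and dlqProducer below are assumptions for illustration, not part of the code above; processSingleMessage is the function from the advanced consumer.

// Illustrative only: retry with exponential backoff, then dead-letter the message.
const DLQ_TOPIC = 'user-events-dlq'; // hypothetical topic
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const processWithRetry = async (message, dlqProducer, maxAttempts = 5) => {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await processSingleMessage(message);
    } catch (error) {
      if (attempt === maxAttempts) {
        // Poison pill or persistent outage: park the raw message for later inspection
        // instead of blocking the partition forever.
        await dlqProducer.send({
          topic: DLQ_TOPIC,
          messages: [{ key: message.key, value: message.value, headers: { error: String(error) } }],
        });
        return { offset: message.offset, status: 'dead-lettered' };
      }
      await sleep(2 ** attempt * 100); // 200ms, 400ms, 800ms, ...
    }
  }
};

The batch’s offset analysis would then treat a 'dead-lettered' result as handled, so a single bad message no longer halts an entire partition’s progress.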