The core operational dataset, residing in DynamoDB, was generating write traffic exceeding 10,000 transactions per second during peak hours. Our analytics platform, built on ClickHouse, required this data for complex, near-real-time trend analysis. The existing batch ETL process, running every four hours, created an unacceptable data freshness gap. Off-the-shelf managed streaming services were evaluated, but the combination of high cost at scale and inflexibility in our complex data enrichment logic made them unsuitable. The only viable path was a custom Change Data Capture (CDC) pipeline designed for high throughput, low latency, and operational robustness.
This chronicle details the construction of that pipeline, a system that hinges on a fault-tolerant Quarkus application consuming DynamoDB Streams, orchestrating large-scale data transformation via a Dask cluster, and achieving sub-minute data visibility in ClickHouse.
The Architectural Pain Point: Choosing the Right Stream Consumer
DynamoDB Streams provides a time-ordered sequence of item-level modifications, a perfect source for CDC. The initial temptation was to use AWS Lambda. However, in a real-world project, this approach presents significant challenges. A single shard’s batch could contain up to 10MB of data, and our enrichment logic required joins against multi-gigabyte datasets. This would either exceed Lambda’s execution time limits or require complex, stateful invocation chains that are difficult to manage and debug.
We needed a long-running, stateful, and resource-efficient consumer. This led to Quarkus. Its minimal memory footprint and fast startup time are ideal for a containerized service that is both durable and cheap to run. More importantly, it provides access to the mature Java ecosystem for concurrency management and the robust AWS SDK v2, which is critical for handling the intricacies of DynamoDB Streams shard management.
The initial design settled on a Quarkus application responsible for three critical tasks:
- Discovering and leasing DynamoDB stream shards.
- Reliably polling each shard for records.
- Checkpointing progress to prevent data loss or reprocessing during failures.
The following is the core dependency setup in the pom.xml. The AWS SDK v2 is used exclusively; its non-blocking I/O clients are a better fit for high-performance applications.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus.platform</groupId>
<artifactId>quarkus-bom</artifactId>
<version>3.4.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
<!-- AWS SDK v2 for DynamoDB and Streams -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-amazon-dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodbstreams</artifactId>
<version>2.20.158</version> <!-- Match AWS SDK version -->
</dependency>
<!-- For JSON processing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<!-- Logging -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-logging-json</artifactId>
</dependency>
<!-- For unit testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
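One wiring detail the dependency list implies: the plain software.amazon.awssdk:dynamodbstreams artifact ships no CDI integration, so the DynamoDbStreamsClient injected in the consumer has to be supplied by a producer. A minimal sketch, assuming region and credentials are resolved from the SDK's default provider chains (adjust for your environment):

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import software.amazon.awssdk.services.dynamodbstreams.DynamoDbStreamsClient;

@ApplicationScoped
public class StreamsClientProducer {

    // Single application-scoped streams client. Region and credentials are resolved
    // from the default provider chains (environment, profile, or task/instance role).
    @Produces
    @ApplicationScoped
    DynamoDbStreamsClient dynamoDbStreamsClient() {
        return DynamoDbStreamsClient.builder().build();
    }
}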
The application’s entry point is a scheduler that periodically scans for available stream shards and assigns them to worker threads. A common mistake is to process all shards on a single thread, creating a bottleneck. Our approach uses a ConcurrentHashMap to track active ShardProcessor tasks, ensuring that new or resharded shards are picked up automatically.
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.dynamodbstreams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodbstreams.model.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ApplicationScoped
public class StreamManager {
private static final Logger LOG = Logger.getLogger(StreamManager.class);
@Inject
DynamoDbStreamsClient streamsClient;
@Inject
ShardProcessorFactory shardProcessorFactory;
@ConfigProperty(name = "dynamodb.stream.arn")
String streamArn;
private final ExecutorService executorService = Executors.newCachedThreadPool();
private final Map<String, ShardProcessor> activeProcessors = new ConcurrentHashMap<>();
void onStart(@Observes StartupEvent ev) {
LOG.info("Starting DynamoDB Stream Manager for ARN: " + streamArn);
// Initial shard discovery on startup
discoverAndProcessShards();
}
    // Runs once on startup (above) and then every 30 seconds via the Quarkus scheduler,
    // so that new or resharded shards are picked up automatically.
    @Scheduled(every = "30s")
public void discoverAndProcessShards() {
LOG.info("Starting shard discovery cycle...");
try {
String lastEvaluatedShardId = null;
do {
DescribeStreamRequest request = DescribeStreamRequest.builder()
.streamArn(streamArn)
.exclusiveStartShardId(lastEvaluatedShardId)
.limit(100)
.build();
DescribeStreamResponse response = streamsClient.describeStream(request);
StreamDescription description = response.streamDescription();
List<Shard> shards = description.shards();
for (Shard shard : shards) {
// A key production detail: only start a processor if one isn't already running for this shard.
activeProcessors.computeIfAbsent(shard.shardId(), shardId -> {
LOG.infof("New shard detected: %s. Starting processor.", shardId);
ShardProcessor processor = shardProcessorFactory.create(shard);
executorService.submit(processor);
return processor;
});
}
lastEvaluatedShardId = description.lastEvaluatedShardId();
} while (lastEvaluatedShardId != null);
// Prune processors for completed shards
pruneCompletedShards();
} catch (Exception e) {
LOG.error("Failed during shard discovery.", e);
}
}
private void pruneCompletedShards() {
activeProcessors.entrySet().removeIf(entry -> {
if (entry.getValue().isCompleted()) {
LOG.infof("Shard processor for %s has completed. Removing from active list.", entry.getKey());
return true;
}
return false;
});
}
}
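The ShardProcessorFactory the manager delegates to is a small CDI bean whose only job is to hand each processor its collaborators. A minimal sketch, matching the constructor used below (CheckpointManager and RecordBatchHandler are covered later in this section):

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import software.amazon.awssdk.services.dynamodbstreams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodbstreams.model.Shard;

@ApplicationScoped
public class ShardProcessorFactory {

    @Inject
    DynamoDbStreamsClient streamsClient;

    @Inject
    CheckpointManager checkpointManager;

    @Inject
    RecordBatchHandler batchHandler;

    @ConfigProperty(name = "dynamodb.stream.arn")
    String streamArn;

    // Wires a processor for a single shard with its injected collaborators.
    public ShardProcessor create(Shard shard) {
        return new ShardProcessor(shard, streamArn, streamsClient, checkpointManager, batchHandler);
    }
}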
The real work happens inside the ShardProcessor. It must handle the entire lifecycle of a single shard: obtaining an iterator, polling for records, and, most importantly, robustly handling exceptions and checkpointing. Storing the last processed sequence number in memory is a critical anti-pattern; a service restart would lead to reprocessing or data loss. We implemented checkpointing in a separate DynamoDB table (a sketch of that checkpoint manager follows the processor code).
import org.jboss.logging.Logger;
import software.amazon.awssdk.services.dynamodbstreams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodbstreams.model.*;
import java.util.List;

public class ShardProcessor implements Runnable {
    private static final Logger LOG = Logger.getLogger(ShardProcessor.class);
    private final Shard shard;
    private final String streamArn;
    private final DynamoDbStreamsClient streamsClient;
    private final CheckpointManager checkpointManager;
    private final RecordBatchHandler batchHandler;
    private String currentShardIterator;
    private volatile boolean completed = false;
    // Injected via a factory to allow for proper dependency management
    public ShardProcessor(Shard shard, String streamArn, DynamoDbStreamsClient streamsClient, CheckpointManager checkpointManager, RecordBatchHandler batchHandler) {
        this.shard = shard;
        this.streamArn = streamArn;
        this.streamsClient = streamsClient;
        this.checkpointManager = checkpointManager;
        this.batchHandler = batchHandler;
    }
@Override
public void run() {
try {
initialize();
processRecords();
} catch (Exception e) {
LOG.errorf(e, "Unrecoverable error in shard processor for shard %s. Thread will terminate.", shard.shardId());
} finally {
LOG.infof("Shard processor for %s is shutting down.", shard.shardId());
}
}
private void initialize() {
String lastProcessedSequenceNumber = checkpointManager.getLastSequenceNumber(shard.shardId());
ShardIteratorType iteratorType;
if (lastProcessedSequenceNumber == null) {
iteratorType = ShardIteratorType.TRIM_HORIZON;
LOG.infof("No checkpoint for shard %s. Starting from TRIM_HORIZON.", shard.shardId());
} else {
iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
LOG.infof("Found checkpoint for shard %s. Starting after sequence number: %s", shard.shardId(), lastProcessedSequenceNumber);
}
GetShardIteratorRequest request = GetShardIteratorRequest.builder()
        .streamArn(streamArn)
.shardId(shard.shardId())
.shardIteratorType(iteratorType)
.sequenceNumber(lastProcessedSequenceNumber)
.build();
this.currentShardIterator = streamsClient.getShardIterator(request).shardIterator();
}
private void processRecords() {
while (currentShardIterator != null) {
try {
GetRecordsRequest request = GetRecordsRequest.builder()
.shardIterator(currentShardIterator)
.limit(1000) // Max batch size
.build();
GetRecordsResponse response = streamsClient.getRecords(request);
List<Record> records = response.records();
if (!records.isEmpty()) {
// The handler is responsible for transformation and handoff to Dask
batchHandler.process(records);
// Critical: Checkpoint only after successful processing of the batch
String lastSequenceNumber = records.get(records.size() - 1).dynamodb().sequenceNumber();
checkpointManager.checkpoint(shard.shardId(), lastSequenceNumber);
}
currentShardIterator = response.nextShardIterator();
if (currentShardIterator == null) {
LOG.infof("Shard %s has been closed. Ending processing.", shard.shardId());
this.completed = true;
}
// Avoid busy-looping on an empty shard
if (records.isEmpty() && currentShardIterator != null) {
Thread.sleep(1000);
}
} catch (TrimmedDataAccessException e) {
LOG.warnf("Shard iterator for %s is trimmed. Must re-acquire from TRIM_HORIZON.", shard.shardId());
// Reset to the beginning of the shard. This indicates we fell too far behind.
checkpointManager.clearCheckpoint(shard.shardId());
initialize();
} catch (ExpiredIteratorException e) {
LOG.infof("Shard iterator expired. Re-acquiring for shard %s.", shard.shardId());
initialize();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warnf("Shard processor for %s was interrupted.", shard.shardId());
break;
} catch (Exception e) {
LOG.errorf(e, "Error processing records for shard %s. Retrying in 10s.", shard.shardId());
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
public boolean isCompleted() {
return this.completed;
}
}
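The CheckpointManager itself is intentionally boring. One sketch of how it might look, assuming a small DynamoDB table (called cdc_checkpoints here purely for illustration) keyed by shardId with a single sequenceNumber attribute:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import java.util.Map;

@ApplicationScoped
public class CheckpointManager {

    private static final String TABLE = "cdc_checkpoints"; // illustrative table name

    @Inject
    DynamoDbClient dynamoDbClient;

    // Returns the last committed sequence number for a shard, or null if no checkpoint exists.
    public String getLastSequenceNumber(String shardId) {
        GetItemResponse response = dynamoDbClient.getItem(GetItemRequest.builder()
                .tableName(TABLE)
                .key(Map.of("shardId", AttributeValue.fromS(shardId)))
                .consistentRead(true)
                .build());
        if (!response.hasItem()) {
            return null;
        }
        return response.item().get("sequenceNumber").s();
    }

    // Overwrites the checkpoint; called only after the batch has been durably handed off.
    public void checkpoint(String shardId, String sequenceNumber) {
        dynamoDbClient.putItem(PutItemRequest.builder()
                .tableName(TABLE)
                .item(Map.of(
                        "shardId", AttributeValue.fromS(shardId),
                        "sequenceNumber", AttributeValue.fromS(sequenceNumber)))
                .build());
    }

    // Used when the iterator is trimmed and the shard must be re-read from TRIM_HORIZON.
    public void clearCheckpoint(String shardId) {
        dynamoDbClient.deleteItem(DeleteItemRequest.builder()
                .tableName(TABLE)
                .key(Map.of("shardId", AttributeValue.fromS(shardId)))
                .build());
    }
}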
The Handoff: From Quarkus to Dask
Directly calling a Python Dask process from Java via RPC is brittle and introduces tight coupling. A more resilient architectural pattern involves an intermediate storage layer. Our RecordBatchHandler was designed to batch records, convert them into a Dask-friendly format (gzipped JSON Lines), and upload them to an S3 bucket. This decouples the systems; the Quarkus consumer’s only job is to reliably dump data, while the Dask cluster’s job is to watch for and process new files.
graph TD
    A[DynamoDB Table] -- Writes --> B(DynamoDB Stream);
    B -- Shard 1..N --> C{Quarkus Consumer};
    C -- 1. Poll Records --> C;
    C -- 2. Batch & Serialize (JSONL) --> D[Batch File];
    D -- 3. Upload --> E[S3 Bucket: landing-zone/];
    C -- 4. Send Message --> F[SQS Queue: new-file-topic];
    F -- Triggers --> G((Dask Scheduler));
    G -- Schedules Task --> H[Dask Workers];
    H -- Read File --> E;
    H -- Process & Enrich --> H;
    H -- Bulk Insert --> I[ClickHouse];
The trigger mechanism from Quarkus to Dask was an SQS message containing the S3 object key. A small Python daemon listening to this SQS queue was responsible for submitting the processing job to the Dask scheduler. This provides a durable, asynchronous handoff.
The implementation of the batch handler in Quarkus:
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.dynamodbstreams.model.Record;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;
// ... Inside the RecordBatchHandler implementation
public void process(List<Record> records) {
if (records.isEmpty()) {
return;
}
String batchId = UUID.randomUUID().toString();
String s3Key = "landing-zone/" + LocalDate.now() + "/" + batchId + ".jsonl.gz";
try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream);
PrintWriter writer = new PrintWriter(new OutputStreamWriter(gzipStream, StandardCharsets.UTF_8))) {
for (Record record : records) {
// Convert DynamoDB AttributeValue map to a standard JSON object
Map<String, Object> simplifiedRecord = simplifyDynamoDbRecord(record);
writer.println(objectMapper.writeValueAsString(simplifiedRecord));
}
writer.flush();
gzipStream.finish();
// Upload to S3
s3Client.putObject(PutObjectRequest.builder()
.bucket(s3BucketName)
.key(s3Key)
.build(),
RequestBody.fromBytes(byteStream.toByteArray()));
// Trigger Dask job via SQS
String messageBody = "{\"s3Bucket\": \"" + s3BucketName + "\", \"s3Key\": \"" + s3Key + "\"}";
sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(sqsQueueUrl)
.messageBody(messageBody)
.build());
LOG.infof("Successfully processed batch of %d records. Uploaded to s3://%s/%s", records.size(), s3BucketName, s3Key);
} catch (IOException e) {
LOG.errorf(e, "Failed to serialize and upload batch %s", batchId);
// This is a critical failure. The exception should propagate to force a retry of the GetRecords call.
throw new RuntimeException("Failed to process record batch", e);
}
}
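The simplifyDynamoDbRecord helper called above is where the typed DynamoDB attribute map is flattened into plain JSON, which is what produces the clean format discussed in the next section. One possible sketch of that helper, assuming the stream view type includes new images (NEW_IMAGE or NEW_AND_OLD_IMAGES) and handling only the attribute types we actually encountered:

    // Sketch: unwraps DynamoDB type descriptors ({"S": ...}, {"N": ...}, {"M": ...}, {"L": ...}).
    // REMOVE events carry no new image and come out as an empty map here.
    // (Uses java.util.LinkedHashMap, java.util.stream.Collectors, and the
    //  software.amazon.awssdk.services.dynamodbstreams.model.AttributeValue type.)
    private Map<String, Object> simplifyDynamoDbRecord(Record record) {
        if (!record.dynamodb().hasNewImage()) {
            return Map.of();
        }
        return simplifyAttributeMap(record.dynamodb().newImage());
    }

    private Map<String, Object> simplifyAttributeMap(Map<String, AttributeValue> image) {
        Map<String, Object> result = new LinkedHashMap<>();
        image.forEach((key, value) -> result.put(key, unwrap(value)));
        return result;
    }

    private Object unwrap(AttributeValue value) {
        if (value.s() != null) return value.s();
        if (value.n() != null) return value.n();   // numbers stay as strings to avoid precision loss
        if (value.bool() != null) return value.bool();
        if (value.nul() != null && value.nul()) return null;
        if (value.hasM()) return simplifyAttributeMap(value.m());
        if (value.hasL()) return value.l().stream().map(this::unwrap).collect(Collectors.toList());
        return value.toString();                   // fallback for set/binary types not present in our payloads
    }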
The Heavy Lifting: Parallel Transformation with Dask
With data reliably landing in S3, Dask takes over. Dask excels at parallelizing operations on larger-than-memory datasets. Its lazy evaluation allows for building a complex computation graph that is then optimized and executed across a cluster of workers.
A key pitfall here is inefficiently reading the DynamoDB JSON. The raw stream record contains type information (e.g., {"my_string": {"S": "value"}}). The Quarkus service already flattens this structure, so the Dask job only needs to handle the clean JSON.
The Dask script is structured to perform three main steps:
- Read the raw JSONL data from S3 into a Dask DataFrame.
- Apply transformations: type casting, data cleaning, and enrichment by joining with a separate, large dimension table (also loaded from S3).
- Write the transformed data to ClickHouse in efficient batches.
import dask.dataframe as dd
import dask.bag as db
from dask.distributed import Client
import json
import pandas as pd
from clickhouse_driver import Client as ClickHouseClient
import os
# Configuration from environment variables
DASK_SCHEDULER = os.environ.get("DASK_SCHEDULER_ADDRESS")
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST")
S3_BUCKET = os.environ.get("S3_LANDING_BUCKET")
CLICKHOUSE_DATABASE = "analytics"
CLICKHOUSE_TABLE = "events_raw"
def process_s3_file(s3_key):
"""
Main function for the Dask job.
"""
client = Client(DASK_SCHEDULER)
# 1. Read raw data from S3
# Use Dask Bag for flexible JSON parsing before converting to DataFrame
s3_path = f"s3://{S3_BUCKET}/{s3_key}"
records_bag = db.read_text(s3_path, storage_options={'anon': False}).map(json.loads)
# A common mistake is not specifying the metadata (dtypes). Dask needs this to build its graph.
    meta = pd.DataFrame({
        'event_id': pd.Series(dtype='str'),
        'user_id': pd.Series(dtype='int'),
        # Raw JSON carries no native datetimes; keep this as object and parse explicitly below.
        'timestamp': pd.Series(dtype='object'),
        'event_type': pd.Series(dtype='str'),
        'payload': pd.Series(dtype='object')
    })
df = records_bag.to_dataframe(meta=meta)
# Ensure timestamp is correctly parsed and set as index for potential time-series operations
df['timestamp'] = dd.to_datetime(df['timestamp'])
# 2. Enrich data
# Example: join with a large user dimension table from Parquet on S3
user_dims = dd.read_parquet('s3://my-data-lake/user_dimensions/')
enriched_df = df.merge(user_dims, on='user_id', how='left')
# 3. Write to ClickHouse
# Using map_partitions is the key to scalable writes. Each worker writes its own partition.
enriched_df.map_partitions(
write_to_clickhouse,
CLICKHOUSE_HOST,
CLICKHOUSE_DATABASE,
CLICKHOUSE_TABLE,
meta=pd.DataFrame({'write_status': pd.Series(dtype='bool')})
).compute()
print(f"Successfully processed and ingested {s3_key}")
def write_to_clickhouse(df_partition, host, database, table):
"""
This function is executed by each Dask worker on its partition of the DataFrame.
"""
if df_partition.empty:
return pd.DataFrame({'write_status': [True]})
try:
# Each worker creates its own connection. Do not share connections across processes.
ch_client = ClickHouseClient(host=host, database=database)
# Prepare data for insertion. clickhouse-driver can take a list of dicts.
data_to_insert = df_partition.to_dict('records')
# This is a bulk insert operation, which is orders of magnitude faster than row-by-row.
ch_client.execute(f'INSERT INTO {table} VALUES', data_to_insert)
return pd.DataFrame({'write_status': [True] * len(df_partition)})
except Exception as e:
# In a real-world project, this should push to a dead-letter queue.
print(f"Error writing partition to ClickHouse: {e}")
return pd.DataFrame({'write_status': [False] * len(df_partition)})
if __name__ == "__main__":
# This script would be triggered by the SQS listener daemon
# which would pass the s3_key as a command-line argument.
import sys
if len(sys.argv) > 1:
process_s3_file(sys.argv[1])
else:
print("Usage: python process_job.py <s3_key>")
The performance of the write_to_clickhouse function is critical. Using the native bulk insert functionality of the client library is non-negotiable. Sending data in formats like TabSeparated or Parquet can be even faster, but requires more complex data serialization on the Dask worker side. For our use case, sending records as a list of dicts provided a good balance of performance and implementation simplicity.
This architecture achieved an end-to-end latency (P99) of under 60 seconds from a record being written to DynamoDB to it being queryable in ClickHouse. The Quarkus consumer ran stably within a 256MB memory container, and the Dask cluster could be scaled horizontally to handle any spikes in stream traffic by simply adding more worker nodes.
The decoupling via S3 and SQS proved its worth during production incidents. A temporary slowdown in the Dask cluster or ClickHouse did not impact the Quarkus consumer, which continued to drain the DynamoDB stream and buffer data in S3. Once the downstream systems recovered, the backlog of files was processed automatically without manual intervention.
The S3-based handoff, while robust, does introduce several seconds of latency due to file creation, upload, and notification. A future optimization could involve replacing S3/SQS with a direct streaming RPC framework like Apache Arrow Flight between the Quarkus service and Dask workers. This would create a tighter coupling but could shave critical seconds off the pipeline latency. Furthermore, the current implementation relies on manual schema synchronization between the source, the Dask job, and ClickHouse. Integrating a schema registry would be a significant step towards making the pipeline more resilient to upstream data model changes.