The operational event store, a PostgreSQL table holding our core business events as JSONB, was growing at over 500GB per month. What began as an elegant solution for our Event Sourcing implementation was becoming an operational bottleneck. Aggregate hydration times were creeping up, rebuilding read models from scratch was a multi-day affair, and the cost of our high-performance database cluster was escalating uncontrollably. The infinite, immutable log, a theoretical beauty, had become a very real and expensive production problem.
Our initial concept was to implement a tiered storage strategy. “Hot” events, those from the last 60 days required for immediate operational queries and aggregate consistency checks, would remain in PostgreSQL. “Cold” events, everything older, needed to be offloaded to a cheaper, scalable, and analytics-friendly storage system. Our existing Hadoop cluster, with its vast HDFS capacity, was the obvious candidate. This separated the concerns: PostgreSQL for low-latency transactional writes and reads, HDFS for cost-effective long-term storage and large-scale batch processing.
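The hot/cold split comes down to a single age comparison. The following is a minimal sketch of that rule, assuming each event carries a timezone-aware timestamp; the function name `storage_tier` and the constant `HOT_RETENTION` are illustrative and not part of our codebase.

```python
from datetime import datetime, timedelta, timezone

# The 60-day hot window described above.
HOT_RETENTION = timedelta(days=60)


def storage_tier(event_time: datetime, now: datetime) -> str:
    """Return "hot" if the event is still inside the hot window, else "cold"."""
    return "hot" if now - event_time <= HOT_RETENTION else "cold"


now = datetime(2023, 10, 27, tzinfo=timezone.utc)
print(storage_tier(now - timedelta(days=5), now))   # a recent event
print(storage_tier(now - timedelta(days=90), now))  # an old event
```

Only events classified as cold are candidates for removal from PostgreSQL, and only once they are confirmed to exist in HDFS.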
The critical piece was the transport layer. We needed a durable, ordered, and fault-tolerant pipeline to move events from the application’s edge to HDFS. Our choice of message queue was Kafka. Its log-based architecture is a natural fit for event streams, and its partitioning mechanism allows us to maintain order per aggregate ID, a non-negotiable requirement for Event Sourcing. Most importantly, its mature ecosystem, specifically Kafka Connect, promised a path to a solution without writing a custom, stateful consumer from scratch. A custom consumer would mean dealing with offset management, batching, file rotation on HDFS, failure recovery, and idempotency—a significant engineering effort prone to subtle bugs. In a real-world project, leveraging a robust, declarative framework like Kafka Connect over building bespoke plumbing is almost always the correct decision.
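The resulting architecture is straightforward but powerful. The application command handlers persist events to PostgreSQL and then publish them to a Kafka topic. These are two independent writes, so a crash between them can leave an event in PostgreSQL that never reaches Kafka; the publish step needs retries or reconciliation to keep the archive complete. A Kafka Connect cluster, running the HDFS Sink Connector, consumes these events and archives them to HDFS in an analytics-optimized format.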
graph TD
    subgraph Application Service
        A[Command Handler] -->|1. Persist Event| B(PostgreSQL - Hot Store);
        A -->|2. Publish Event| C(Kafka Topic: `domain-events`);
    end
    subgraph Archival Pipeline
        C --> D[Kafka Connect Worker];
        D -- Consumes --> C;
        D -- Uses --> E{HDFS 3 Sink Connector};
        E -->|Writes Batches| F(HDFS - Cold Store);
    end
    subgraph Analytics Platform
        F --> G[Spark/Hive Jobs];
        G --> H(Analytical Dashboards / Rebuilt Read Models);
    end
    style B fill:#dae8fc,stroke:#6c8ebf
    style F fill:#d5e8d4,stroke:#82b366
Setting the Foundation: Kafka and Connect Worker
Before configuring the pipeline itself, the groundwork must be laid. The Kafka topic for our events needs to be created with a sensible partitioning strategy. For Event Sourcing, partitioning by the aggregate root ID is the standard approach to guarantee that all events for a single entity land on the same partition in the correct order.
# Assuming Kafka is installed in /opt/kafka
# Create the topic with 32 partitions and a replication factor of 3 for durability.
# retention.ms=-1 disables time-based deletion, so Kafka keeps every event
# whether or not it has already been consumed and archived.
/opt/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server kafka-broker-1:9092,kafka-broker-2:9092 \
  --replication-factor 3 \
  --partitions 32 \
  --topic domain-events \
  --config retention.ms=-1
# Describe the topic to verify its creation
/opt/kafka/bin/kafka-topics.sh --describe \
--bootstrap-server kafka-broker-1:9092 \
--topic domain-events
A pitfall here is under-partitioning. If the number of partitions is too low, it becomes the bottleneck for your system’s throughput, regardless of how many consumer instances you run. We chose 32 as a starting point, allowing for significant consumer group scaling.
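To make the ordering guarantee concrete, here is a small sketch of key-based partitioning. It is an illustration only: Kafka's default partitioner hashes the key bytes with murmur2, whereas this sketch substitutes `zlib.crc32` from the standard library, so the partition numbers it prints will not match a real cluster. The aggregate IDs and event names are made up.

```python
import zlib

NUM_PARTITIONS = 32  # matches the topic created above


def partition_for(aggregate_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map an aggregate ID to a partition with a deterministic hash.

    Stand-in hash: real Kafka clients use murmur2, so actual numbers differ.
    """
    return zlib.crc32(aggregate_id.encode("utf-8")) % num_partitions


events = [
    ("order-42", "OrderCreated"),
    ("order-7", "OrderCreated"),
    ("order-42", "OrderPaid"),
]
for aggregate_id, event_type in events:
    print(aggregate_id, event_type, "-> partition", partition_for(aggregate_id))
```

Both `order-42` events map to the same partition, which is what preserves their relative order. The same property means the partition count should be settled early: changing it later changes the key-to-partition mapping for new events.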
The Kafka Connect worker itself is a Java process that runs connectors. Its configuration is critical for resilience and integration with the Kafka cluster. We run it in distributed mode, which allows multiple worker nodes to share the load and provides fault tolerance. If one worker fails, its tasks are rebalanced to the remaining active workers.
Here is a production-grade connect-distributed.properties file:
# connect-distributed.properties
# Connection to the Kafka cluster for the Connect worker itself.
bootstrap.servers=kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
# A unique ID for this Connect cluster. All workers in the cluster must use the same ID.
group.id=hdfs-archival-connect-cluster
# Default key and value converters for connector data; individual connectors can override them.
# JSON is human-readable, which helps when debugging.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Internal topics for storing connector configurations, offsets, and status.
# These must be created with high replication factor for durability.
offset.storage.topic=connect-cluster-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-cluster-configs
config.storage.replication.factor=3
status.storage.topic=connect-cluster-status
status.storage.replication.factor=3
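# The REST API for managing connectors. The listener binds to all interfaces;
# the advertised host name is what other workers use to reach this one.
# (listeners replaces the deprecated rest.port setting.)
listeners=http://0.0.0.0:8083
rest.advertised.host.name=connect-worker-1.internal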
# Location of connector plugins (JAR files). This is crucial.
# You must place the HDFS connector JARs in a subdirectory here.
plugin.path=/opt/kafka-connect/plugins
# Security configuration (example for SASL_SSL)
# security.protocol=SASL_SSL
# sasl.mechanism=PLAIN
# sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
# username="connect_user" \
# password="secret_password";
A common mistake is misconfiguring the plugin.path. Kafka Connect isolates each connector’s dependencies. The Confluent HDFS 3 Connector should be downloaded and extracted into its own directory inside /opt/kafka-connect/plugins. Without this isolation, you risk classpath conflicts with other connectors or the Connect framework itself.
The Core Logic: Configuring the HDFS Sink Connector
With the worker running, the core of the implementation is a declarative JSON object that we POST to the Connect REST API. This configuration defines every aspect of the pipeline’s behavior, from data format to HDFS partitioning.
This is not a toy example; it’s a configuration reflecting real-world trade-offs for performance, cost, and operational sanity.
{
  "name": "hdfs-sink-domain-events-archiver-01",
  "config": {
    "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
    "tasks.max": "8",
    "topics": "domain-events",
    "hdfs.url": "hdfs://namenode-1:8020",
    "hadoop.conf.dir": "/etc/hadoop/conf",
    "hdfs.authentication.kerberos": "true",
    "connect.hdfs.principal": "[email protected]",
    "connect.hdfs.keytab": "/etc/security/keytabs/connect.user.keytab",
    "hdfs.namenode.principal": "nn/[email protected]",
    "flush.size": "100000",
    "rotate.interval.ms": "3600000",
    "retry.backoff.ms": "5000",
    "store.url": "hdfs://namenode-1:8020",
    "logs.dir": "/kafka-connect/logs",
    "topics.dir": "/data/events",
    "format.class": "io.confluent.connect.hdfs3.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms": "3600000",
    "locale": "en_US",
    "timezone": "UTC",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.topic.name": "dlq-hdfs-sink-domain-events",
    "errors.deadletterqueue.topic.replication.factor": "3"
  }
}
Let’s dissect this configuration.
1. Basic Connector Setup:
- connector.class: Specifies the entry point for the HDFS 3 sink.
- tasks.max: This determines the parallelism. A value of 8 means Connect will attempt to distribute the 32 partitions of our domain-events topic among 8 tasks. Each task will handle 4 partitions. This is a key tuning parameter for throughput.
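The arithmetic behind tasks.max can be sketched as follows. This is an illustration of the even split described above, using a simple round-robin; it is not the assignment algorithm Kafka's consumer group protocol actually runs, and the function name is hypothetical.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, max_tasks: int) -> Dict[int, List[int]]:
    """Spread partitions over tasks round-robin (illustrative, not Connect's assignor)."""
    # A task with no partition would sit idle, so more tasks than partitions adds nothing.
    effective_tasks = min(max_tasks, num_partitions)
    assignment: Dict[int, List[int]] = {task: [] for task in range(effective_tasks)}
    for partition in range(num_partitions):
        assignment[partition % effective_tasks].append(partition)
    return assignment


assignment = assign_partitions(num_partitions=32, max_tasks=8)
print({task: len(partitions) for task, partitions in assignment.items()})
```

With 32 partitions and 8 tasks, every task ends up with 4 partitions. Raising tasks.max beyond 32 would not add parallelism for this topic.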
2. HDFS Connection and Security:
- hdfs.url: The address of the HDFS NameNode.
- hadoop.conf.dir: Path to the Hadoop configuration files (core-site.xml, hdfs-site.xml). The connector needs these to correctly communicate with a secured and highly-available HDFS cluster.
- The Kerberos settings (hdfs.authentication.kerberos, principals, keytab) are required whenever the Hadoop cluster is Kerberized, which is the norm in production. Omitting them against a secured cluster is a common source of authentication and connection failures.
3. File Commit and Rotation Strategy:
- flush.size: The number of records a task will process before committing them to a file in HDFS. We’ve set a high value of 100,000. This creates larger files on HDFS, which is vastly more efficient for Hadoop to process. The “small files problem” is a classic performance killer in HDFS, and this setting directly combats it.
- rotate.interval.ms: Even if we haven’t received 100,000 records, force a file commit every hour (3,600,000 ms). This ensures data timeliness. The trade-off is between file size and data latency in HDFS. An hour is a reasonable balance for an archival system.
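The interplay of the two settings is a "whichever comes first" rule. The sketch below is a simplified model of that rule only; the real connector's timing also depends on its timestamp extractor and on when records arrive, and the function name and parameters here are hypothetical.

```python
def should_commit(
    records_in_file: int,
    file_opened_ms: int,
    now_ms: int,
    flush_size: int = 100_000,
    rotate_interval_ms: int = 3_600_000,
) -> bool:
    """Simplified commit rule: size threshold OR age threshold, whichever is hit first."""
    if records_in_file == 0:
        return False  # nothing buffered, nothing to commit
    size_reached = records_in_file >= flush_size
    age_reached = now_ms - file_opened_ms >= rotate_interval_ms
    return size_reached or age_reached


# Busy hour: the size threshold fires long before the hour is up.
print(should_commit(records_in_file=100_000, file_opened_ms=0, now_ms=60_000))
# Quiet hour: only a few records, but the file has been open for an hour.
print(should_commit(records_in_file=250, file_opened_ms=0, now_ms=3_600_000))
```

In this model a quiet partition produces one small file per hour at worst, which bounds both the data latency and the number of small files.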
4. Data Format and Partitioning (The Most Critical Section):
- format.class: We chose io.confluent.connect.hdfs3.parquet.ParquetFormat. Storing raw JSON is wasteful. Parquet is a columnar format that offers excellent compression and is the de facto standard for analytical queries in the Hadoop ecosystem. Spark and Hive queries on Parquet are typically far faster than on JSON because they read only the columns a query needs.
- partitioner.class: We use the TimeBasedPartitioner. This is the secret sauce for making the data in HDFS easily queryable.
- path.format: 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH. This configures the partitioner to create a directory structure like /data/events/domain-events/year=2023/month=10/day=27/hour=10/. This format is not arbitrary; it’s the Hive-style partitioning convention. Tools like Spark, Hive, and Presto automatically discover these partitions, allowing for extremely efficient time-based queries (e.g., SELECT * FROM events WHERE year=2023 AND month=10). Without this, a query for a specific day would have to scan all the data ever archived.
- partition.duration.ms: Sets the granularity of each time partition to one hour, which aligns the partition boundaries with the rotation interval.
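To see how a timestamp turns into a Hive-style directory, here is a minimal sketch that mirrors the path.format and partition.duration.ms settings above. It is an approximation written for illustration, not the partitioner's actual code; the function name is hypothetical and UTC is assumed, as in the connector config.

```python
from datetime import datetime, timezone


def partition_path(topics_dir: str, topic: str, timestamp_ms: int,
                   duration_ms: int = 3_600_000) -> str:
    """Build a Hive-style partition directory for a record timestamp (UTC)."""
    # Floor the timestamp to the start of its partition bucket (one hour by default).
    bucket_start_ms = timestamp_ms - (timestamp_ms % duration_ms)
    ts = datetime.fromtimestamp(bucket_start_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"


event_time = datetime(2023, 10, 27, 10, 42, tzinfo=timezone.utc)
event_ms = int(event_time.timestamp() * 1000)
print(partition_path("/data/events", "domain-events", event_ms))
# /data/events/domain-events/year=2023/month=10/day=27/hour=10
```

Because the key=value directory names follow the Hive convention, a query engine can prune every directory whose year, month, day, or hour does not match the query filter.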
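5. Data Conversion and Schema Handling:
- value.converter: We use JsonConverter with schemas.enable=false. Our upstream events are plain JSON without an attached schema. This is a pragmatic starting point but has long-term consequences we’ll discuss later. One caveat: Parquet is a schema-based format, and Confluent’s documentation pairs ParquetFormat with schema-aware converters, so verify that schemaless JSON can actually be written as Parquet in your connector version before relying on this combination.
- schema.compatibility: NONE: We are explicit that we are not performing any Avro schema compatibility checks at this stage.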
6. Production-Grade Error Handling:
- errors.tolerance: all: This is a controversial but necessary choice for this use case. If a malformed or “poison pill” message appears on the Kafka topic, the default behavior (none) would halt the entire connector task. For an archival pipeline, it’s better to log the bad message and continue processing valid ones. A halted task lets a backlog build up in Kafka, and on any topic with finite retention that backlog can expire before it is archived.
- errors.log.enable: true: Ensures failed messages are logged.
- errors.deadletterqueue.topic.name: This is the crucial counterpart to errors.tolerance=all. Records that fail during conversion or transformation are rerouted to a Dead Letter Queue (DLQ) Kafka topic instead of being dropped. This creates a durable record of failed messages that can be inspected and reprocessed later by a separate process, ensuring no data is silently lost. Note that Connect’s error-handling framework covers the converter and transform stages; a failure inside the sink’s own write to HDFS is generally not routed to the DLQ and still fails the task.
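The difference between the two tolerance modes can be modelled in a few lines. This is a conceptual sketch of the behavior for a conversion failure, not Kafka Connect code: the function name is hypothetical, JSON parsing stands in for the converter, and a plain list stands in for the DLQ topic.

```python
import json
from typing import Any, Dict, List, Tuple


def convert_batch(raw_messages: List[str],
                  tolerance: str = "all") -> Tuple[List[Any], List[Dict[str, str]]]:
    """Convert messages; with tolerance "all", divert failures instead of stopping."""
    converted: List[Any] = []
    dead_letters: List[Dict[str, str]] = []
    for raw in raw_messages:
        try:
            converted.append(json.loads(raw))
        except json.JSONDecodeError as exc:
            if tolerance == "none":
                raise  # models the task halting on the first bad record
            # Keep the original payload plus the reason, so it can be replayed later.
            dead_letters.append({"payload": raw, "error": str(exc)})
    return converted, dead_letters


good, dlq = convert_batch(['{"event_type": "OrderCreated"}', '{not json', '{"event_type": "OrderPaid"}'])
print(len(good), "converted,", len(dlq), "sent to the dead letter list")
```

The important operational consequence is that the DLQ topic must itself be monitored; a tolerant pipeline that nobody watches loses data just as surely as one that drops records.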
Deployment and Verification
To deploy this, we save the JSON configuration to a file (hdfs-sink-config.json) and POST it to the running Connect worker’s REST API.
# Deploy the connector
curl -i -X POST -H "Content-Type: application/json" --data @hdfs-sink-config.json http://connect-worker-1:8083/connectors
# Check the status of the connector and its tasks
curl -s http://connect-worker-1:8083/connectors/hdfs-sink-domain-events-archiver-01/status | jq .
# Example output showing the connector and its 8 tasks are running
# {
# "name": "hdfs-sink-domain-events-archiver-01",
# "connector": {
# "state": "RUNNING",
# "worker_id": "connect-worker-1:8083"
# },
# "tasks": [
# { "id": 0, "state": "RUNNING", "worker_id": "connect-worker-2:8083" },
# { "id": 1, "state": "RUNNING", "worker_id": "connect-worker-3:8083" },
# ...
# ]
# }
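For automated monitoring, the status document is easy to evaluate in code. The sketch below works on an already-parsed status response with the same shape as the output above; the helper names are hypothetical, and the sample payload (including the failed task and its trace) is made up for illustration.

```python
from typing import Any, Dict, List


def failed_tasks(status: Dict[str, Any]) -> List[int]:
    """Return the IDs of all tasks that are not in the RUNNING state."""
    return [task["id"] for task in status.get("tasks", []) if task.get("state") != "RUNNING"]


def is_healthy(status: Dict[str, Any]) -> bool:
    """Healthy means the connector and every one of its tasks are RUNNING."""
    connector_running = status.get("connector", {}).get("state") == "RUNNING"
    return connector_running and not failed_tasks(status)


# Made-up sample: the connector is up, but one task has failed.
sample_status = {
    "name": "hdfs-sink-domain-events-archiver-01",
    "connector": {"state": "RUNNING", "worker_id": "connect-worker-1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect-worker-2:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect-worker-3:8083", "trace": "..."},
    ],
}
print("healthy:", is_healthy(sample_status), "failed tasks:", failed_tasks(sample_status))
```

Checking the tasks and not just the connector matters: a connector can report RUNNING while individual tasks have failed and stopped consuming their partitions.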
After letting it run for a while, we can verify the output on HDFS.
# List the directories created by the partitioner
hdfs dfs -ls /data/events/domain-events
# Found 1 items
# drwxr-xr-x - connect_user supergroup 0 2023-10-27 10:00 /data/events/domain-events/year=2023
hdfs dfs -ls /data/events/domain-events/year=2023/month=10/day=27/hour=10
# Found 2 items
# -rw-r--r-- 3 connect_user supergroup 24578123 2023-10-27 10:59 /data/events/domain-events/...+0000.parquet
# -rw-r--r-- 3 connect_user supergroup 23109876 2023-10-27 11:00 /data/events/domain-events/...+0000.parquet
The final verification is to actually read one of these Parquet files. We can use a simple Python script with pyarrow to inspect the content and confirm the data is being written correctly.
# read_parquet_from_hdfs.py
import pyarrow.parquet as pq
from pyarrow import fs
# Ensure HADOOP_CONF_DIR is set in the environment
# export HADOOP_CONF_DIR=/etc/hadoop/conf
hdfs_client = fs.HadoopFileSystem(host="namenode-1", port=8020, user="your_user", kerb_ticket=None)
# Path to a Parquet file created by the connector
file_path = "/data/events/domain-events/year=2023/month=10/day=27/hour=10/domain-events+0+0000000000.parquet"
table = pq.read_table(file_path, filesystem=hdfs_client)
df = table.to_pandas()
# Print the first 5 records
print(df.head())
# Expected output would be a pandas DataFrame where each row is an event message.
This end-to-end verification confirms our pipeline is operational. With the archive in place, the PostgreSQL database’s growth can be contained by purging cold events, and we have a durable, cost-effective, and queryable archive of all historical events. We can now point our Spark jobs at /data/events/domain-events to rebuild read models or perform complex business intelligence analytics without ever touching the production OLTP database.
This architecture is not without its own complexities and future challenges. The reliance on schemaless JSON in Kafka is a form of technical debt. While easy to start with, it pushes the problem of schema validation and evolution downstream to the consumers (our Spark jobs). A single producer publishing a malformed event with a misspelled field could break analytical jobs hours or days later. The robust, long-term solution is to introduce a schema registry like the Confluent Schema Registry and migrate our producers and the Kafka Connect configuration to use Avro. This enforces schemas on write, making the entire pipeline more resilient.
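Until a schema registry is in place, even a lightweight producer-side check catches the misspelled-field case. The sketch below is a stand-in for real schema enforcement, not a replacement for Avro and a registry; the field names in `REQUIRED_FIELDS` are hypothetical and do not describe our actual event envelope.

```python
from typing import Any, Dict, List

# Hypothetical event envelope: field name -> expected Python type.
REQUIRED_FIELDS = {
    "event_id": str,
    "aggregate_id": str,
    "event_type": str,
    "sequence": int,
    "payload": dict,
}


def validation_errors(event: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the event may be published."""
    errors = []
    for name, expected_type in REQUIRED_FIELDS.items():
        if name not in event:
            errors.append(f"missing field: {name}")
        elif not isinstance(event[name], expected_type):
            errors.append(f"wrong type for {name}: expected {expected_type.__name__}")
    # Unknown fields are usually typos of known ones, so report them too.
    for name in sorted(set(event) - set(REQUIRED_FIELDS)):
        errors.append(f"unknown field: {name}")
    return errors


# A producer bug: "aggregate_id" is misspelled.
bad_event = {"event_id": "e-1", "agregate_id": "order-42", "event_type": "OrderPaid",
             "sequence": 2, "payload": {}}
print(validation_errors(bad_event))
```

Rejecting the event at publish time surfaces the bug in the producer that caused it, instead of in an analytical job hours or days later.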
Furthermore, the HDFS Sink provides at-least-once semantics. In most failure scenarios (like a task restarting), it correctly uses HDFS file visibility and offset tracking to avoid duplicates. However, in more complex failure modes, it is possible to end up with duplicate data in HDFS. For an archival system, this is often an acceptable trade-off, with deduplication handled during batch processing. Achieving exactly-once semantics is significantly more complex and may not be worth the operational overhead. Finally, this pipeline only handles the archival; a separate, carefully tested service is still required to periodically purge the now “cold” data from the primary PostgreSQL store, a process that must be executed with extreme care to avoid impacting live operations.
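Deduplication during batch processing can be as simple as keeping the first occurrence of each event identifier. The sketch below shows the idea on plain Python dictionaries; it assumes every event carries a unique `event_id` field, which is a hypothetical name, and a real job would express the same logic in Spark over the Parquet files.

```python
from typing import Any, Dict, Iterable, List


def deduplicate(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep the first occurrence of each event_id, preserving the original order."""
    seen = set()
    unique = []
    for event in events:
        event_id = event["event_id"]  # hypothetical unique identifier
        if event_id in seen:
            continue  # a redelivered copy written twice by the sink
        seen.add(event_id)
        unique.append(event)
    return unique


archived = [
    {"event_id": "e-1", "event_type": "OrderCreated"},
    {"event_id": "e-2", "event_type": "OrderPaid"},
    {"event_id": "e-2", "event_type": "OrderPaid"},  # duplicate after a task restart
]
print([event["event_id"] for event in deduplicate(archived)])
```

This only works if producers assign the identifier once, before the first publish attempt, so that a retried event carries the same value.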