The operational pain point was clear: our primary MySQL database, backing a critical monolithic application, was buckling under the strain of analytical queries. The business intelligence team needed fresher data, but their ad-hoc queries were causing lock contention and performance degradation on the OLTP workload. Our nightly ETL batch jobs, which dumped data into our Hadoop cluster, were taking hours to run, meaning reports were always a day old. This latency was no longer acceptable. The directive was to establish a near real-time data flow into HDFS without destabilizing the source database or requiring invasive application-level changes.
My initial proposal revolved around Change Data Capture (CDC). Instead of querying the database directly, we could tap into MySQL’s binary log (binlog), a low-level, ordered stream of all data modifications. This approach is non-intrusive and captures every INSERT, UPDATE, and DELETE at the source. For the CDC tool, Debezium stood out. It’s designed as a Kafka Connect source connector, which provides a robust, distributed, and fault-tolerant framework out of the box. Using Apache Kafka as an intermediary buffer between MySQL and Hadoop was a non-negotiable architectural choice. It decouples the source from the destination, provides data durability, and allows multiple downstream consumers to subscribe to the change stream independently. It also naturally handles backpressure; if the Hadoop ingestion process slows down, Kafka retains the data until it can be processed.
To manage this multi-component stack (MySQL, Zookeeper, Kafka, Kafka Connect), containerization was the only sane path forward. A docker-compose setup would allow us to define the entire pipeline declaratively, ensuring environment consistency from local development to staging and production. It simplifies dependency management and makes the entire system portable and reproducible.
Phase 1: The Dockerized Foundation
The first step was to build a stable, reproducible local environment that mirrored our production components. A real-world project requires precise configuration, especially for networking and data persistence within Docker.
Here is the docker-compose.yml that defines our entire pipeline. Note the specific configurations: MySQL is set up with the binlog enabled in ROW format, which is essential for Debezium. Kafka Connect depends on both Kafka and the database being ready. A named volume persists the MySQL data directory across container restarts; in production you would persist the Kafka log directories the same way.
# docker-compose.yml
# A production-grade foundation for our CDC pipeline.
version: '3.8'
services:
  mysql:
    image: mysql:8.0
    container_name: cdc_mysql_source
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=mysecretpassword
      - MYSQL_DATABASE=inventory
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    volumes:
      - mysql_data:/var/lib/mysql
      - ./mysql-init:/docker-entrypoint-initdb.d
    command:
      - '--server-id=101'
      - '--binlog_format=ROW'
      - '--log-bin=mysql-bin'
      - '--gtid-mode=ON'
      - '--enforce-gtid-consistency=ON'
      - '--log-slave-updates=ON'
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-pmysecretpassword"]
      interval: 10s
      timeout: 5s
      retries: 5

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: cdc_zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    container_name: cdc_kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: confluentinc/cp-kafka-connect:7.3.2
    container_name: cdc_connect
    ports:
      - "8083:8083"
    depends_on:
      mysql:
        condition: service_healthy
      kafka:
        condition: service_started
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    # We mount the Debezium connector JARs into the plugin path.
    # In a real setup, you'd build a custom Docker image.
    volumes:
      - ./debezium-connector-mysql:/usr/share/confluent-hub-components/debezium-connector-mysql

volumes:
  mysql_data:
To bootstrap the database, a simple schema is loaded on startup.
-- mysql-init/01-schema.sql
-- MYSQL_DATABASE already creates the database on startup, so guard against re-creating it.
CREATE DATABASE IF NOT EXISTS inventory;
USE inventory;

CREATE TABLE products (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description TEXT,
  stock INT NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE sales (
  order_id VARCHAR(36) PRIMARY KEY,
  product_id INT,
  quantity INT,
  sale_price DECIMAL(10, 2),
  order_date DATETIME,
  FOREIGN KEY (product_id) REFERENCES products(id)
);
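Once the stack is up, and before wiring in Debezium, it is worth confirming that the binlog flags actually took effect. Here is a quick sanity check; it assumes the PyMySQL client is installed on the host (not part of the stack itself), while the credentials and port come straight from the compose file:
# check_binlog_settings.py
# Verify that the MySQL container exposes a ROW-format binlog with GTIDs enabled.
# Assumes PyMySQL on the host; credentials/port come from docker-compose.yml.
import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='mysecretpassword')
try:
    with conn.cursor() as cur:
        cur.execute(
            "SHOW GLOBAL VARIABLES WHERE Variable_name IN "
            "('log_bin', 'binlog_format', 'gtid_mode', 'server_id')"
        )
        for name, value in cur.fetchall():
            print(f"{name} = {value}")
        # Expected with the command flags above: log_bin=ON, binlog_format=ROW,
        # gtid_mode=ON, server_id=101
finally:
    conn.close()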
Phase 2: Configuring the Debezium Connector
With the infrastructure running via docker-compose up -d, the next step is to configure and deploy the Debezium connector. This is done by sending a JSON payload to the Kafka Connect REST API. A common mistake is to misconfigure the connector, leading to silent failures or incomplete data capture. Every parameter here is critical.
The database.history.kafka.topic is particularly important; Debezium uses this internal topic to track the history of DDL changes, making it resilient to schema evolution. The database.server.id must be unique across your entire MySQL replication topology, because Debezium presents itself to MySQL as just another replica.
// debezium-mysql-source-config.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "mysecretpassword",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "table.include.list": "inventory.products,inventory.sales",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "true",
    "snapshot.mode": "initial"
  }
}
To deploy this, we use a simple curl command targeting the Connect container.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors/ \
--data @debezium-mysql-source-config.json
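Connect accepts the configuration asynchronously, so a successful POST only means the config was stored; the connector and its task still have to reach the RUNNING state. One way to confirm that is to poll the standard /connectors/{name}/status endpoint. A small sketch, assuming the requests library is available:
# check_connector_status.py
# Poll the Kafka Connect REST API until the connector and its task report RUNNING.
import time
import requests

STATUS_URL = 'http://localhost:8083/connectors/inventory-connector/status'

for _ in range(30):
    resp = requests.get(STATUS_URL)
    if resp.ok:
        status = resp.json()
        connector_state = status['connector']['state']
        task_states = [t['state'] for t in status.get('tasks', [])]
        print(f"connector={connector_state} tasks={task_states}")
        if connector_state == 'RUNNING' and task_states and all(s == 'RUNNING' for s in task_states):
            break
    time.sleep(2)  # the task can take a few seconds to spawn after the POST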
Phase 3: Observing the Change Stream
After deploying the connector, Debezium performs an initial consistent snapshot of the included tables and then switches to streaming changes from the binlog. To verify this, we can use a command-line tool like kafkacat (or kcat) to inspect the Kafka topics. For the products table, the topic will be named dbserver1.inventory.products by default.
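If kcat isn't installed, topic creation can also be confirmed programmatically. A small sketch with kafka-python, pointed at the host-facing listener on port 29092 from the compose file:
# list_cdc_topics.py
# List the topics Debezium has created for the inventory database.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['localhost:29092'])
cdc_topics = sorted(t for t in consumer.topics() if t.startswith('dbserver1.inventory'))
print(cdc_topics)  # with the include list above, both table topics should appear
consumer.close()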
Let’s insert some data into MySQL:
INSERT INTO products (name, description, stock) VALUES
('Laptop', '15-inch, 16GB RAM, 512GB SSD', 50),
('Mouse', 'Wireless ergonomic mouse', 200);
Now, tail the topic from within another container or from the host if kafkacat is installed:
docker exec -it cdc_kafka /usr/bin/kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic dbserver1.inventory.products \
--from-beginning
The output for the first insert will be a rich JSON document. The critical parts are the before and after fields, which represent the state of the row before and after the change. For an INSERT, before is null.
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Laptop",
    "description": "15-inch, 16GB RAM, 512GB SSD",
    "stock": 50,
    "created_at": 1667502534000,
    "updated_at": 1667502534000
  },
  "source": { ... },
  "op": "c",  // 'c' for create/insert
  "ts_ms": 1667502534512,
  "transaction": null
}
This confirms our pipeline’s source side is working correctly. Changes in MySQL are being captured and published to Kafka in near real-time.
Phase 4: The Ingestion Pitfall - The Small Files Problem
The next logical step was to get this data into HDFS. The Kafka Connect framework has a plethora of sink connectors, including several for HDFS. My first attempt involved using a standard, off-the-shelf HDFS sink connector. The configuration was straightforward.
However, after running this for a few hours with moderate traffic, a classic big data problem emerged: the HDFS “small files problem.” The connector was flushing data to HDFS frequently to achieve low latency, resulting in thousands of tiny JSON files. HDFS is optimized for large files; the NameNode’s memory is consumed by file metadata, and MapReduce or Spark jobs run inefficiently when processing a massive number of small files. Furthermore, querying raw JSON with Hive or Presto is slow. We needed a columnar format like Parquet.
This is a critical lesson in production data engineering: a solution that works functionally can be an operational disaster at scale. The generic HDFS sink was not the right tool for this job.
graph TD
  subgraph "Attempt 1: The Pitfall"
    A[MySQL Binlog] --> B{Debezium};
    B --> C[Kafka Topic];
    C --> D{Generic HDFS Sink};
    D --> E[HDFS: Many small JSON files];
    style E fill:#f96,stroke:#333,stroke-width:2px;
  end
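Quantifying the damage made the decision easy. A quick audit sketch with pyarrow; the HDFS endpoint and the raw-JSON output path are illustrative assumptions, not values from the connector config:
# audit_small_files.py
# Count files and average size under the sink's output directory to expose the small files problem.
import pyarrow.fs as fs

hdfs = fs.HadoopFileSystem('localhost', 9000)
infos = hdfs.get_file_info(fs.FileSelector('/data/inventory/products_raw', recursive=True))
files = [i for i in infos if i.type == fs.FileType.File]

total_bytes = sum(f.size for f in files)
print(f"files: {len(files)}")
print(f"avg size: {total_bytes / max(len(files), 1) / 1024:.1f} KiB")
# Thousands of entries averaging a few KiB each is the signature of the problem:
# each one costs NameNode heap and adds task-scheduling overhead to downstream jobs.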
Phase 5: Production-Grade Ingestion with a Custom Consumer
We needed more control over the ingestion process. The solution was to write a dedicated consumer service that pulls messages from Kafka, buffers them in memory, converts them to the Parquet format, and writes them to HDFS in larger, properly sized files (e.g., 128MB). This approach allows us to control file rotation policies (based on size, time, or number of records) and handle data partitioning correctly (e.g., by date).
Here is a conceptual Python implementation using kafka-python and pyarrow. In a production system, this would be a robust, long-running service with proper logging, metrics, and error handling (e.g., a dead-letter queue for unparseable messages).
# hdfs_ingestion_service.py
# A robust consumer for writing batched Parquet files to HDFS.
import json
import logging
from datetime import datetime

from kafka import KafkaConsumer
from pyarrow import Table, schema, string, int32, timestamp
import pyarrow.parquet as pq
import pyarrow.fs as fs

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Constants
KAFKA_BROKER = 'localhost:29092'
KAFKA_TOPIC = 'dbserver1.inventory.products'
HDFS_HOST = 'localhost'   # Assuming a local HDFS for this example
HDFS_PORT = 9000
BATCH_SIZE = 1000         # Number of messages to buffer before writing
BATCH_TIMEOUT_MS = 60000  # Max time to wait for a full batch

# Define the Parquet schema. This is crucial for performance and compatibility.
# In a real system, this would be derived from a schema registry.
PYARROW_SCHEMA = schema([
    ('id', int32()),
    ('name', string()),
    ('description', string()),
    ('stock', int32()),
    ('updated_at', timestamp('ms'))
])


class HdfsIngestionService:
    def __init__(self):
        try:
            self.consumer = KafkaConsumer(
                bootstrap_servers=[KAFKA_BROKER],
                auto_offset_reset='earliest',
                enable_auto_commit=False,  # Manual offset control is critical
                group_id='hdfs-ingestion-group',
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
            self.hdfs = fs.HadoopFileSystem(HDFS_HOST, HDFS_PORT)
            logging.info("Successfully connected to Kafka and HDFS.")
        except Exception as e:
            logging.error(f"Failed to initialize services: {e}")
            raise

    def process_messages(self):
        self.consumer.subscribe([KAFKA_TOPIC])
        logging.info(f"Subscribed to topic: {KAFKA_TOPIC}")
        records_batch = []
        try:
            while True:
                # Poll for messages with a timeout
                messages = self.consumer.poll(timeout_ms=BATCH_TIMEOUT_MS, max_records=BATCH_SIZE)
                if not messages:
                    # If the timeout is reached and we have buffered data, write it
                    if records_batch:
                        logging.info("Poll timeout reached. Writing existing batch.")
                        self.write_batch_to_hdfs(records_batch)
                        self.consumer.commit()
                        records_batch.clear()
                    continue
                for msgs in messages.values():
                    for msg in msgs:
                        # With value.converter.schemas.enable=false there is no 'payload' wrapper;
                        # the Debezium envelope (before/after/op/...) is the message value itself.
                        event = msg.value or {}
                        if event.get('op') in ['c', 'u', 'r']:  # create, update, read (snapshot)
                            after_state = event.get('after')
                            if after_state:
                                # Data transformation and cleaning can happen here
                                records_batch.append({
                                    'id': after_state.get('id'),
                                    'name': after_state.get('name'),
                                    'description': after_state.get('description'),
                                    'stock': after_state.get('stock'),
                                    # Debezium emits temporal columns as epoch values; verify the
                                    # unit (ms vs us) for your connector version and column type.
                                    'updated_at': after_state.get('updated_at')
                                })
                # Check the batch size once per poll; offsets are committed only after a write,
                # so a commit never runs ahead of records that are still being buffered.
                if len(records_batch) >= BATCH_SIZE:
                    logging.info(f"Batch size reached ({len(records_batch)} records). Writing to HDFS.")
                    self.write_batch_to_hdfs(records_batch)
                    self.consumer.commit()  # Commit offsets only after a successful write
                    records_batch.clear()
        except KeyboardInterrupt:
            logging.info("Shutdown signal received.")
        finally:
            if records_batch:
                logging.info("Writing final batch before shutdown.")
                self.write_batch_to_hdfs(records_batch)
                self.consumer.commit()
            self.consumer.close()
            logging.info("Kafka consumer closed.")

    def write_batch_to_hdfs(self, batch):
        if not batch:
            return
        try:
            # Create a PyArrow Table from the batch of records
            table = Table.from_pylist(batch, schema=PYARROW_SCHEMA)
            # Generate a unique filename and path with date-based partitioning
            now = datetime.now()
            partition_path = f"/data/inventory/products/year={now.year}/month={now.month:02d}/day={now.day:02d}"
            file_path = f"{partition_path}/data_{now.strftime('%Y%m%d%H%M%S%f')}.parquet"
            logging.info(f"Writing {len(batch)} records to {file_path}")
            # Write one Parquet file per batch into the date partition directory. Writing files
            # directly keeps file counts and sizes under our control; write_to_dataset with
            # partition_cols would require explicit year/month/day columns in the table.
            self.hdfs.create_dir(partition_path, recursive=True)
            with self.hdfs.open_output_stream(file_path) as f:
                pq.write_table(table, f)
            logging.info("Write to HDFS successful.")
        except Exception as e:
            # In production, implement a retry mechanism or DLQ.
            logging.error(f"Failed to write batch to HDFS: {e}")
            # Re-raise so the caller does not commit offsets; the batch is reprocessed on restart.
            raise


if __name__ == '__main__':
    service = HdfsIngestionService()
    service.process_messages()
This custom consumer gives us full control, solving the small files problem and ensuring the data is stored in an analytics-friendly format. The manual offset management (enable_auto_commit=False) is crucial for guaranteeing at-least-once processing semantics. We only commit the Kafka offsets after a batch has been successfully written to HDFS.
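At-least-once also means a crashed writer can replay a batch, so downstream jobs should deduplicate on a stable key such as (id, updated_at). A minimal read-side sketch, assuming pandas is available and using an example partition path:
# deduplicate_on_read.py
# Collapse replayed rows to the latest state per product for one day's partition.
import pyarrow.parquet as pq
import pyarrow.fs as fs

hdfs = fs.HadoopFileSystem('localhost', 9000)
table = pq.read_table('/data/inventory/products/year=2022/month=11/day=03', filesystem=hdfs)

df = (table.to_pandas()
      .sort_values('updated_at')
      .drop_duplicates(subset='id', keep='last'))  # keep the latest state per product id
print(len(df), "distinct products after dedup")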
graph TD
  subgraph "Final Architecture"
    A[MySQL Binlog] --> B{Debezium in Kafka Connect};
    B --> C[Kafka Topic];
    C --> F{Custom Ingestion Service};
    F --> G[HDFS: Large, Partitioned Parquet files];
    subgraph "Hive/Presto/Spark"
      H[Analytical Queries]
    end
    G --> H;
    style G fill:#9f9,stroke:#333,stroke-width:2px;
  end
Phase 6: Handling Production Realities - Schema Evolution
The system ran smoothly for weeks until a developer added a weight_kg DECIMAL(10, 3) column to the products table. The pipeline didn’t crash immediately, but downstream analytics jobs started failing. The Python consumer, with its hardcoded schema, couldn’t process the new messages containing the weight_kg field, causing it to log errors and fall behind.
This is where a Schema Registry becomes essential. By integrating a tool like Confluent Schema Registry and switching our Kafka converters to Avro (io.confluent.connect.avro.AvroConverter), Debezium will automatically register and version schemas. Our consumer can then fetch the correct schema for each message, allowing it to adapt dynamically to DDL changes like adding new columns. This adds complexity to the setup but is a requirement for any long-lived, production CDC pipeline. The ingestion code would need to be modified to query the Schema Registry to decode messages and dynamically adjust the Parquet schema.
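To make that concrete, here is a minimal sketch of the consumer side after the switch. It assumes a recent confluent-kafka client (rather than kafka-python) and a Schema Registry reachable at localhost:8081, neither of which is part of the compose file above:
# avro_consumer_sketch.py
# Consume Debezium events with schemas resolved from the Schema Registry.
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})
# With no reader schema supplied, the deserializer fetches the writer schema registered
# by the connector for each message, so newly added columns show up automatically.
value_deserializer = AvroDeserializer(schema_registry)

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'hdfs-ingestion-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'value.deserializer': value_deserializer,
})
consumer.subscribe(['dbserver1.inventory.products'])

msg = consumer.poll(5.0)
if msg is not None and msg.value() is not None:
    event = msg.value()                   # already a dict; no manual JSON parsing
    after = event.get('after') or {}
    print(sorted(after.keys()))           # weight_kg appears here once the DDL lands
consumer.close()
The write side would then build the PyArrow schema from the fetched Avro schema instead of the hardcoded PYARROW_SCHEMA constant used earlier.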
The final architecture is robust, scalable, and addresses the initial business problem. It provides a near real-time stream of data to our Hadoop ecosystem without impacting the source database, and it’s built to handle the realities of a production environment like schema changes and backpressure.
This implementation, while functional, still has limitations. The custom consumer, as a single Python process, is a single point of failure and a potential bottleneck. In a true production environment, this service would need to be containerized and deployed on an orchestrator like Kubernetes with multiple replicas running as part of the same consumer group to parallelize processing. Furthermore, the initial snapshot process for a very large table can still place a significant read load on the source MySQL database; this needs to be scheduled during off-peak hours and monitored closely. Finally, handling destructive schema changes, such as deleting or renaming a column, requires a more sophisticated evolution strategy and potential manual intervention to avoid breaking downstream consumers that depend on the old schema.