Constructing a Resilient MySQL to Hadoop CDC Pipeline with Docker and Debezium


The operational pain point was clear: our primary MySQL database, backing a critical monolithic application, was buckling under the strain of analytical queries. The business intelligence team needed fresher data, but their ad-hoc queries were causing lock contention and performance degradation on the OLTP workload. Our nightly ETL batch jobs, which dumped data into our Hadoop cluster, were taking hours to run, meaning reports were always a day old. This latency was no longer acceptable. The directive was to establish a near real-time data flow into HDFS without destabilizing the source database or requiring invasive application-level changes.

My initial proposal revolved around Change Data Capture (CDC). Instead of querying the database directly, we could tap into MySQL’s binary log (binlog), a low-level, ordered stream of all data modifications. This approach is non-intrusive and captures every INSERT, UPDATE, and DELETE at the source. For the CDC tool, Debezium stood out. It’s designed as a Kafka Connect source connector, which provides a robust, distributed, and fault-tolerant framework out of the box. Using Apache Kafka as an intermediary buffer between MySQL and Hadoop was a non-negotiable architectural choice. It decouples the source from the destination, provides data durability, and allows multiple downstream consumers to subscribe to the change stream independently. It also naturally handles backpressure; if the Hadoop ingestion process slows down, Kafka retains the data until it can be processed.
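
To make the binlog idea concrete before any tooling enters the picture, you can inspect it directly with the standard client on any server that has binary logging enabled. A minimal sketch, assuming root access; the binlog file name is illustrative and should come from the first command's output:

# Peek at the binary log: every committed row change appears as an ordered event.
mysql -uroot -p -e "SHOW BINARY LOGS;"
mysql -uroot -p -e "SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 10;"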

To manage this multi-component stack (MySQL, Zookeeper, Kafka, Kafka Connect), containerization was the only sane path forward. A docker-compose setup would allow us to define the entire pipeline declaratively, ensuring environment consistency from local development to staging and production. It simplifies dependency management and makes the entire system portable and reproducible.

Phase 1: The Dockerized Foundation

The first step was to build a stable, reproducible local environment that mirrored our production components. A real-world project requires precise configuration, especially for networking and data persistence within Docker.

Here is the docker-compose.yml that defines our entire pipeline. Note the specific configurations: MySQL is set up with binlog enabled in ROW format, which is essential for Debezium. Kafka Connect depends on both Kafka and the database being ready. Volumes are used to persist data across container restarts, which is critical for Kafka logs and MySQL data.

# docker-compose.yml
# A production-grade foundation for our CDC pipeline.

version: '3.8'

services:
  mysql:
    image: mysql:8.0
    container_name: cdc_mysql_source
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=mysecretpassword
      - MYSQL_DATABASE=inventory
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
    volumes:
      - mysql_data:/var/lib/mysql
      - ./mysql-init:/docker-entrypoint-initdb.d
    command:
      - '--server-id=101'
      - '--binlog_format=ROW'
      - '--log-bin=mysql-bin'
      - '--gtid-mode=ON'
      - '--enforce-gtid-consistency=ON'
      - '--log-slave-updates=ON'
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-pmysecretpassword"]
      interval: 10s
      timeout: 5s
      retries: 5

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: cdc_zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    container_name: cdc_kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: confluentinc/cp-kafka-connect:7.3.2
    container_name: cdc_connect
    ports:
      - "8083:8083"
    depends_on:
      mysql:
        condition: service_healthy
      kafka:
        condition: service_started
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    # We mount the Debezium connector JARs into the plugin path.
    # In a real setup, you'd build a custom Docker image.
    volumes:
      - ./debezium-connector-mysql:/usr/share/confluent-hub-components/debezium-connector-mysql

volumes:
  mysql_data:

To bootstrap the database, a simple schema is loaded on startup.

-- mysql-init/01-schema.sql
CREATE DATABASE IF NOT EXISTS inventory; -- the entrypoint already creates it via MYSQL_DATABASE
USE inventory;

CREATE TABLE products (
  id INT AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description TEXT,
  stock INT NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE sales (
  order_id VARCHAR(36) PRIMARY KEY,
  product_id INT,
  quantity INT,
  sale_price DECIMAL(10, 2),
  order_date DATETIME,
  FOREIGN KEY (product_id) REFERENCES products(id)
);
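
Before handing the binlog over to Debezium, it is worth confirming that the flags passed in the command section actually take effect once the containers are up. A quick check against the running MySQL container:

# Confirm the settings Debezium depends on: log_bin=ON, binlog_format=ROW, gtid_mode=ON.
docker exec -it cdc_mysql_source mysql -uroot -pmysecretpassword \
  -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'gtid_mode');"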

Phase 2: Configuring the Debezium Connector

With the infrastructure running via docker-compose up -d, the next step is to configure and deploy the Debezium connector. This is done by sending a JSON payload to the Kafka Connect REST API. A common mistake is to misconfigure the connector, leading to silent failures or incomplete data capture. Every parameter here is critical.

The database.history.kafka.topic is particularly important: Debezium uses this internal topic to track the history of DDL changes, which keeps the connector resilient to schema evolution. The database.server.id must be unique across your entire MySQL replication topology, because the connector reads the binlog as if it were another replica. Note that these property names follow Debezium 1.x conventions; Debezium 2.x renames database.server.name to topic.prefix and the database.history.* properties to schema.history.internal.*.

// debezium-mysql-source-config.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "mysecretpassword",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "table.include.list": "inventory.products,inventory.sales",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "true",
    "snapshot.mode": "initial"
  }
}

To deploy this, we use a simple curl command targeting the Connect container.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  --data @debezium-mysql-source-config.json
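
A 201 response only means the configuration was accepted; whether the connector actually started is a separate question. The Connect REST API exposes a status endpoint that is worth hitting immediately after deployment:

# Both the connector and its single task should report state RUNNING.
# If a task shows FAILED, its "trace" field contains the underlying stack trace.
curl -s http://localhost:8083/connectors/inventory-connector/status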

Phase 3: Observing the Change Stream

After deploying the connector, Debezium performs an initial consistent snapshot of the included tables and then switches to streaming changes from the binlog. To verify this, we can use a command-line tool like kafkacat (or kcat) to inspect the Kafka topics. For the products table, the topic will be named dbserver1.inventory.products by default.

Let’s insert some data into MySQL:

INSERT INTO products (name, description, stock) VALUES
('Laptop', '15-inch, 16GB RAM, 512GB SSD', 50),
('Mouse', 'Wireless ergonomic mouse', 200);
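
Before tailing a specific topic, listing what the connector has created is a useful sanity check; per-table topics follow the <server.name>.<database>.<table> convention:

# Expect dbserver1.inventory.products alongside the schema-changes topic and
# Kafka Connect's internal _connect-* topics.
docker exec -it cdc_kafka /usr/bin/kafka-topics --bootstrap-server localhost:9092 --list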

Now, tail the topic with the console consumer bundled in the Kafka container (or with kcat from the host, pointed at localhost:29092):

docker exec -it cdc_kafka /usr/bin/kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic dbserver1.inventory.products \
  --from-beginning

The output for the first insert will be a rich JSON document. The critical parts are the before and after fields, which represent the state of the row before and after the change. For an INSERT, before is null.

{
  "before": null,
  "after": {
    "id": 1,
    "name": "Laptop",
    "description": "15-inch, 16GB RAM, 512GB SSD",
    "stock": 50,
    "created_at": 1667502534000,
    "updated_at": 1667502534000
  },
  "source": { ... },
  "op": "c", // 'c' for create/insert
  "ts_ms": 1667502534512,
  "transaction": null
}

This confirms our pipeline’s source side is working correctly. Changes in MySQL are being captured and published to Kafka in near real-time.
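
It is worth exercising the other operation types as well, since any downstream consumer has to handle them: an UPDATE event carries both before and after images with op 'u', while a DELETE produces op 'd' followed by a null-valued tombstone record.

# Generate update and delete events and watch them arrive on the same topic.
docker exec -it cdc_mysql_source mysql -uroot -pmysecretpassword inventory \
  -e "UPDATE products SET stock = stock - 5 WHERE id = 1; DELETE FROM products WHERE id = 2;"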

Phase 4: The Ingestion Pitfall - The Small Files Problem

The next logical step was to get this data into HDFS. The Kafka Connect framework has a plethora of sink connectors, including several for HDFS. My first attempt involved using a standard, off-the-shelf HDFS sink connector. The configuration was straightforward.

However, after running this for a few hours with moderate traffic, a classic big data problem emerged: the HDFS “small files problem.” The connector was flushing data to HDFS frequently to achieve low latency, resulting in thousands of tiny JSON files. HDFS is optimized for large files; the NameNode’s memory is consumed by file metadata, and MapReduce or Spark jobs run inefficiently when processing a massive number of small files. Furthermore, querying raw JSON with Hive or Presto is slow. We needed a columnar format like Parquet.

This is a critical lesson in production data engineering: a solution that works functionally can be an operational disaster at scale. The generic HDFS sink was not the right tool for this job.
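
The symptom is easy to quantify directly on the cluster. A quick check, sketched here with a hypothetical landing path since the exact layout depends on the sink configuration:

# Compare file counts against total size; thousands of kilobyte-sized files
# under a single table path is the signature of the small files problem.
hdfs dfs -count -h /data/inventory/products
hdfs dfs -du -s -h /data/inventory/products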

graph TD
    subgraph "Attempt 1: The Pitfall"
        A[MySQL Binlog] --> B{Debezium};
        B --> C[Kafka Topic];
        C --> D{Generic HDFS Sink};
        D --> E[HDFS: Many small JSON files];
        style E fill:#f96,stroke:#333,stroke-width:2px;
    end

Phase 5: Production-Grade Ingestion with a Custom Consumer

We needed more control over the ingestion process. The solution was to write a dedicated consumer service that pulls messages from Kafka, buffers them in memory, converts them to the Parquet format, and writes them to HDFS in larger, properly sized files (e.g., 128MB). This approach allows us to control file rotation policies (based on size, time, or number of records) and handle data partitioning correctly (e.g., by date).

Here is a conceptual Python implementation using kafka-python and pyarrow. In a production system, this would be a robust, long-running service with proper logging, metrics, and error handling (e.g., a dead-letter queue for unparseable messages).

# hdfs_ingestion_service.py
# A robust consumer for writing batched Parquet files to HDFS.

import json
import logging
from datetime import datetime
from kafka import KafkaConsumer, TopicPartition
from pyarrow import Table, schema, string, int32, timestamp
import pyarrow.parquet as pq
import pyarrow.fs as fs

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Constants
KAFKA_BROKER = 'localhost:29092'
KAFKA_TOPIC = 'dbserver1.inventory.products'
HDFS_HOST = 'localhost' # Assuming a local HDFS for this example
HDFS_PORT = 9000
BATCH_SIZE = 1000  # Number of messages to buffer before writing
BATCH_TIMEOUT_MS = 60000 # Max time to wait for a full batch

# Define the Parquet schema. This is crucial for performance and compatibility.
# In a real system, this would be derived from a schema registry.
PYARROW_SCHEMA = schema([
    ('id', int32()),
    ('name', string()),
    ('description', string()),
    ('stock', int32()),
    ('updated_at', timestamp('ms'))
])

class HdfsIngestionService:
    def __init__(self):
        try:
            self.consumer = KafkaConsumer(
                bootstrap_servers=[KAFKA_BROKER],
                auto_offset_reset='earliest',
                enable_auto_commit=False,  # Manual offset control is critical
                group_id='hdfs-ingestion-group',
                value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x else None  # tolerate tombstone (null) messages
            )
            self.hdfs = fs.HadoopFileSystem(HDFS_HOST, HDFS_PORT)
            logging.info("Successfully connected to Kafka and HDFS.")
        except Exception as e:
            logging.error(f"Failed to initialize services: {e}")
            raise

    def process_messages(self):
        self.consumer.subscribe([KAFKA_TOPIC])
        logging.info(f"Subscribed to topic: {KAFKA_TOPIC}")

        records_batch = []

        try:
            while True:
                # Poll for messages with a timeout
                messages = self.consumer.poll(timeout_ms=BATCH_TIMEOUT_MS, max_records=BATCH_SIZE)
                
                if not messages:
                    # If timeout is reached and we have data, write it
                    if records_batch:
                        logging.info("Poll timeout reached. Writing existing batch.")
                        self.write_batch_to_hdfs(records_batch)
                        self.consumer.commit()
                        records_batch.clear()
                    continue

                for tp, msgs in messages.items():
                    for msg in msgs:
                        if msg.value is None:
                            continue  # skip tombstone records emitted after deletes
                        # Extract the Debezium event. With schemas disabled on the JSON
                        # converter there is no 'payload' envelope, so fall back to the value itself.
                        payload = msg.value.get('payload', msg.value)
                        if payload.get('op') in ['c', 'u', 'r']: # create, update, read (snapshot)
                            after_state = payload.get('after')
                            if after_state:
                                # Data transformation and cleaning can happen here
                                record = {
                                    'id': after_state.get('id'),
                                    'name': after_state.get('name'),
                                    'description': after_state.get('description'),
                                    'stock': after_state.get('stock'),
                                    # NOTE: depending on column type and connector version, this value may be
                                    # epoch millis, epoch micros, or an ISO-8601 string; normalize it to epoch
                                    # millis here if your source differs from the schema above.
                                    'updated_at': after_state.get('updated_at')
                                }
                                records_batch.append(record)

                if len(records_batch) >= BATCH_SIZE:
                    logging.info(f"Batch size reached ({len(records_batch)} records). Writing to HDFS.")
                    self.write_batch_to_hdfs(records_batch)
                    self.consumer.commit() # Commit offsets only after a successful write
                    records_batch.clear()

        except KeyboardInterrupt:
            logging.info("Shutdown signal received.")
        finally:
            if records_batch:
                logging.info("Writing final batch before shutdown.")
                self.write_batch_to_hdfs(records_batch)
                self.consumer.commit()
            self.consumer.close()
            logging.info("Kafka consumer closed.")


    def write_batch_to_hdfs(self, batch):
        if not batch:
            return

        try:
            # Create a PyArrow Table from the batch of records
            table = Table.from_pylist(batch, schema=PYARROW_SCHEMA)

            # Generate a unique filename and path with date-based partitioning
            now = datetime.now()
            partition_path = f"/data/inventory/products/year={now.year}/month={now.month:02d}/day={now.day:02d}"
            file_path = f"{partition_path}/data_{now.strftime('%Y%m%d%H%M%S%f')}.parquet"

            logging.info(f"Writing {len(batch)} records to {file_path}")
            
            # Write one Parquet file into the date-partitioned directory computed above.
            # (pq.write_to_dataset with partition_cols would require the year/month/day
            # values to exist as columns on the table itself.)
            self.hdfs.create_dir(partition_path, recursive=True)
            with self.hdfs.open_output_stream(file_path) as f:
                pq.write_table(table, f)

            logging.info("Write to HDFS successful.")
        except Exception as e:
            # In production, implement a retry mechanism or DLQ
            logging.error(f"Failed to write batch to HDFS: {e}")
            # Do NOT commit offsets here, so we can retry processing this batch.

if __name__ == '__main__':
    service = HdfsIngestionService()
    service.process_messages()

This custom consumer gives us full control, solving the small files problem and ensuring the data is stored in an analytics-friendly format. The manual offset management (enable_auto_commit=False) is crucial for guaranteeing at-least-once processing semantics. We only commit the Kafka offsets after a batch has been successfully written to HDFS.
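
At-least-once delivery also tells us what to watch operationally: consumer lag. If the writer falls behind, Kafka absorbs the backlog, but the data in HDFS grows staler. The broker ships a tool for exactly this, keyed on the group id used by the service above:

# Per-partition lag for the ingestion service; sustained growth in the LAG
# column means the writer cannot keep up with the change stream.
docker exec -it cdc_kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group hdfs-ingestion-group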

graph TD
    subgraph "Final Architecture"
        A[MySQL Binlog] --> B{Debezium in Kafka Connect};
        B --> C[Kafka Topic];
        C --> F{Custom Ingestion Service};
        F --> G[HDFS: Large, Partitioned Parquet files];
        subgraph Hive/Presto/Spark
            H[Analytical Queries]
        end
        G --> H;
        style G fill:#9f9,stroke:#333,stroke-width:2px;
    end

Phase 6: Handling Production Realities - Schema Evolution

The system ran smoothly for weeks until a developer added a weight_kg DECIMAL(10, 3) column to the products table. The pipeline didn’t crash immediately, but downstream analytics jobs started failing. The Python consumer, with its hardcoded schema, couldn’t process the new messages containing the weight_kg field, causing it to log errors and fall behind.

This is where a Schema Registry becomes essential. By integrating a tool like Confluent Schema Registry and switching our Kafka converters to Avro (io.confluent.connect.avro.AvroConverter), we get automatic schema registration and versioning for every captured table. Our consumer can then fetch the correct schema for each message, allowing it to adapt dynamically to DDL changes like adding new columns. This adds complexity to the setup, but it is a requirement for any long-lived production CDC pipeline. The ingestion code would need to be modified to query the Schema Registry to decode messages and adjust the Parquet schema dynamically.
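
As a sketch of what that migration looks like on the Connect side: the converter settings move from JSON to Avro and point at the registry. The schema-registry host below is an assumption (that service is not part of the compose file above), the config file name is hypothetical, and PUT /connectors/{name}/config replaces the whole configuration, so these keys must be merged into the full config map from earlier rather than sent alone:

# Hypothetical Avro migration. Merge these keys into the connector's existing
# config map, save it as the file referenced below, and re-apply it:
#   "key.converter":   "io.confluent.connect.avro.AvroConverter",
#   "value.converter": "io.confluent.connect.avro.AvroConverter",
#   "key.converter.schema.registry.url":   "http://schema-registry:8081",
#   "value.converter.schema.registry.url": "http://schema-registry:8081"
curl -s -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/inventory-connector/config \
  --data @inventory-connector-avro-config.json

Records already sitting on the topics stay JSON-encoded; only newly produced events pick up the Avro format, which is something to account for when replaying history.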

The final architecture is robust, scalable, and addresses the initial business problem. It provides a near real-time stream of data to our Hadoop ecosystem without impacting the source database, and it’s built to handle the realities of a production environment like schema changes and backpressure.

This implementation, while functional, still has limitations. The custom consumer, as a single Python process, is a single point of failure and a potential bottleneck. In a true production environment, this service would need to be containerized and deployed on an orchestrator like Kubernetes with multiple replicas running as part of the same consumer group to parallelize processing. Furthermore, the initial snapshot process for a very large table can still place a significant read load on the source MySQL database; this needs to be scheduled during off-peak hours and monitored closely. Finally, handling destructive schema changes, such as deleting or renaming a column, requires a more sophisticated evolution strategy and potential manual intervention to avoid breaking downstream consumers that depend on the old schema.

