Building a Transactional Data Lakehouse Ingest Pipeline with TiDB CDC, NATS, and Apache Hudi


The 24-hour latency on our core analytics dataset was no longer acceptable. Our batch ETL jobs, pulling snapshots from a TiDB cluster, were reliable but slow, leaving business intelligence perpetually a day behind. The mandate was clear: shrink the data delay from hours to minutes, and do it without compromising the transactional integrity of the source data as it lands in our Apache Hudi-based data lake. This meant capturing every INSERT, UPDATE, and DELETE in near real-time.

Our initial architecture discussions were a whirlwind of familiar tools. The default path for many would be TiDB’s Change Data Capture (CDC) into Kafka, processed by a heavy streaming framework like Flink or Spark Streaming. In a real-world project, however, you don’t just pick the most powerful tool; you pick the one with the right trade-offs in operational complexity, resource footprint, and failure modes for your specific problem. Our problem was point-to-point replication. We didn’t need complex in-flight transformations or stateful aggregations. We needed a durable, ordered transport and a reliable writer. This realization led us down a less-traveled, but ultimately more pragmatic, path.

flowchart TD
    subgraph Source Transactional System
        A[TiDB Cluster]
    end

    subgraph Real-time Capture & Transport
        B[TiCDC] --> C{NATS JetStream}
    end

    subgraph Processing & Staging Layer
        D[Go CDC Consumer] -- Pulls Messages --> C
        D -- Batches to Object Storage --> E[S3/MinIO Micro-Batches]
    end

    subgraph Data Lakehouse Ingest
        F[Scheduled Spark Job] -- Reads Batches --> E
        F -- Upserts Data --> G[Apache Hudi Table]
    end

    subgraph CI/CD & Monitoring
        H[GitHub Repo] -- GitHub Actions --> I[Deploy Consumer & Spark Job]
        J[Astro Dashboard] -- Reads Metrics API --> D
    end

    A -- Transactional Data --> B

This architecture represents the final state, but reaching it involved solving a series of non-trivial problems, particularly around idempotency, schema evolution, and the impedance mismatch between a Go-based streaming consumer and a JVM-based Hudi writer.

Step 1: Plumbing the Source with TiCDC and NATS JetStream

The foundation is TiDB’s native CDC component, TiCDC. It taps into the underlying Raft logs of TiKV to provide low-latency, ordered streams of row-level changes. The first step was creating a changefeed. We specifically chose the canal-json format for its simplicity and human-readability during debugging, though Avro is a better choice for production due to schema enforcement and smaller payload sizes.

A common mistake is to point CDC directly at a message queue without considering the queue’s own guarantees. We opted for NATS JetStream over Kafka. The rationale was operational simplicity. For a single-purpose pipeline, setting up and tuning a Zookeeper/Kafka cluster felt like bringing a sledgehammer to crack a nut. JetStream offers sufficient durability, ordering, and at-least-once delivery semantics with a much lighter operational footprint.

First, we define the JetStream stream. This isn’t just a subject; it’s a durable, disk-backed log.

# nats-cli stream add command
nats stream add TIDB_CDC_ORDERS \
  --subjects "cdc.orders.>" \
  --storage file \
  --retention limits \
  --max-msgs-per-subject 1000000 \
  --max-age 168h \
  --ack \
  --replicas 3 # For production HA

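This configuration creates a stream named TIDB_CDC_ORDERS that captures every subject matching cdc.orders.>. It uses file-based persistence, keeps messages for 7 days (168h), and caps each subject at one million messages. The --ack flag makes the stream acknowledge each publish, so the producer knows a message was persisted. The explicit consumer-side acknowledgements that at-least-once delivery depends on are configured on the consumer rather than on the stream.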
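
To make the subject filter concrete, here is a minimal Python sketch of NATS wildcard matching. It is an illustration of the matching rules (tokens split on ".", "*" for one token, ">" for one or more trailing tokens), not code taken from NATS itself, and the function name subject_matches is hypothetical.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS subject matches a subscription pattern.

    Tokens are separated by '.', '*' matches exactly one token, and '>'
    matches one or more trailing tokens (it must be the last token).
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be last and needs at least one token left to consume.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


print(subject_matches("cdc.orders.>", "cdc.orders.test_orders"))  # True
print(subject_matches("cdc.orders.>", "cdc.orders"))              # False
```

Note that the bare subject cdc.orders is not captured, because ">" requires at least one further token.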

Next, we create the TiCDC changefeed pointing to this NATS sink.

# ticdc cli changefeed create command
tiup ctl:v7.5.0 cdc changefeed create \
  --pd="http://<pd-address>:2379" \
  --sink-uri="nats://<nats-server>:4222" \
  --changefeed-id="tidb-orders-to-hudi" \
  --config - <<EOF
[sink]
protocol = "canal-json"

[sink.dispatchers]
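dispatcher = "index-value"
rules = [
  {match = ["test.orders"], topic = "cdc.orders.test_orders"},
]
EOF

The key piece here is dispatcher = "index-value". It hashes each change on the value of the row’s primary key (or unique index), so all changes for a given row are routed to the same partition, preserving order for that row. The "ts" dispatcher, by contrast, hashes on the commit timestamp and can scatter successive changes to one row across partitions. The topic template defines the NATS subject where messages will be published.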
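
The per-row ordering idea can be sketched in a few lines of Python: routing by a stable hash of the primary key sends every change for a row to the same partition. This is only an illustration; CRC32, the partition count, and the sample events are assumptions, not TiCDC's actual hashing scheme.

```python
import zlib

def partition_for(primary_key: str, num_partitions: int) -> int:
    """Map a primary key to a partition with a stable hash (CRC32 here)."""
    return zlib.crc32(primary_key.encode("utf-8")) % num_partitions

# Hypothetical change events: (primary key, change type), in commit order.
events = [("42", "INSERT"), ("7", "INSERT"), ("42", "UPDATE"), ("42", "DELETE")]

partitions = {}
for key, change in events:
    partitions.setdefault(partition_for(key, 4), []).append((key, change))

# Every change for key "42" sits in one partition, in arrival order.
for pid, items in sorted(partitions.items()):
    print(pid, items)
```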

Step 2: The Go Consumer - Bridging Stream and Batch

With data flowing into NATS, we needed a consumer. A heavyweight framework felt like overkill. A lightweight, containerized Go service provides excellent performance, low memory usage, and direct control over the consumption logic.

The core responsibility of this consumer is to:

  1. Connect to NATS JetStream with a durable subscription.
  2. Receive messages in batches.
  3. Parse the canal-json format.
  4. Handle DDL events vs. DML events.
  5. Stage these messages as micro-batches in an intermediate object store (like MinIO or S3) for the Hudi writer.
  6. Acknowledge messages only after they are successfully staged.
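
Before the Go implementation, steps 3 to 5 can be sketched in Python to show the data flow: parse each payload, divert DDL events, tag DML events with an op code, and emit newline-delimited JSON. The sample payloads are hypothetical and only mimic the fields the consumer reads; the helper name to_staging_lines is an assumption.

```python
import json

OP_CODES = {"INSERT": "I", "UPDATE": "U", "DELETE": "D"}

def to_staging_lines(raw_messages):
    """Split raw Canal-JSON payloads into staged NDJSON lines and DDL events."""
    lines, ddl_events = [], []
    for raw in raw_messages:
        msg = json.loads(raw)
        if msg.get("isDdl"):
            ddl_events.append(msg)  # handled out of band, never staged
            continue
        msg["op"] = OP_CODES[msg["type"]]  # unknown types raise KeyError
        lines.append(json.dumps(msg, separators=(",", ":")))
    return lines, ddl_events

# Hypothetical payloads shaped like the fields the consumer parses.
sample = [
    '{"id":0,"database":"test","table":"orders","isDdl":false,"type":"INSERT",'
    '"ts":1700000000000,"data":[{"id":"1","status":"NEW"}],"old":null}',
    '{"id":0,"database":"test","table":"orders","isDdl":true,"type":"ALTER",'
    '"ts":1700000000500,"data":null,"old":null}',
]
lines, ddl = to_staging_lines(sample)
print(len(lines), len(ddl))
```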

Here’s a condensed version of the core consumer logic.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings" // needed for strings.Builder in processBatch
	"time"

	"github.com/nats-io/nats.go"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	// ... other imports
)

const (
	natsURL        = "nats://localhost:4222"
	streamName     = "TIDB_CDC_ORDERS"
	subject        = "cdc.orders.>"
	durableName    = "hudi-ingest-consumer"
	batchSize      = 1000
	batchTimeout   = 5 * time.Second
	s3Bucket       = "hudi-ingest-staging"
	s3Endpoint     = "http://localhost:9000"
	s3Region       = "us-east-1"
)

// Represents a simplified TiCDC Canal-JSON message
type CDCMessage struct {
	ID        int64             `json:"id"`
	Database  string            `json:"database"`
	Table     string            `json:"table"`
	PKNames   []string          `json:"pkNames"`
	IsDDL     bool              `json:"isDdl"`
	Type      string            `json:"type"` // INSERT, UPDATE, DELETE
	TS        int64             `json:"ts"`   // Commit timestamp
	Data      []json.RawMessage `json:"data"`
	Old       []json.RawMessage `json:"old"`
}

// Staging record format for Hudi
type HudiRecord struct {
    CDCMessage
    Op string `json:"op"` // 'I', 'U', 'D' for Hudi
}

func main() {
	nc, err := nats.Connect(natsURL)
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatalf("Failed to get JetStream context: %v", err)
	}

	// Create a durable, pull-based subscription
	sub, err := js.PullSubscribe(subject, durableName, nats.BindStream(streamName))
	if err != nil {
		log.Fatalf("Failed to create pull subscription: %v", err)
	}

    sess := createS3Session()
    uploader := s3manager.NewUploader(sess)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Main processing loop
	for {
		processBatch(ctx, sub, uploader)
	}
}

func processBatch(ctx context.Context, sub *nats.Subscription, uploader *s3manager.Uploader) {
	var messages []*nats.Msg
	var batchBuffer strings.Builder
	
	// Fetch messages until batch size or timeout is reached
	fetchCtx, cancel := context.WithTimeout(ctx, batchTimeout)
	defer cancel()

	for len(messages) < batchSize {
		msgs, err := sub.Fetch(batchSize-len(messages), nats.Context(fetchCtx))
		if err != nil && err != context.DeadlineExceeded {
			log.Printf("Error fetching messages: %v", err)
			time.Sleep(2 * time.Second) // Backoff
			return
		}
		messages = append(messages, msgs...)
		if err == context.DeadlineExceeded {
			break // Timeout reached, process what we have
		}
	}
    
    if len(messages) == 0 {
        return
    }

	log.Printf("Processing batch of %d messages", len(messages))

	// Track only the messages whose records made it into this batch, so that
	// messages already ACKed (DDL) or NAKed (bad payload) are not ACKed again.
	var staged []*nats.Msg

	for _, msg := range messages {
		var cdcMsg CDCMessage
		if err := json.Unmarshal(msg.Data, &cdcMsg); err != nil {
			log.Printf("Failed to unmarshal message, requesting redelivery: %v", err)
			msg.Nak() // Or move to a dead-letter queue
			continue
		}

		if cdcMsg.IsDDL {
			log.Printf("DDL event received, skipping: %s", string(msg.Data))
			// In a real system, this would trigger an alert or an automated schema migration process.
			// For now, we just acknowledge and ignore.
			msg.Ack()
			continue
		}

		hudiRec := transformToHudiRecord(cdcMsg)
		recBytes, err := json.Marshal(hudiRec)
		if err != nil {
			log.Printf("Failed to marshal Hudi record: %v", err)
			msg.Nak()
			continue
		}
		batchBuffer.Write(recBytes)
		batchBuffer.WriteString("\n")
		staged = append(staged, msg)
	}

	// A pitfall here is handling an empty buffer if all messages were DDL/bad
	if batchBuffer.Len() > 0 {
		err := uploadToS3(ctx, uploader, batchBuffer.String())
		if err != nil {
			log.Printf("Failed to upload to S3, will not ACK messages: %v", err)
			// Do not ACK, messages will be redelivered after AckWait period
			return
		}
	}

	// Acknowledge only the staged messages, and only after a successful S3 upload
	for _, msg := range staged {
		if err := msg.Ack(); err != nil {
			log.Printf("Failed to ACK message on subject %s: %v", msg.Subject, err)
			// The upload succeeded but the ACK did not, so this message will be
			// redelivered and staged again. The downstream writer has to absorb
			// the duplicate. Needs robust monitoring.
		}
	}
	log.Printf("Successfully processed and uploaded batch of %d messages.", len(staged))
}

func transformToHudiRecord(msg CDCMessage) HudiRecord {
    rec := HudiRecord{CDCMessage: msg}
    switch msg.Type {
    case "INSERT":
        rec.Op = "I"
    case "UPDATE":
        rec.Op = "U"
    case "DELETE":
        rec.Op = "D"
        // For deletes, Hudi needs the original record data.
        // Canal-JSON provides this in the 'data' field for DELETEs.
    }
    return rec
}

// S3 session and upload functions omitted for brevity

The critical design choice here is the decoupling. The Go consumer’s only job is to get data from NATS to S3 reliably. It doesn’t know about Hudi, Parquet, or Spark. This makes the consumer simple, robust, and easy to maintain. The real “Hudi magic” happens in the next step. A major pitfall we avoided was trying to call Hudi’s Java libraries from Go through CGo in order to write Hudi directly. That path leads to deployment nightmares and maintenance headaches.
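
The stage-then-acknowledge ordering is what keeps the consumer at-least-once. A minimal Python simulation, with an in-memory stand-in for a JetStream message (FakeMsg and both upload callables are hypothetical test doubles, not NATS or S3 APIs), shows that a failed upload leaves every message unacknowledged:

```python
class FakeMsg:
    """In-memory stand-in for a JetStream message (hypothetical test double)."""
    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True


def stage_then_ack(messages, upload):
    """Upload the batch first; ACK only if the upload did not raise."""
    body = "\n".join(m.data for m in messages)
    try:
        upload(body)
    except OSError:
        return False  # nothing ACKed, so the broker will redeliver the batch
    for m in messages:
        m.ack()
    return True


def failing_upload(body):
    raise OSError("object store unavailable")

staged = []
ok_batch = [FakeMsg("a"), FakeMsg("b")]
bad_batch = [FakeMsg("c")]
print(stage_then_ack(ok_batch, staged.append))    # True
print(stage_then_ack(bad_batch, failing_upload))  # False
```

The cost of this ordering is possible duplicates when an upload succeeds but the acknowledgement is lost, which is why the writer downstream has to be idempotent.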

Step 3: The PySpark Hudi Writer - Transactional Ingest

A Spark job, scheduled to run every minute, is responsible for writing the staged data into the Hudi table. This job reads all new JSON files from the S3 staging directory, converts them into a DataFrame, and uses Hudi’s DataSource API to perform a transactional upsert.

This PySpark script is the heart of the Hudi integration. It must be configured correctly to ensure idempotency and handle deletes properly.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, ArrayType, MapType

# Configuration
S3_STAGING_PATH = "s3a://hudi-ingest-staging/YYYY/MM/DD/HH/mm/*" # Path will be dynamic
HUDI_TABLE_PATH = "s3a://data-lake/orders_hudi"
TABLE_NAME = "orders"
DB_NAME = "lakehouse"

# Hudi options
HUDI_OPTIONS = {
    'hoodie.table.name': TABLE_NAME,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE', # Or MERGE_ON_READ
    'hoodie.datasource.write.recordkey.field': 'id', # Primary key from the source table
    'hoodie.datasource.write.partitionpath.field': 'partition_date', # Example partitioning
    'hoodie.datasource.write.precombine.field': 'ts', # Crucial for idempotency
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': DB_NAME,
    'hoodie.datasource.hive_sync.table': TABLE_NAME,
    'hoodie.datasource.hive_sync.partition_fields': 'partition_date',
    'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://<hive-metastore>:10000',
    'hoodie.write.schema.evolution.enable': 'true'
}

def main():
    spark = SparkSession.builder \
        .appName("Hudi CDC Ingest") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .getOrCreate()
    
    # Define schema to parse the JSON from our Go consumer
    # This must match the HudiRecord struct in Go
    json_schema = StructType([
        StructField("id", LongType(), True),
        StructField("database", StringType(), True),
        StructField("table", StringType(), True),
        StructField("ts", LongType(), True), # The commit timestamp
        StructField("type", StringType(), True),
        # Assuming 'data' contains the full row as a map
        StructField("data", ArrayType(MapType(StringType(), StringType())), True),
        StructField("op", StringType(), True), # 'I', 'U', 'D'
    ])

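    # In a real job, you would calculate the path for the last minute
    # This is a simplified example
    #
    # The pitfall: if the glob matches no files, the read raises an
    # AnalysisException instead of returning an empty DataFrame, so an
    # emptiness check alone is not enough. Note that catching it here also
    # hides other analysis errors, so log the exception.
    from pyspark.sql.utils import AnalysisException
    try:
        df = spark.read.text(S3_STAGING_PATH)
    except AnalysisException as exc:
        print(f"No new data to process ({exc}).")
        return

    if df.rdd.isEmpty():
        print("No new data to process.")
        return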

    parsed_df = df.select(from_json(col("value"), json_schema).alias("parsed")).select("parsed.*")

    # The 'data' field is an array, we need to flatten it
    # TiCDC sends the 'after' state for INSERT/UPDATE in data[0]
    # and the 'before' state for DELETE in data[0].
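    final_df = parsed_df.select(
        # Record key: the source row's primary key, taken from the row image.
        # The top-level Canal-JSON `id` is an envelope field, not the table's key.
        # Assumes the key column is named `id`, as the recordkey setting implies.
        col("data")[0].getItem("id").cast("long").alias("id"),
        col("ts"),
        col("op"),
        # Extract fields from the map
        col("data")[0].getItem("order_date").alias("partition_date"),
        col("data")[0].getItem("customer_id").alias("customer_id"),
        col("data")[0].getItem("order_total").alias("order_total"),
        col("data")[0].getItem("status").alias("status"),
    )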

    # Mark deletes for Hudi. When the boolean `_hoodie_is_deleted` column is true,
    # Hudi's built-in payloads delete the record for that key instead of upserting it.
    # The precombine field (ts) still decides which event wins when several events
    # for the same key arrive in one batch. A custom payload class is the alternative.
    final_df = final_df.withColumn("_hoodie_is_deleted", col("op") == lit('D'))
    
    # Write to Hudi
    final_df.write.format("hudi") \
        .options(**HUDI_OPTIONS) \
        .mode("append") \
        .save(HUDI_TABLE_PATH)
    
    # After a successful write, clean up the staged files from S3.
    # This logic is critical to prevent reprocessing.
    # (Cleanup logic omitted for brevity)

if __name__ == "__main__":
    main()
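
The script leaves S3_STAGING_PATH as a placeholder. One way to compute it, sketched below, is to target the last fully elapsed minute in UTC. This assumes the Go consumer writes objects under a matching YYYY/MM/DD/HH/mm key prefix (its upload function is omitted above, so the layout is an assumption), and staging_glob is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

def staging_glob(bucket: str, now: datetime) -> str:
    """Return the glob for the last fully elapsed minute before `now` (UTC)."""
    minute = now.astimezone(timezone.utc).replace(second=0, microsecond=0)
    minute -= timedelta(minutes=1)
    return f"s3a://{bucket}/{minute:%Y/%m/%d/%H/%M}/*"

print(staging_glob("hudi-ingest-staging",
                   datetime(2024, 1, 1, 0, 0, 30, tzinfo=timezone.utc)))
# s3a://hudi-ingest-staging/2023/12/31/23/59/*
```

Reading only closed minutes avoids picking up a prefix the consumer is still writing to, at the price of some extra delay.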

The most important configuration here is 'hoodie.datasource.write.precombine.field': 'ts'. The ts field is the commit timestamp from the TiDB transaction log. If NATS redelivers a message that has already been processed, Hudi sees two records with the same record key (id) and uses the precombine field to keep the one with the higher ts value. Because a redelivered message carries the same ts as the original, the two collapse into a single record, which makes the write idempotent. This is the linchpin of the entire system’s correctness. For DELETE operations, the same logic ensures that the deletion event, which should carry the latest timestamp, wins over any previous state of the row.
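
The effect of the precombine field can be modelled in plain Python. The sketch below is a simplified model of the behaviour described above, not Hudi's implementation: a dict stands in for the table, and apply_batch is a hypothetical helper that keeps the highest ts per key and refuses to overwrite newer stored state.

```python
def apply_batch(table, records):
    """Apply (id, ts, op, row) records to `table`, keeping the highest ts per id."""
    latest = {}
    for rec in records:
        current = latest.get(rec["id"])
        if current is None or rec["ts"] >= current["ts"]:
            latest[rec["id"]] = rec  # precombine: later ts wins within the batch
    for key, rec in latest.items():
        stored = table.get(key)
        if stored is not None and stored["ts"] > rec["ts"]:
            continue  # an older event never overwrites newer stored state
        if rec["op"] == "D":
            table.pop(key, None)
        else:
            table[key] = {"ts": rec["ts"], "row": rec["row"]}
    return table

batch = [
    {"id": 1, "ts": 100, "op": "I", "row": {"status": "NEW"}},
    {"id": 1, "ts": 200, "op": "U", "row": {"status": "PAID"}},
]
table = apply_batch({}, batch)
apply_batch(table, batch)  # simulated redelivery of the same batch
print(table)  # {1: {'ts': 200, 'row': {'status': 'PAID'}}}
```

Applying the same batch twice leaves the table unchanged, which is the idempotency property the pipeline relies on.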

Step 4: Automation and Observability with GitHub Actions and Astro

A data pipeline isn’t complete until its deployment is automated and its health is observable.

We used GitHub Actions for CI/CD. The workflow has two main jobs:

  1. Build and Push Consumer: On a push to the main branch, it builds the Go binary, containerizes it using a Dockerfile, and pushes the image to our container registry.
  2. Deploy: It then uses kubectl or a similar tool to roll out the new version of the consumer deployment in our Kubernetes cluster. It also synchronizes the PySpark script to an S3 bucket from where the Spark job runner can pick it up.

# .github/workflows/deploy.yml
name: Deploy CDC Pipeline

on:
  push:
    branches:
      - main

jobs:
  build-and-deploy-consumer:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Go
        uses: actions/setup-go@v3
        with:
          go-version: '1.20'

      - name: Build
        run: go build -v -o cdc-consumer .

      - name: Docker Login
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}
      
      - name: Build and push Docker image
        uses: docker/build-push-action@v4
        with:
          context: .
          push: true
          tags: my-org/cdc-consumer:latest

      # - name: Deploy to K8s
      #   run: |
      #     # kubectl apply -f k8s/deployment.yaml ...
      #     echo "Deploying to cluster..."

  deploy-spark-job:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1

      - name: Sync PySpark script to S3
        run: aws s3 sync ./spark_jobs s3://my-spark-scripts/hudi-ingest/

For monitoring, a complex Grafana dashboard felt like overkill. We needed a simple status page. The Go consumer exposes a lightweight metrics endpoint (/metrics) providing key stats like last_processed_ts, messages_in_current_batch, and last_s3_upload_time. We built a simple, static dashboard using Astro. Astro is perfect for this because it can fetch data at build time or on the client side, generating an incredibly fast and lightweight UI with minimal effort. This dashboard gives the on-call engineer a quick, at-a-glance view of the pipeline’s health without logging into a complex monitoring system.
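
The kind of check such a status page performs can be sketched in Python (the dashboard itself would do this in its own code). The metric names come from the endpoint described above; the assumption that last_s3_upload_time is a Unix timestamp in seconds, the 120-second threshold, and the three status labels are illustrative only.

```python
import time

def pipeline_status(metrics: dict, now: float, max_upload_age_s: float = 120.0) -> str:
    """Classify health from the consumer's metrics; thresholds are illustrative."""
    age = now - metrics["last_s3_upload_time"]
    if age > max_upload_age_s and metrics["messages_in_current_batch"] > 0:
        return "stalled"   # messages are waiting but nothing has been uploaded
    if age > max_upload_age_s:
        return "idle"      # no recent upload, but nothing pending either
    return "healthy"

sample = {
    "last_processed_ts": 1700000000000,
    "messages_in_current_batch": 12,
    "last_s3_upload_time": time.time() - 300,  # assumed Unix seconds
}
print(pipeline_status(sample, time.time()))  # stalled
```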

Lingering Issues and Next Steps

This architecture successfully reduced our data latency from 24 hours to under 2 minutes, with transactional integrity preserved from TiDB to Hudi. However, it’s not without its own set of trade-offs and areas for improvement.

The primary limitation is the latency floor introduced by micro-batching. The Spark job runs every minute, so a change can wait up to a full scheduling interval, plus the job’s own run time, before it becomes visible in Hudi. For true sub-second streaming, we would need to explore Hudi Streamer (formerly DeltaStreamer), which can run continuously. This would, however, couple the consumer more tightly to the Hudi/Spark ecosystem and increase the complexity of the Go consumer significantly.
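
A rough back-of-the-envelope model of that delay, in Python: the 5-second batch timeout and 60-second schedule come from the configuration above, while the 20-second job run time is an assumed figure. Transit time through TiCDC and NATS and the S3 upload itself are ignored.

```python
def staleness_bounds(batch_timeout_s, schedule_interval_s, job_runtime_s):
    """Return (best, worst) end-to-end delay in seconds for one change event."""
    best = job_runtime_s  # staged just before a run starts
    worst = batch_timeout_s + schedule_interval_s + job_runtime_s
    return best, worst

print(staleness_bounds(5, 60, 20))  # (20, 85)
```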

Schema evolution handling is currently semi-manual. The consumer detects DDL and logs an alert, but an operator must intervene to apply the schema change to the target Hudi table and restart the pipeline. Automating this process, perhaps by having the consumer stage the DDL statement for the Spark job to interpret and apply, is the next critical iteration.

Finally, the cost of running a Spark job every minute, even a small one, is not zero. For tables with very high write volumes, the Copy-on-Write (CoW) strategy could lead to significant write amplification. A future optimization would be to switch these high-volume tables to Merge-on-Read (MoR) to optimize for ingest performance, at the cost of slightly higher query latency.
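
To give a feel for the write amplification concern, here is a small worked estimate in Python. Under Copy-on-Write, an update rewrites every base file it touches in full. The numbers below are purely illustrative, not measurements from this pipeline.

```python
def cow_write_amplification(updated_bytes, files_touched, file_size_bytes):
    """Bytes rewritten per byte of changed data under Copy-on-Write."""
    return (files_touched * file_size_bytes) / updated_bytes

# Illustrative numbers: 1 MiB of changes spread over 10 base files of 128 MiB.
print(cow_write_amplification(1 * 2**20, 10, 128 * 2**20))  # 1280.0
```

Merge-on-Read avoids this by appending changes to log files and deferring the rewrite to compaction.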

