Engineering an Idempotent Data Pipeline for Synchronizing Apache Hudi Commits to a Meilisearch Index via AWS SNS


The initial request was straightforward: provide sub-second search capabilities over our rapidly growing multi-terabyte product catalog stored in an Apache Hudi table on S3. The data lake is our source of truth, but its query latency, even with optimizations, is measured in minutes, not milliseconds. A full re-indexing job from Hudi to a search engine like Meilisearch was the first, obvious solution. It was also obviously flawed. Running it daily meant up to 24 hours of data staleness. Running it hourly was a cost and compute nightmare that put unnecessary load on both the data platform and the search service.

The clear path forward was an incremental pipeline. Hudi is purpose-built for this, providing a stream of changes from a given commit timestamp. The initial architecture proposed was a simple Spark job running every five minutes to pull these changes and push them directly to the Meilisearch API. It worked on a whiteboard. It would have been a disaster in production.

A critical code review session dismantled this design. The primary objection was the tight coupling and lack of resilience. If the Meilisearch instance was down for maintenance, or if a network blip occurred, the entire Spark batch would fail. The job would then have to retry the entire five-minute window, creating complex state management problems. More insidiously, what if the job failed halfway through pushing a batch of 10,000 records? Some records would be in the index, some not. The next run would re-process the entire window, creating duplicates or performing wasteful updates.

This review forced a complete rethink, pivoting to a decoupled, event-driven architecture with idempotency as a first-class citizen. The final system uses AWS SNS as a durable message bus and embeds idempotency keys within the data flow to guarantee exactly-once processing from the perspective of the end user’s search query. This is a post-mortem of that build.

flowchart TD
    subgraph "AWS Data Plane"
        A[Apache Hudi Table on S3] --> B{Spark Job on EMR};
        B -- "1. Read last processed commit" --> C[Checkpoint Store: DynamoDB];
        B -- "2. Incremental Query" --> A;
        B -- "3. Publish change events" --> D[AWS SNS Topic];
        B -- "4. Update checkpoint on success" --> C;
    end

    subgraph "AWS Messaging & Compute"
        D --> E[AWS SQS Queue];
        E --> F[Consumer Service on ECS];
    end
    
    subgraph "Search & State Plane"
        F -- "5. Read message batch" --> E;
        F -- "6. Check/Set idempotency key" --> G[Idempotency Store: Redis];
        F -- "7. add_or_replace_documents" --> H[Meilisearch Index];
        F -- "8. Delete message on success" --> E;
    end

The Hudi Incremental Producer in Spark

The producer’s sole responsibility is to read the latest changes from the Hudi table reliably and publish them to SNS. To achieve this, it needs a persistent checkpoint. Storing the last successfully processed commit timestamp in a file on S3 is an option, but for low-latency access and atomic updates, AWS DynamoDB is a better fit.

Our Hudi table is a COPY_ON_WRITE table, which simplifies incremental pulls as we don’t have to deal with merging delta and base files. The core of the producer is a Spark job written in Scala.

First, the setup of the DynamoDB table for checkpointing is minimal. It needs a primary key to identify the pipeline and an attribute to store the timestamp.

# AWS CLI command to create the checkpoint table
aws dynamodb create-table \
    --table-name HudiPipelineCheckpoints \
    --attribute-definitions \
        AttributeName=PipelineID,AttributeType=S \
    --key-schema \
        AttributeName=PipelineID,KeyType=HASH \
    --provisioned-throughput \
        ReadCapacityUnits=1,WriteCapacityUnits=1

The Spark application itself handles the logic of reading and writing this checkpoint.

// build.sbt dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.3.1" % "provided",
  "org.apache.hudi" %% "hudi-spark3.3-bundle" % "0.12.2",
  "software.amazon.awssdk" % "dynamodb" % "2.17.290",
  "software.amazon.awssdk" % "sns" % "2.17.290",
  "com.google.code.gson" % "gson" % "2.9.1"
)

The core producer logic involves a clear sequence: fetch checkpoint, query Hudi, publish messages, update checkpoint. This sequence ensures at-least-once delivery to SNS. The downstream consumer will handle the “at-least-once” and refine it to “exactly-once”.

package com.example.hudi2search

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, PutItemRequest}
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry
import software.amazon.awssdk.services.sns.model.PublishBatchRequest

import com.google.gson.Gson
import org.slf4j.LoggerFactory

import java.time.Instant
import java.util.{Map => JMap, UUID}
import scala.collection.JavaConverters._
import scala.util.hashing.MurmurHash3

object HudiIncrementalProducer {

  private val logger = LoggerFactory.getLogger(this.getClass)
  private val pipelineId = "hudi-product-catalog-to-meilisearch"
  private val hudiTablePath = "s3://my-data-lake/product_catalog"
  private val checkpointTableName = "HudiPipelineCheckpoints"
  private val snsTopicArn = "arn:aws:sns:us-east-1:123456789012:hudi-changes"

  // Utility to interact with the checkpoint store
  object CheckpointManager {
    private val dynamoDbClient = DynamoDbClient.builder().region(Region.US_EAST_1).build()

    def getLastCommitTimestamp: String = {
      val request = GetItemRequest.builder()
        .tableName(checkpointTableName)
        .key(Map("PipelineID" -> AttributeValue.builder().s(pipelineId).build()).asJava)
        .build()
      
      val response = dynamoDbClient.getItem(request)
      if (response.hasItem) {
        response.item().get("LastCommitTimestamp").s()
      } else {
        "0" // Bootstrap from the beginning of time
      }
    }

    def updateCommitTimestamp(timestamp: String): Unit = {
      val item = Map(
        "PipelineID" -> AttributeValue.builder().s(pipelineId).build(),
        "LastCommitTimestamp" -> AttributeValue.builder().s(timestamp).build(),
        "LastUpdatedAt" -> AttributeValue.builder().s(Instant.now().toString).build()
      ).asJava

      val request = PutItemRequest.builder()
        .tableName(checkpointTableName)
        .item(item)
        .build()
      
      dynamoDbClient.putItem(request)
      logger.info(s"Successfully checkpointed commit timestamp: $timestamp")
    }
  }

  // SNS publisher with batching for efficiency and cost-saving
  object SnsPublisher {
    private val snsClient = SnsClient.builder().region(Region.US_EAST_1).build()
    private val gson = new Gson()

    def publishChanges(records: DataFrame): Unit = {
      if (records.isEmpty) {
        logger.info("No new records to publish.")
        return
      }
      
      val messages = records.toJSON.collect().map { jsonRecord =>
        // The record itself must contain a unique key, e.g., 'product_id'
        // We'll create a deterministic idempotency key for the consumer.
        val recordMap = gson.fromJson(jsonRecord, classOf[JMap[String, Object]])
        val productId = recordMap.get("product_id").toString
        val commitTime = recordMap.get("_hoodie_commit_time").toString
        val operation = recordMap.get("_hoodie_operation").toString // 'i' for insert, 'u' for update
        
        // This key MUST be deterministic. The consumer will use it to deduplicate.
        val idempotencyKey = s"$productId:$commitTime"
        
        val messagePayload = Map(
          "idempotencyKey" -> idempotencyKey,
          "operation" -> operation,
          "payload" -> recordMap
        )

        PublishBatchRequestEntry.builder()
          .id(UUID.randomUUID().toString.replace("-", "")) // Batch entry ID, must be unique within the request
          .message(gson.toJson(messagePayload))
          .build()
      }
      
      // SNS PublishBatch has a limit of 10 messages per request
      messages.sliding(10, 10).foreach { batch =>
        try {
          val request = PublishBatchRequest.builder()
            .topicArn(snsTopicArn)
            .publishBatchRequestEntries(batch.asJava)
            .build()
          
          val result = snsClient.publishBatch(request)

          if (result.hasFailed && !result.failed().isEmpty) {
            // In a production system, this requires a robust retry strategy or dead-letter queue.
            // For now, we fail the entire job to force a re-run of the batch.
            logger.error(s"Failed to publish ${result.failed().size()} messages to SNS.")
            result.failed().forEach(fail => logger.error(s"ID: ${fail.id()}, Code: ${fail.code()}, Message: ${fail.message()}"))
            throw new RuntimeException("SNS publishing failed for one or more messages.")
          }
          logger.info(s"Successfully published a batch of ${batch.size} messages to SNS.")
        } catch {
          case e: Exception =>
            logger.error("Exception during SNS publish batch.", e)
            throw e // Re-throw to fail the Spark job
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Hudi Incremental Producer")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
      
    val beginCommitTimestamp = CheckpointManager.getLastCommitTimestamp
    logger.info(s"Starting incremental pull from Hudi table. Begin commit timestamp: $beginCommitTimestamp")

    val hudiOptions = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.read.begin.instanttime" -> beginCommitTimestamp,
      "hoodie.datasource.incremental.read.schema.on.latest.instant" -> "true"
    )

    try {
      val incrementalDF = spark.read.format("hudi")
        .options(hudiOptions)
        .load(hudiTablePath)

      // It is crucial to cache the DataFrame to avoid re-computation.
      incrementalDF.cache()

      val newRecordsCount = incrementalDF.count()
      if (newRecordsCount == 0) {
        logger.info("No new commits found since last run.")
        spark.stop()
        return
      }
      
      logger.info(s"Found $newRecordsCount new or updated records.")
      
      // Find the latest commit time in this batch to use as the next checkpoint
      import org.apache.spark.sql.functions.max
      val latestCommitTimestamp = incrementalDF.select(max("_hoodie_commit_time")).first().getString(0)

      // Publish the changes to SNS
      SnsPublisher.publishChanges(incrementalDF)

      // IMPORTANT: Only update the checkpoint AFTER the publish operation succeeds.
      // If the job fails during publish, the checkpoint is not updated, and the next run
      // will re-process the same commit window, resulting in at-least-once delivery.
      CheckpointManager.updateCommitTimestamp(latestCommitTimestamp)
      
      logger.info("Incremental pipeline run completed successfully.")

    } catch {
      case e: Exception =>
        logger.error("Hudi incremental producer job failed.", e)
        throw e
    } finally {
      spark.stop()
    }
  }
}

A key design decision debated during code review was the composition of the idempotencyKey. The final choice, s"$productId:$commitTime", is deterministic and unique for each version of a record. A simpler key like just productId would prevent re-processing of the same message but would also incorrectly reject a legitimate update to that product from a later Hudi commit.

The Idempotent Meilisearch Consumer

The consumer pulls messages from an SQS queue subscribed to the SNS topic. This SNS-SQS fan-out pattern is standard practice for building resilient systems. It provides a buffer and allows for independent processing and scaling of consumers. We chose to implement the consumer as a long-running Python service deployed on ECS rather than a Lambda function. This avoids Lambda’s timeout limitations and gives us more control over batching, concurrency, and stateful connections to dependencies like Redis.

The consumer’s core challenge is handling message redelivery from SQS without creating duplicates in Meilisearch. This is where the idempotency key becomes critical. We use Redis as a high-performance distributed lock and key-value store to track processed message IDs.

# requirements.txt
# boto3
# meilisearch
# redis
# python-json-logger

The consumer code needs to be robust, handling failures gracefully and ensuring that a message is deleted from SQS only after it has been fully and successfully processed.

import os
import json
import logging
import time
import uuid

import boto3
import meilisearch
import redis
from pythonjsonlogger import jsonlogger

# --- Configuration ---
# In a real app, use a proper config management system.
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
SQS_QUEUE_URL = os.getenv("SQS_QUEUE_URL")
MEILI_HOST = os.getenv("MEILI_HOST", "http://127.0.0.1:7700")
MEILI_MASTER_KEY = os.getenv("MEILI_API_KEY")
MEILI_INDEX_NAME = os.getenv("MEILI_INDEX_NAME", "products")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))

# Idempotency keys will be stored with a TTL to prevent Redis from growing indefinitely.
# 7 days is a safe buffer, much longer than the SQS message retention period.
IDEMPOTENCY_KEY_TTL_SECONDS = 60 * 60 * 24 * 7

# --- Logging Setup ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(name)s %(levelname)s %(message)s'
)
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)

class IdempotencyError(Exception):
    """Custom exception for already processed messages."""
    pass

class IdempotencyManager:
    """Manages tracking of processed messages using Redis."""
    def __init__(self, redis_client):
        self.redis = redis_client

    def check_and_set(self, key: str) -> None:
        """
        Atomically checks if a key exists and sets it if it doesn't.
        Raises IdempotencyError if the key already exists.
        """
        # The 'nx=True' flag ensures this operation is atomic.
        # It sets the key only if it does not exist.
        was_set = self.redis.set(key, "processed", ex=IDEMPOTENCY_KEY_TTL_SECONDS, nx=True)
        if not was_set:
            raise IdempotencyError(f"Message with key '{key}' has already been processed.")

def get_boto_clients():
    """Initializes and returns Boto3 clients."""
    sqs_client = boto3.client("sqs", region_name=AWS_REGION)
    return sqs_client

def get_dependencies():
    """Initializes and returns external service clients."""
    meili_client = meilisearch.Client(MEILI_HOST, MEILI_MASTER_KEY)
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
    try:
        redis_client.ping()
        logger.info("Successfully connected to Redis.")
    except redis.exceptions.ConnectionError as e:
        logger.error("Failed to connect to Redis.", extra={"error": str(e)})
        raise
    return meili_client, IdempotencyManager(redis_client)


def process_message(message: dict, meili_client: meilisearch.Client, idempotency_manager: IdempotencyManager):
    """
    Processes a single SQS message.
    1. Parses the message body.
    2. Checks for idempotency.
    3. Updates Meilisearch.
    """
    try:
        body = json.loads(message["Body"])
        # SNS messages are nested inside the SQS message body
        sns_message = json.loads(body["Message"])

        idempotency_key = sns_message.get("idempotencyKey")
        payload = sns_message.get("payload")
        
        if not idempotency_key or not payload:
            logger.error("Message missing 'idempotencyKey' or 'payload'", extra={"message_id": message["MessageId"]})
            # This is a malformed message, we should not retry it.
            return True # Signal to delete

        # This is the core of exactly-once semantics.
        idempotency_manager.check_and_set(idempotency_key)

        # The payload from Hudi is a flat map. Meilisearch expects a list of documents.
        # We need to ensure the primary key is correctly identified.
        # Hudi's _hoodie_operation can be used for deletes, but here we focus on upserts.
        product_id = payload.get("product_id")
        if not product_id:
            logger.error("Payload missing primary key 'product_id'", extra={"idempotency_key": idempotency_key})
            return True # Malformed, delete

        # Meilisearch's add_or_replace_documents is inherently idempotent at the document level.
        # Our idempotency check prevents re-processing the entire message if the consumer crashes
        # after updating Meilisearch but before deleting from SQS.
        task_info = meili_client.index(MEILI_INDEX_NAME).add_or_replace_documents([payload], primary_key='product_id')
        
        logger.info(
            "Successfully submitted document to Meilisearch.",
            extra={
                "idempotency_key": idempotency_key,
                "meilisearch_task_uid": task_info.task_uid,
                "product_id": product_id
            }
        )
        return True # Signal successful processing

    except IdempotencyError as e:
        logger.warning(str(e), extra={"message_id": message["MessageId"]})
        # This is not an error, it's the system working as designed. The message is a duplicate.
        return True # Signal to delete the duplicate message
    except json.JSONDecodeError as e:
        logger.error("Failed to decode JSON message.", extra={"message_id": message["MessageId"], "error": str(e)})
        return True # Malformed message, delete
    except Exception as e:
        # For any other unexpected error (e.g., Meilisearch is down), we log and do NOT delete the message.
        # SQS visibility timeout will expire, and we will retry processing.
        logger.error(
            "An unexpected error occurred during message processing.",
            extra={"message_id": message["MessageId"], "error": str(e), "error_type": type(e).__name__}
        )
        return False # Signal to NOT delete the message


def main():
    """Main consumer loop."""
    if not all([SQS_QUEUE_URL, MEILI_HOST, MEILI_API_KEY, REDIS_HOST]):
        logger.error("Missing critical environment variables. Exiting.")
        return

    sqs_client = get_boto_clients()
    meili_client, idempotency_manager = get_dependencies()
    
    logger.info(f"Consumer starting. Polling SQS queue: {SQS_QUEUE_URL}")

    while True:
        try:
            response = sqs_client.receive_message(
                QueueUrl=SQS_QUEUE_URL,
                MaxNumberOfMessages=10,  # Process up to 10 messages at a time
                WaitTimeSeconds=20       # Use long polling
            )

            messages = response.get("Messages", [])
            if not messages:
                continue
            
            logger.info(f"Received {len(messages)} messages from SQS.")

            processed_handles = []
            for message in messages:
                # If processing succeeds (or it's a confirmed duplicate), we delete it.
                if process_message(message, meili_client, idempotency_manager):
                    processed_handles.append({
                        'Id': str(uuid.uuid4()), # Batch delete entry ID
                        'ReceiptHandle': message['ReceiptHandle']
                    })

            if processed_handles:
                sqs_client.delete_message_batch(
                    QueueUrl=SQS_QUEUE_URL,
                    Entries=processed_handles
                )
                logger.info(f"Deleted {len(processed_handles)} successfully processed messages from SQS.")

        except Exception as e:
            # This catches errors in the main loop (e.g., SQS client connection issues).
            logger.error("Error in main consumer loop. Retrying in 10 seconds.", extra={"error": str(e)})
            time.sleep(10)

if __name__ == "__main__":
    main()

Limitations and Future Considerations

This architecture achieves a resilient, near real-time data pipeline with effective exactly-once semantics. However, it’s not without its own complexities and trade-offs. The “near real-time” latency is fundamentally limited by the execution frequency of the Spark producer job. Decreasing the interval from five minutes to one minute increases EMR and DynamoDB costs and may not be feasible for very large data volumes. The idempotency store, Redis, introduces another piece of critical infrastructure that must be maintained, monitored, and scaled. A failure in Redis will halt the processing of new messages.

Furthermore, this implementation does not explicitly handle out-of-order messages. A standard SQS queue is used, so if two updates to the same product occur in two different Hudi commits (C1 then C2), it’s possible for the message from C2 to be processed before the one from C1, leading to temporarily incorrect data until C1‘s message is processed. For our use case, where updates are frequent and eventually consistent data is acceptable, this was a reasonable trade-off. For scenarios requiring strict ordering, migrating to an SQS FIFO queue would be necessary, which in turn would require changes in the Spark producer to assign appropriate Message Group IDs, adding another layer of complexity. Finally, handling hard deletes from Hudi requires a separate logical path; the producer would need to identify delete operations and publish a specific event type, which the consumer would then translate into a delete_documents call in Meilisearch.


  TOC