Implementing a Kafka-Triggered MapReduce Framework for Batch Data Aggregation into MariaDB


Our primary reporting database, a heavily indexed MariaDB instance, was buckling under the load of nightly aggregation jobs. These jobs, monolithic SQL scripts running for hours, were becoming untenable. The business, under a new Scrum-driven initiative for faster insights, wanted near-real-time data, but our infrastructure couldn’t support it. We already had a robust Kafka pipeline for event ingestion from other services, but no budget for a full-fledged Spark or Hadoop cluster. The challenge, framed in our first sprint planning session, was stark: offload the aggregation logic from MariaDB using our existing Kafka infrastructure and a lean compute model, delivering results back to a separate reporting table in MariaDB. This is the log of how we built a bespoke, lightweight MapReduce implementation to bridge that gap.

Sprint 1: The Kafka Batch Controller - Forcing a Stream to Act Like a Batch

The first technical hurdle was reconciling Kafka’s streaming nature with the batch-oriented processing of MapReduce. A MapReduce job needs a well-defined, finite dataset. Our solution was a “Batch Controller” built on top of a standard Kafka consumer. Its sole purpose is to consume messages from a topic, collect them into a discrete batch, and then trigger the Map phase. Because every later phase treats the batch as its unit of work and of retry, we wanted explicit control over exactly where a batch begins and ends.

The controller’s logic is simple but critical:

  1. Poll the Kafka topic continuously.
  2. Accumulate messages in-memory.
  3. Seal the batch when one of two conditions is met: either a configurable message count (MAX_BATCH_SIZE) is reached, or a time threshold (MAX_BATCH_INTERVAL_MS) is exceeded. This prevents indefinite waiting on low-traffic topics.
  4. Once a batch is sealed, write all messages to a uniquely named file on the local disk. This file becomes the input for the Map phase.
  5. Critically, commit the Kafka offsets only after the batch file has been successfully written to disk. This provides at-least-once delivery semantics at the batch level. If the controller crashes after writing the file but before committing, it will re-consume the same messages on restart and write them into a new batch file, so our downstream logic must tolerate duplicate batches.
// src/main/java/com/datapipe/controller/BatchController.java
package com.datapipe.controller;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchController {

    private static final Logger logger = LoggerFactory.getLogger(BatchController.class);
    private final KafkaConsumer<String, String> consumer;
    private final String inputTopic;
    private final String batchOutputPath;
    private final int maxBatchSize;
    private final long maxBatchIntervalMs;
    private final ExecutorService mapExecutor;

    public BatchController(Properties kafkaProps, String inputTopic, String batchOutputPath, int maxBatchSize, long maxBatchIntervalMs, int mapperThreads) {
        this.consumer = new KafkaConsumer<>(kafkaProps);
        this.inputTopic = inputTopic;
        this.batchOutputPath = batchOutputPath;
        this.maxBatchSize = maxBatchSize;
        this.maxBatchIntervalMs = maxBatchIntervalMs;
        this.mapExecutor = Executors.newFixedThreadPool(mapperThreads);
    }

    public void start() {
        consumer.subscribe(Collections.singletonList(inputTopic));
        logger.info("BatchController started. Subscribed to topic: {}", inputTopic);

        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        long lastBatchTime = System.currentTimeMillis();

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }

                boolean batchSizeReached = buffer.size() >= maxBatchSize;
                boolean timeThresholdReached = System.currentTimeMillis() - lastBatchTime >= maxBatchIntervalMs;

                if (!buffer.isEmpty() && (batchSizeReached || timeThresholdReached)) {
                    logger.info("Sealing batch. Reason: {}. Size: {}.",
                        batchSizeReached ? "Max size reached" : "Time threshold reached", buffer.size());

                    processBatch(new ArrayList<>(buffer)); // Process a copy
                    
                    consumer.commitSync(); // Commit only after processBatch() has written the batch file without throwing
                    logger.info("Kafka offsets committed successfully.");
                    
                    buffer.clear();
                    lastBatchTime = System.currentTimeMillis();
                }
            }
        } catch (Exception e) {
            logger.error("An unrecoverable error occurred in the BatchController.", e);
        } finally {
            consumer.close();
            mapExecutor.shutdown();
        }
    }

    private void processBatch(List<ConsumerRecord<String, String>> batch) {
        String batchId = "batch-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);
        String batchFilePath = batchOutputPath + "/" + batchId + ".data";

        try (BufferedWriter writer = new BufferedWriter(new FileWriter(batchFilePath))) {
            for (ConsumerRecord<String, String> record : batch) {
                // We write the value directly, assuming it's a single line of data (e.g., JSON)
                writer.write(record.value());
                writer.newLine();
            }
            logger.info("Batch {} successfully written to disk at {}", batchId, batchFilePath);
            
            // In a real implementation, this would trigger a more complex MapReduce job orchestrator.
            // For this example, we'll directly kick off a mapper task.
            // mapExecutor.submit(new WordCountMapper(batchFilePath, ...));

        } catch (IOException e) {
            logger.error("Failed to write batch {} to disk. Offsets will not be committed. This batch will be re-processed.", batchId, e);
            // Rethrowing as an unchecked exception aborts the poll loop before commitSync() runs,
            // so the offsets for this batch are never committed and the data isn't lost.
            throw new RuntimeException("Failed to persist batch " + batchId, e);
        }
    }

    public static void main(String[] args) {
        // Production-grade configuration should be externalized (e.g., YAML, properties file)
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mapreduce-batch-controller-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Manual commit is essential

        BatchController controller = new BatchController(
            props,
            "raw-logs-topic",
            "/tmp/mapreduce/input",
            10000,          // Batch size of 10,000 messages
            60000,          // Or batch every 60 seconds
            4               // Number of mapper threads
        );
        controller.start();
    }
}

A common mistake here is to use Kafka’s auto-commit feature. It’s a recipe for data loss in this kind of system. If the application crashes after the messages are consumed but before the batch is durably stored, those offsets might have been committed, and the data is gone forever. Manual commit is non-negotiable.
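
The controller above calls commitSync() with no arguments, which commits the positions from the most recent poll(). That matches the buffer only because every polled record goes into the batch. If that ever stops being true, one way to tie the commit to exactly what was written to disk is to compute the offsets explicitly. The sketch below is an illustration rather than our production code: PolledRecord is a hypothetical stand-in for the topic, partition, and offset of a ConsumerRecord, and the string key stands in for a TopicPartition. The one Kafka fact it relies on is that the committed offset is the offset of the next record to read, which is the last processed offset plus one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CommitOffsets {

    // Hypothetical stand-in for the ConsumerRecord fields that matter here.
    record PolledRecord(String topic, int partition, long offset) {}

    // For each topic-partition in the batch, the offset to commit is the
    // highest offset written to disk plus one (the next record to read).
    static Map<String, Long> nextOffsets(List<PolledRecord> batch) {
        Map<String, Long> next = new HashMap<>();
        for (PolledRecord r : batch) {
            String topicPartition = r.topic() + "-" + r.partition();
            next.merge(topicPartition, r.offset() + 1, Long::max);
        }
        return next;
    }
}
```

In the real controller the result would be converted to a Map<TopicPartition, OffsetAndMetadata> and passed to consumer.commitSync(...) after the batch file is written.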

Sprint 2: The Mapper and Local File Persistence

With a reliable way to create batch files, the next sprint focused on the “Map” phase. We defined a simple interface that our data processing logic would implement. The goal was to keep it stateless and focused on a single task: read input line-by-line from the batch file and write intermediate key-value pairs to another local file.

We consciously avoided a distributed filesystem like HDFS. The pragmatic choice for our scale was to use the local disk of the worker node. This made the worker's disk a single point of failure, but it drastically simplified the architecture and reduced operational overhead.

// src/main/java/com/datapipe/mapreduce/Mapper.java
package com.datapipe.mapreduce;

import java.io.IOException;

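// Note: IntermediateKV and MapperContext must be public so that mappers in other
// packages can import them, which means each lives in its own file in this package
// (IntermediateKV.java, MapperContext.java). They are shown together for brevity.

// A simple key-value pair structure for intermediate data
public class IntermediateKV {
    public final String key;
    public final String value;

    public IntermediateKV(String key, String value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        // Using a tab as a delimiter is a classic MapReduce convention
        return key + "\t" + value;
    }
}

// The context object provides a way for the mapper to output its results.
public interface MapperContext {
    void write(IntermediateKV kv) throws IOException;
}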

// The generic Mapper interface
public interface Mapper {
    void map(String line, MapperContext context) throws IOException;
}

Here is a concrete implementation for a simple “URL hit count” analysis, a simplified version of our actual business logic.

// src/main/java/com/datapipe/mappers/UrlHitCountMapper.java
package com.datapipe.mappers;

import com.datapipe.mapreduce.IntermediateKV;
import com.datapipe.mapreduce.Mapper;
import com.datapipe.mapreduce.MapperContext;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Example Mapper: Parses web server log lines and extracts the request URL.
public class UrlHitCountMapper implements Mapper {

    // A fragile regex for demonstration. Production code would use a robust JSON/log parsing library.
    private static final Pattern LOG_PATTERN = Pattern.compile("^\\S+ \\S+ \\S+ \\[.+\\] \"\\S+ (\\S+) \\S+\" \\d{3} \\d+$");

    @Override
    public void map(String line, MapperContext context) throws IOException {
        if (line == null || line.trim().isEmpty()) {
            return; // Basic input validation
        }

        Matcher matcher = LOG_PATTERN.matcher(line);
        if (matcher.find()) {
            String url = matcher.group(1);
            // The key is the URL, the value is simply "1" to represent a single hit.
            context.write(new IntermediateKV(url, "1"));
        }
    }
}

The runner for the Map phase reads the input batch file line by line, invokes the map method, and writes the output to a dedicated intermediate file.
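
A minimal sketch of what such a runner could look like follows. To keep it self-contained it uses simplified stand-ins (LineMapper and Emitter, both hypothetical names) instead of the Mapper and MapperContext types above, and it assumes UTF-8 input and a single output file per mapper task.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class MapRunner {

    // Simplified stand-in for MapperContext: receives emitted (key, value) pairs.
    interface Emitter {
        void emit(String key, String value) throws IOException;
    }

    // Simplified stand-in for Mapper.
    interface LineMapper {
        void map(String line, Emitter out) throws IOException;
    }

    // Streams the batch file through the mapper and writes tab-separated
    // pairs to the intermediate file. Returns the number of pairs emitted.
    static long run(Path batchFile, Path intermediateFile, LineMapper mapper) throws IOException {
        long[] emitted = {0};
        try (BufferedReader reader = Files.newBufferedReader(batchFile, StandardCharsets.UTF_8);
             BufferedWriter writer = Files.newBufferedWriter(intermediateFile, StandardCharsets.UTF_8)) {
            Emitter out = (key, value) -> {
                writer.write(key);
                writer.write('\t');
                writer.write(value);
                writer.newLine();
                emitted[0]++;
            };
            String line;
            while ((line = reader.readLine()) != null) {
                mapper.map(line, out);
            }
        }
        return emitted[0];
    }
}
```

Because the file is read one line at a time, memory use stays flat regardless of batch size; only the mapper's own state (none, in our case) is held between lines.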

Sprint 3: The Shuffle and Sort - The Pragmatic Bottleneck

This was the most contentious phase. A true distributed shuffle is complex. It involves partitioning, network transfer, and merging sorted runs of data. Our Scrum master rightly questioned if we could deliver a “distributed” shuffle in one sprint. We couldn’t.

Instead, we implemented a centralized, file-based shuffle-and-sort. After all mapper tasks for a given batch complete, a single process is responsible for:

  1. Gathering all intermediate output files from the mappers.
  2. Concatenating them into one large file.
  3. Using the standard Unix sort utility to sort this file. The pitfall here is performance and resource usage. sort can be memory and I/O intensive, but it’s incredibly robust and optimized. For our data scale (batches of a few million records), it was a perfectly acceptable trade-off over writing a custom, likely less efficient, external sort in Java.
graph TD
    subgraph Map Phase
        M1[Mapper 1] -- writes --> F1[inter_m1.txt]
        M2[Mapper 2] -- writes --> F2[inter_m2.txt]
        M3[Mapper 3] -- writes --> F3[inter_m3.txt]
    end

    subgraph Shuffle & Sort Phase
        F1 --> S[Shuffle Process]
        F2 --> S
        F3 --> S
        S -- 1. `cat inter_*.txt` --> C[Combined File]
        C -- 2. `sort -k1,1` --> SF[Sorted File]
    end

    subgraph Reduce Phase
        SF -- feeds --> R[Reducer]
    end

The orchestration logic looked something like this simple shell script, which our Java application would invoke.

#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.

# Refuse to run without a batch ID: an empty value would make the rm -rf below
# delete the map output of every batch.
BATCH_ID="${1:?Usage: $0 <batch-id>}"
MAP_OUTPUT_DIR="/tmp/mapreduce/map_output/${BATCH_ID}"
SHUFFLE_OUTPUT_DIR="/tmp/mapreduce/shuffle_output"
COMBINED_FILE="${SHUFFLE_OUTPUT_DIR}/${BATCH_ID}.combined"
SORTED_FILE="${SHUFFLE_OUTPUT_DIR}/${BATCH_ID}.sorted"

# Ensure output directory exists
mkdir -p "${SHUFFLE_OUTPUT_DIR}"

echo "Starting shuffle for batch ${BATCH_ID}"

# 1. Concatenate all mapper outputs
echo "Concatenating mapper outputs..."
cat "${MAP_OUTPUT_DIR}"/map-*.out > "${COMBINED_FILE}"

# 2. Sort the combined file by the first field (the key)
echo "Sorting combined file..."
# -t sets the field delimiter to a tab, matching the mapper output format.
# LC_ALL=C forces byte-wise ordering regardless of the host locale.
# The -T option specifies a temporary directory, crucial for large sorts
LC_ALL=C sort -t $'\t' -T /tmp -k1,1 "${COMBINED_FILE}" -o "${SORTED_FILE}"

# 3. Clean up intermediate files
rm "${COMBINED_FILE}"
rm -rf "${MAP_OUTPUT_DIR}"

echo "Shuffle and sort complete for batch ${BATCH_ID}. Output at ${SORTED_FILE}"
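
On the Java side, invoking a script like this can be done with ProcessBuilder. The following is a sketch under assumptions, not our exact orchestration code: the class name ShuffleLauncher is hypothetical, and it simply treats any non-zero exit code (which set -e produces on the first failing command) as a failed shuffle.

```java
import java.io.IOException;
import java.util.List;

final class ShuffleLauncher {

    // Runs an external command, forwarding its output to this process's
    // stdout/stderr, and throws if the command exits with a non-zero code.
    static void run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command)
                .inheritIO()
                .start();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("Command " + command + " exited with code " + exitCode);
        }
    }
}
```

A call would pass the script path and the batch ID as separate list elements, for example run(List.of(scriptPath, batchId)), so that no shell quoting is needed on the Java side.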

This approach is clear technical debt. It’s not scalable and makes the filesystem a bottleneck. However, under the Scrum philosophy of delivering value incrementally, it was the right decision. It got us a working end-to-end pipeline to demonstrate at the sprint review, with a well-defined PBI (Product Backlog Item) to improve it later.

Sprints 4 & 5: The Reducer and Idempotent MariaDB Writes

The Reducer’s job is to process the sorted intermediate file. The framework reads the file and, for each unique key, provides the reducer with an iterator over all its associated values.

// src/main/java/com/datapipe/mapreduce/Reducer.java
package com.datapipe.mapreduce;

import java.io.IOException;
import java.util.Iterator;

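// Context for the reducer, primarily for writing final output.
// Must be public (and so live in its own file, ReducerContext.java) for reducers
// in other packages to import it; shown here alongside Reducer for brevity.
public interface ReducerContext {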
    void write(String key, String value) throws IOException;
}

// The generic Reducer interface
public interface Reducer {
    void reduce(String key, Iterator<String> values, ReducerContext context) throws IOException;
}

// src/main/java/com/datapipe/reducers/SummingReducer.java
package com.datapipe.reducers;

import com.datapipe.mapreduce.Reducer;
import com.datapipe.mapreduce.ReducerContext;

import java.io.IOException;
import java.util.Iterator;

public class SummingReducer implements Reducer {
    @Override
    public void reduce(String key, Iterator<String> values, ReducerContext context) throws IOException {
        long sum = 0;
        while (values.hasNext()) {
            try {
                // In our case, the value is always "1".
                sum += Long.parseLong(values.next());
            } catch (NumberFormatException e) {
                // Log and ignore malformed values in a real system
            }
        }
        context.write(key, String.valueOf(sum));
    }
}
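
The grouping step described at the start of this section, where the framework hands the reducer one key together with all of its values, depends entirely on the input being sorted: equal keys are adjacent, so a single pass can detect key boundaries. The sketch below illustrates that pass. It is a simplified model, not our framework code: the reducer is represented by a BiFunction instead of the Reducer interface, and the values for one key are buffered in memory, which is only reasonable when no single key has an enormous number of values.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class GroupingReduceRunner {

    // Walks lines sorted by key ("key<TAB>value") and calls the reducer once
    // per run of identical keys. Results are collected in encounter order.
    static Map<String, String> run(Iterator<String> sortedLines,
                                   BiFunction<String, Iterator<String>, String> reducer) {
        Map<String, String> results = new LinkedHashMap<>();
        String currentKey = null;
        List<String> values = new ArrayList<>();
        while (sortedLines.hasNext()) {
            String line = sortedLines.next();
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue; // skip malformed lines without a delimiter
            }
            String key = line.substring(0, tab);
            if (!key.equals(currentKey)) {
                // Key boundary: flush the previous group before starting a new one.
                if (currentKey != null) {
                    results.put(currentKey, reducer.apply(currentKey, values.iterator()));
                }
                currentKey = key;
                values = new ArrayList<>();
            }
            values.add(line.substring(tab + 1));
        }
        if (currentKey != null) {
            results.put(currentKey, reducer.apply(currentKey, values.iterator()));
        }
        return results;
    }
}
```

If the input were not sorted, the same key would appear in several separate runs and the reducer would be called more than once for it, which is why the shuffle phase cannot be skipped.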

The final step is persisting the aggregated results to MariaDB. This is where reliability becomes critical. A MapReduce job might fail and be retried. We must ensure that re-running a reducer doesn’t corrupt our data. The key is to design the database interaction to be idempotent.

The table in MariaDB was designed with a unique key on the dimension we were aggregating (in this case, the URL). We then used MariaDB’s INSERT ... ON DUPLICATE KEY UPDATE syntax. This elegantly handles both new entries and updates to existing ones within a single atomic statement.

CREATE TABLE `url_daily_hits` (
  `report_date` date NOT NULL,
  `url_path` varchar(2048) NOT NULL,
  `hit_count` bigint(20) unsigned NOT NULL,
  `last_updated` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
  PRIMARY KEY (`report_date`, `url_path`(255)) -- Composite primary key for idempotency
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

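The composite primary key is (report_date, url_path), with url_path indexed on its first 255 characters only. The caveat is that two URLs sharing the same 255-character prefix on the same date are treated as one key, so their counts are merged. When our reducer writes its output, it uses a query like this: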

// src/main/java/com/datapipe/storage/MariaDbWriter.java
package com.datapipe.storage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.time.LocalDate;
import java.util.List;
import java.util.Properties;

public class MariaDbWriter {
    private static final Logger logger = LoggerFactory.getLogger(MariaDbWriter.class);
    private final String jdbcUrl;
    private final Properties dbProps;

    private static final String UPSERT_SQL =
        "INSERT INTO url_daily_hits (report_date, url_path, hit_count) " +
        "VALUES (?, ?, ?) " +
        "ON DUPLICATE KEY UPDATE hit_count = hit_count + VALUES(hit_count);";

    public MariaDbWriter(String host, int port, String database, String user, String password) {
        this.jdbcUrl = String.format("jdbc:mariadb://%s:%d/%s", host, port, database);
        this.dbProps = new Properties();
        this.dbProps.put("user", user);
        this.dbProps.put("password", password);
    }
    
    // A record to hold the final output
    public record AggregationResult(String key, long value) {}

    public void writeBatch(List<AggregationResult> results, LocalDate reportDate) {
        if (results == null || results.isEmpty()) {
            return;
        }

        try (Connection conn = DriverManager.getConnection(jdbcUrl, dbProps);
             PreparedStatement pstmt = conn.prepareStatement(UPSERT_SQL)) {
            
            conn.setAutoCommit(false); // Enable transaction control

            int batchCount = 0;
            for (AggregationResult result : results) {
                pstmt.setDate(1, Date.valueOf(reportDate));
                pstmt.setString(2, result.key());
                pstmt.setLong(3, result.value());
                pstmt.addBatch();

                if (++batchCount % 1000 == 0) {
                    pstmt.executeBatch();
                    logger.debug("Executed a batch of 1000 statements.");
                }
            }
            pstmt.executeBatch(); // Execute remaining batch
            
            conn.commit(); // Commit the transaction
            logger.info("Successfully wrote {} records to MariaDB.", results.size());

        } catch (SQLException e) {
            logger.error("Error writing batch to MariaDB. The transaction will be rolled back.", e);
            // This exception should trigger a retry mechanism for the entire Reduce job.
            throw new RuntimeException("Failed to persist reducer output", e);
        }
    }
}

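The use of hit_count = hit_count + VALUES(hit_count) is crucial, but it needs to be understood precisely. Several batches contribute hits for the same URL on the same day, so each write must add to the existing count rather than overwrite it. An additive update is not idempotent by itself, though: applying the same reducer output twice counts those hits twice. What protects us against a failure in the middle of a write is the transaction. Either every row of a reducer's output is committed or none is, so a retry after a failed write starts from a clean state. A replay of a batch whose results were already committed (for example, after the controller crashes between writing the batch file and committing its offsets) would still be double counted unless the job also records which batch IDs it has applied. JDBC batching, in turn, keeps the write fast.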
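
To make the behavior of an additive upsert under retries concrete, here is a small in-memory model. It is purely illustrative and makes several assumptions: the HashMap stands in for the url_daily_hits rows of one report date, and the appliedBatches set is a hypothetical ledger of batch IDs that does not exist in the schema shown above. In a real database, the ledger entry and the upserts would need to be committed in the same transaction for the guard to hold.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class UpsertModel {
    // In-memory stand-in for the url_daily_hits rows of a single report date.
    private final Map<String, Long> hitCounts = new HashMap<>();
    // Hypothetical ledger of batch IDs whose results were already applied.
    private final Set<String> appliedBatches = new HashSet<>();

    // Mirrors "hit_count = hit_count + VALUES(hit_count)" with no replay protection.
    void applyAdditive(Map<String, Long> batchResult) {
        batchResult.forEach((url, hits) -> hitCounts.merge(url, hits, Long::sum));
    }

    // Same additive update, but skipped if this batch ID was applied before.
    // Returns true when the batch was applied, false when it was a replay.
    boolean applyOnce(String batchId, Map<String, Long> batchResult) {
        if (!appliedBatches.add(batchId)) {
            return false;
        }
        applyAdditive(batchResult);
        return true;
    }

    long hits(String url) {
        return hitCounts.getOrDefault(url, 0L);
    }
}
```

Applying the same result twice through applyAdditive doubles the count, while applyOnce leaves it unchanged on the second call. Distinct batches still accumulate in both cases, which is the property the additive update is there to provide.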

This entire architecture, born from constraints, delivered significant value. It successfully offloaded the aggregation workload from our primary database, enabling more frequent reporting. The Scrum process was instrumental, forcing us to break down a complex problem into manageable, value-delivering chunks, even if some of the initial solutions were imperfect.

The current system is not without its limitations. The centralized file-based shuffle is a known scalability bottleneck and a single point of failure. The job orchestration is simplistic and lacks robust failure recovery for the controller process itself; a crash mid-batch requires manual intervention to clean up file artifacts and ensure the consumer group rebalances correctly. Future iterations, already on the backlog, involve replacing the shell-based shuffle with a more robust intermediate data store like Redis or a dedicated object store, and building a more resilient state machine for job orchestration.

