Implementing a Kubernetes Operator for Declarative MapReduce-Based Indexing


The operational burden of our log search platform had become untenable. We were ingesting terabytes of unstructured application logs daily, dumping them into an S3 bucket. The indexing process, critical for making this data searchable, was a collection of brittle, manually-triggered shell scripts wrapping kubectl. Each script launched a fleet of pods to download, parse, and index the data. When a pod failed—due to transient network issues, malformed log lines, or resource contention—the entire batch job would halt. The on-call engineer would then have to manually inspect logs, clean up orphaned pods and partial index files, adjust parameters, and restart the whole multi-hour process. This wasn’t DevOps; it was high-stakes digital plumbing.

Our objective shifted from merely running the indexing jobs to building a self-managing system that could orchestrate them. We needed a declarative API. An engineer should be able to define the desired state of an index—its source data, its schema, its retention policy—and a controller should be responsible for making that state a reality. This is the core promise of the Kubernetes operator pattern. We decided to build one.

The choice of MapReduce for the indexing logic itself was a pragmatic one. While newer frameworks like Spark or Flink are more powerful, they introduce significant cluster management overhead. Our problem was embarrassingly parallel: split a massive collection of log files (Map), process each chunk independently to create inverted index fragments, and then merge those fragments into final queryable shards (Reduce). A simple, container-based MapReduce implementation running as ephemeral Kubernetes Jobs was sufficient and, critically, stateless from the perspective of the cluster scheduler.

The foundation of our operator is the Custom Resource Definition (CRD). This defines the schema for our new API object, the LogIndex. It captures the user’s intent.

// api/v1alpha1/logindex_types.go

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// S3SourceSpec defines the location of the source log files in S3.
type S3SourceSpec struct {
	// +kubebuilder:validation:Required
	Bucket string `json:"bucket"`
	// +kubebuilder:validation:Required
	Prefix string `json:"prefix"`
	// +kubebuilder:validation:Optional
	Region string `json:"region,omitempty"`
}

// SourceSpec defines the data source for the index.
type SourceSpec struct {
	// +kubebuilder:validation:Required
	S3 S3SourceSpec `json:"s3"`
}

// OutputSpec defines where the final index artifacts should be stored.
type OutputSpec struct {
	// +kubebuilder:validation:Required
	IndexName string `json:"indexName"`
	// +kubebuilder:validation:Required
	Target S3SourceSpec `json:"target"`
}

// MapReduceSpec defines the resource allocation for map and reduce jobs.
type MapReduceSpec struct {
	// Number of parallel map jobs to run.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default=5
	MapJobs int32 `json:"mapJobs"`

	// Number of parallel reduce jobs to run.
	// +kubebuilder:validation:Minimum=1
	// +kubebuilder:default=2
	ReduceJobs int32 `json:"reduceJobs"`
}

// LogIndexSpec defines the desired state of LogIndex
type LogIndexSpec struct {
	// +kubebuilder:validation:Required
	Source SourceSpec `json:"source"`
	// +kubebuilder:validation:Required
	Output OutputSpec `json:"output"`
	// +kubebuilder:validation:Required
	JobSpec MapReduceSpec `json:"jobSpec"`
}

// Phase indicates the current lifecycle stage of the LogIndex process.
type Phase string

const (
	PhasePending   Phase = "Pending"
	PhasePreparing Phase = "Preparing"
	PhaseMapping   Phase = "Mapping"
	PhaseReducing  Phase = "Reducing"
	PhaseComplete  Phase = "Complete"
	PhaseFailed    Phase = "Failed"
)

// LogIndexStatus defines the observed state of LogIndex
type LogIndexStatus struct {
	Phase          Phase              `json:"phase,omitempty"`
	Conditions     []metav1.Condition `json:"conditions,omitempty"`
	StartTime      *metav1.Time       `json:"startTime,omitempty"`
	CompletionTime *metav1.Time       `json:"completionTime,omitempty"`
	// A reference to the ConfigMap holding the file list for mappers.
	FileListConfigMap string `json:"fileListConfigMap,omitempty"`
	// Metrics about the indexing process.
	FilesIndexed int64 `json:"filesIndexed,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="The current status of the LogIndex"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// LogIndex is the Schema for the logindices API
type LogIndex struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   LogIndexSpec   `json:"spec,omitempty"`
	Status LogIndexStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// LogIndexList contains a list of LogIndex
type LogIndexList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []LogIndex `json:"items"`
}

func init() {
	SchemeBuilder.Register(&LogIndex{}, &LogIndexList{})
}

A critical design decision here is the separation of spec and status. The spec is the user’s desired state; the controller treats it as read-only, and reconciling against it must be idempotent. The status is the controller’s record of the observed state, which it continuously drives toward the spec. A common mistake is to have the controller modify the spec, which breaks the declarative model. The +kubebuilder:subresource:status annotation exposes status updates through a separate API endpoint, preventing accidental spec modifications.

The heart of the operator is the Reconcile method. This control loop is triggered by any change to a LogIndex resource or any of its owned resources (like the Jobs it creates).

// internal/controller/logindex_controller.go

func (r *LogIndexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)

	var logIndex v1alpha1.LogIndex
	if err := r.Get(ctx, req.NamespacedName, &logIndex); err != nil {
		if errors.IsNotFound(err) {
			// Object was deleted, nothing to do.
			return ctrl.Result{}, nil
		}
		log.Error(err, "unable to fetch LogIndex")
		return ctrl.Result{}, err
	}

	// Main reconciliation logic is a state machine based on the status phase.
	switch logIndex.Status.Phase {
	case "":
		return r.reconcilePending(ctx, &logIndex)
	case v1alpha1.PhasePending:
		return r.reconcilePending(ctx, &logIndex)
	case v1alpha1.PhasePreparing:
		return r.reconcilePreparing(ctx, &logIndex)
	case v1alpha1.PhaseMapping:
		return r.reconcileMapping(ctx, &logIndex)
	case v1alpha1.PhaseReducing:
		return r.reconcileReducing(ctx, &logIndex)
	case v1alpha1.PhaseComplete, v1alpha1.PhaseFailed:
		// Terminal states, nothing to do.
		return ctrl.Result{}, nil
	default:
		log.Info("Unknown phase, ignoring", "phase", logIndex.Status.Phase)
		return ctrl.Result{}, nil
	}
}
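
What triggers this loop is the controller’s watch configuration. With controller-runtime’s builder, the wiring is only a few lines. The following is a minimal sketch of our SetupWithManager, assuming the usual kubebuilder scaffolding and imports (ctrl for sigs.k8s.io/controller-runtime, batchv1 for k8s.io/api/batch/v1, corev1 for k8s.io/api/core/v1):

// internal/controller/logindex_controller.go

// SetupWithManager registers the controller so that Reconcile fires on any
// change to a LogIndex and, via Owns, on changes to the Jobs and ConfigMaps
// it creates and owns through SetControllerReference.
func (r *LogIndexReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.LogIndex{}).
		Owns(&batchv1.Job{}).
		Owns(&corev1.ConfigMap{}).
		Complete(r)
}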

The first real state is Pending. Here, the controller needs to initialize the process.

// internal/controller/phases.go

func (r *LogIndexReconciler) reconcilePending(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
	log := log.FromContext(ctx)
	log.Info("Reconciling Pending phase")

	// Update status to mark the beginning of the process.
	logIndex.Status.Phase = v1alpha1.PhasePreparing
	logIndex.Status.StartTime = &metav1.Time{Time: time.Now()}
	
	// We use conditions for detailed status reporting
	meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
		Type:    "Progressing",
		Status:  metav1.ConditionTrue,
		Reason:  "PreparationStarted",
		Message: "Starting to prepare for MapReduce jobs",
	})

	if err := r.Status().Update(ctx, logIndex); err != nil {
		log.Error(err, "failed to update LogIndex status")
		return ctrl.Result{}, err
	}

	// Requeue immediately to enter the next state.
	return ctrl.Result{Requeue: true}, nil
}

The Preparing phase involves talking to an external service (S3) to discover the files to be processed. This is a potential point of failure. The pitfall here is doing this work synchronously within the reconcile loop every time. If listing millions of files takes a long time, it can block the operator. Our approach is to perform this discovery once, store the results in a ConfigMap, and reference it in the status.
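
Before looking at the controller code, here is roughly what that discovery helper could look like. Treat this as a sketch rather than our production code: it assumes the AWS SDK for Go v2 (the aws and s3 packages), and the name listAllS3Objects simply matches the commented-out call in the reconciler below. Imports needed: context, fmt, github.com/aws/aws-sdk-go-v2/aws, github.com/aws/aws-sdk-go-v2/service/s3.

// listAllS3Objects pages through bucket/prefix and returns every object key.
func listAllS3Objects(ctx context.Context, client *s3.Client, bucket, prefix string) ([]string, error) {
	var keys []string
	paginator := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
		Bucket: aws.String(bucket),
		Prefix: aws.String(prefix),
	})
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("listing s3://%s/%s: %w", bucket, prefix, err)
		}
		for _, obj := range page.Contents {
			keys = append(keys, aws.ToString(obj.Key))
		}
	}
	return keys, nil
}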

// internal/controller/phases.go

func (r *LogIndexReconciler) reconcilePreparing(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
	log := log.FromContext(ctx)
	log.Info("Reconciling Preparing phase")

	// 1. Create S3 client (configuration via env vars or secrets)
	// s3Client := s3.New(...)

	// 2. List objects from S3. This code is simplified.
	// In a real-world project, this needs robust pagination and error handling.
	// objects, err := listAllS3Objects(ctx, s3Client, logIndex.Spec.Source.S3.Bucket, logIndex.Spec.Source.S3.Prefix)
	// if err != nil {
	//	   log.Error(err, "failed to list S3 objects")
	//     // Set failed status and stop.
	//     return r.updateStatusToFailed(ctx, logIndex, "S3ListFailed", err.Error())
	// }

	// For demonstration, let's assume we got a list of files.
	objects := []string{"log-part-001.gz", "log-part-002.gz", "log-part-003.gz", "log-part-004.gz", "log-part-005.gz"}

	// 3. Split the file list for the mappers
	filePartitions := splitFiles(objects, int(logIndex.Spec.JobSpec.MapJobs))
	
	// 4. Create a ConfigMap to hold these partitions.
	// Using a ConfigMap avoids stuffing a potentially huge file list into the LogIndex status,
	// which is backed by etcd and has size limits.
	cm := r.buildPartitionsConfigMap(logIndex, filePartitions)
	if err := ctrl.SetControllerReference(logIndex, cm, r.Scheme); err != nil {
		return ctrl.Result{}, err
	}
	
	if err := r.Create(ctx, cm); err != nil && !errors.IsAlreadyExists(err) {
		log.Error(err, "failed to create partitions ConfigMap")
		return ctrl.Result{}, err
	}

	// 5. Update status
	logIndex.Status.Phase = v1alpha1.PhaseMapping
	logIndex.Status.FileListConfigMap = cm.Name
	logIndex.Status.FilesIndexed = int64(len(objects))
	meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
		Type:    "Progressing",
		Status:  metav1.ConditionTrue,
		Reason:  "MapJobsCreation",
		Message: "File list prepared, creating Map jobs",
	})
	if err := r.Status().Update(ctx, logIndex); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{Requeue: true}, nil
}

The ctrl.SetControllerReference call is non-negotiable. It establishes an ownership link. When the LogIndex custom resource is deleted, Kubernetes garbage collection will automatically delete the ConfigMap and any Jobs we create. Without it, we’d leak resources across the cluster.
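
The two helpers used in reconcilePreparing are small. The following sketch makes two illustrative choices that are not dictated by the design: splitFiles does a round-robin split (any even distribution works), and buildPartitionsConfigMap serializes each partition as a JSON array under a partition-<i> key, which is the layout the mapper reads back and matches the <name>-partitions ConfigMap name passed to the Jobs below.

// splitFiles distributes object keys round-robin across n partitions.
func splitFiles(objects []string, n int) [][]string {
	if n < 1 {
		n = 1
	}
	partitions := make([][]string, n)
	for i, obj := range objects {
		partitions[i%n] = append(partitions[i%n], obj)
	}
	return partitions
}

// buildPartitionsConfigMap stores each partition as a JSON array under a
// "partition-<i>" key; the mapper pods read their slice back by index.
func (r *LogIndexReconciler) buildPartitionsConfigMap(logIndex *v1alpha1.LogIndex, partitions [][]string) *corev1.ConfigMap {
	data := map[string]string{}
	for i, files := range partitions {
		b, _ := json.Marshal(files) // a []string always marshals cleanly
		data[fmt.Sprintf("partition-%d", i)] = string(b)
	}
	return &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-partitions", logIndex.Name),
			Namespace: logIndex.Namespace,
			Labels:    map[string]string{"logindex.my.domain/name": logIndex.Name},
		},
		Data: data,
	}
}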

Now we enter the Mapping phase. The controller’s job is to create the Map jobs and then monitor their progress.

// internal/controller/phases.go

func (r *LogIndexReconciler) reconcileMapping(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
	log := log.FromContext(ctx)
	log.Info("Reconciling Mapping phase")

	// Create map jobs if they don't exist
	for i := 0; i < int(logIndex.Spec.JobSpec.MapJobs); i++ {
		jobName := fmt.Sprintf("%s-map-%d", logIndex.Name, i)
		var job batchv1.Job
		err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: logIndex.Namespace}, &job)
		if err != nil && errors.IsNotFound(err) {
			log.Info("Creating Map job", "jobName", jobName)
			mapJob := r.buildMapJob(logIndex, i)
			if err := ctrl.SetControllerReference(logIndex, mapJob, r.Scheme); err != nil {
				return ctrl.Result{}, err
			}
			if err := r.Create(ctx, mapJob); err != nil {
				log.Error(err, "Failed to create Map job", "jobName", jobName)
				return ctrl.Result{}, err
			}
		} else if err != nil {
			return ctrl.Result{}, err
		}
	}

	// Check status of map jobs
	var childJobs batchv1.JobList
	if err := r.List(ctx, &childJobs, client.InNamespace(logIndex.Namespace), client.MatchingLabels{"logindex.my.domain/job-type": "map"}); err != nil {
		return ctrl.Result{}, err
	}

	succeeded := 0
	failed := 0
	for _, job := range childJobs.Items {
		isOwner := false
		for _, owner := range job.GetOwnerReferences() {
			if owner.UID == logIndex.UID {
				isOwner = true
				break
			}
		}
		if !isOwner {
			continue // Don't count jobs from other LogIndex instances
		}

		if job.Status.Succeeded > 0 {
			succeeded++
		} else if job.Status.Failed > 0 {
			// A single map job failure fails the entire operation.
			// A more resilient strategy could involve retries, but for simplicity we fail fast.
			return r.updateStatusToFailed(ctx, logIndex, "MapJobFailed", fmt.Sprintf("Job %s failed", job.Name))
		}
	}

	if succeeded == int(logIndex.Spec.JobSpec.MapJobs) {
		log.Info("All map jobs completed successfully")
		logIndex.Status.Phase = v1alpha1.PhaseReducing
		meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
			Type:    "Progressing",
			Status:  metav1.ConditionTrue,
			Reason:  "ReduceJobsCreation",
			Message: "Map phase complete, creating Reduce jobs",
		})
		if err := r.Status().Update(ctx, logIndex); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{Requeue: true}, nil
	}

	// Not all jobs finished, requeue after a short delay to check again.
	return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}

// buildMapJob creates the Job object for a mapper.
func (r *LogIndexReconciler) buildMapJob(logIndex *v1alpha1.LogIndex, partitionIndex int) *batchv1.Job {
	// ... Kubernetes Job spec construction ...
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%s-map-%d", logIndex.Name, partitionIndex),
			Namespace: logIndex.Namespace,
			Labels: map[string]string{
				"logindex.my.domain/job-type": "map",
				"logindex.my.domain/name":     logIndex.Name,
			},
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "mapper",
						Image: "my-registry/log-indexer:v1.0.0", // Our application image
						Args:  []string{"map"},
						Env: []corev1.EnvVar{
							{Name: "PARTITION_INDEX", Value: strconv.Itoa(partitionIndex)},
							{Name: "CONFIG_MAP_NAME", Value: fmt.Sprintf("%s-partitions", logIndex.Name)},
						},
						VolumeMounts: []corev1.VolumeMount{
							// Mount for intermediate data
						},
					}},
					RestartPolicy: corev1.RestartPolicyNever,
				},
			},
			BackoffLimit: ptr.To[int32](2), // Retry failed pods twice before failing the job
		},
	}
	// ... Volume definitions to mount the ConfigMap and shared storage for intermediate results ...
	return job
}
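
The updateStatusToFailed helper referenced in the map (and reduce) phases is a thin wrapper; a sketch:

// updateStatusToFailed moves the LogIndex into the terminal Failed phase and
// records why as a condition. Reconcile ignores Failed resources afterwards.
func (r *LogIndexReconciler) updateStatusToFailed(ctx context.Context, logIndex *v1alpha1.LogIndex, reason, message string) (ctrl.Result, error) {
	logIndex.Status.Phase = v1alpha1.PhaseFailed
	meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
		Type:    "Progressing",
		Status:  metav1.ConditionFalse,
		Reason:  reason,
		Message: message,
	})
	if err := r.Status().Update(ctx, logIndex); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil // Terminal state; no requeue.
}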

The mapper container itself is a Go application. Its main function dispatches on the map or reduce subcommand and runs the corresponding logic.

// cmd/indexer/main.go (runs inside the container)
package main

import (
	"log"
	"os"
	// The commented-out client-go sections below additionally need:
	// "context", "encoding/json", "fmt",
	// "k8s.io/client-go/kubernetes", "k8s.io/client-go/rest",
	// and metav1 "k8s.io/apimachinery/pkg/apis/meta/v1".
)

func main() {
	if len(os.Args) < 2 {
		log.Fatal("expected 'map' or 'reduce' subcommand")
	}

	switch os.Args[1] {
	case "map":
		runMapper()
	case "reduce":
		runReducer()
	default:
		log.Fatal("unknown subcommand")
	}
}

func runMapper() {
	partitionIndex := os.Getenv("PARTITION_INDEX")
	configMapName := os.Getenv("CONFIG_MAP_NAME")
	
	// 1. In-cluster Kubernetes client setup to read the ConfigMap
	// config, err := rest.InClusterConfig()
	// clientset, err := kubernetes.NewForConfig(config)
	
	// 2. Read the assigned file list from the ConfigMap's data field,
	// using the partitionIndex as the key.
	// cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), configMapName, metav1.GetOptions{})
	// fileListJson := cm.Data[fmt.Sprintf("partition-%s", partitionIndex)]
	// var fileList []string
	// json.Unmarshal([]byte(fileListJson), &fileList)

	log.Printf("Mapper for partition %s starting process for %d files.", partitionIndex, 10 /* len(fileList) */)

	// 3. For each file in the list:
	//    a. Download from S3.
	//    b. Decompress if needed.
	//    c. Parse lines, tokenize words, and generate (term, docID) pairs.
	//    d. Write intermediate key-value pairs to a shared volume, e.g., /mnt/shuffle/map-output-<partitionIndex>.tmp
	
	log.Printf("Mapper for partition %s finished.", partitionIndex)
}
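
Steps 3(c) and 3(d) are where the actual inverted-index work happens. Below is a sketch of the emit side, with the caveat that the Posting type, the JSON-lines intermediate format, and the whitespace tokenizer are illustrative choices rather than our production format (add encoding/json and strings to the imports):

// Posting is one intermediate (term, docID) pair produced by the mapper.
type Posting struct {
	Term  string `json:"term"`
	DocID string `json:"docId"`
}

// emitPostings tokenizes one log line and writes a Posting per term to the
// encoder, which streams to /mnt/shuffle/map-output-<partitionIndex>.tmp.
func emitPostings(docID, line string, out *json.Encoder) error {
	for _, term := range strings.Fields(strings.ToLower(line)) {
		if err := out.Encode(Posting{Term: term, DocID: docID}); err != nil {
			return err
		}
	}
	return nil
}

The mapper would call this once per decompressed log line, using the S3 object key as the document ID.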

The logic for reconcileReducing is structurally identical to reconcileMapping: create reducer Jobs, monitor their status, and if they all succeed, transition to the final Complete state. The reducer pod’s application logic reads all the intermediate files from the shared volume, groups them by term, merges the document postings, and writes the final index shards to their destination.
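
Under the same illustrative assumptions as the mapper sketch above (JSON-lines Posting records on the shared shuffle volume), the reducer’s core could look like the following. With ReduceJobs greater than one, each reducer would additionally restrict itself to the terms hashing into its own partition; that selection is omitted here (imports: encoding/json, io, log, os, path/filepath):

// cmd/indexer/main.go (reducer sketch)

func runReducer() {
	files, err := filepath.Glob("/mnt/shuffle/map-output-*.tmp")
	if err != nil {
		log.Fatalf("listing intermediate files: %v", err)
	}

	// Group postings by term, deduplicating document IDs.
	index := map[string]map[string]struct{}{}
	for _, name := range files {
		f, err := os.Open(name)
		if err != nil {
			log.Fatalf("opening %s: %v", name, err)
		}
		dec := json.NewDecoder(f)
		for {
			var p Posting
			if err := dec.Decode(&p); err == io.EOF {
				break
			} else if err != nil {
				log.Fatalf("decoding %s: %v", name, err)
			}
			if index[p.Term] == nil {
				index[p.Term] = map[string]struct{}{}
			}
			index[p.Term][p.DocID] = struct{}{}
		}
		f.Close()
	}

	// Serializing the merged postings into query-ready shards and uploading
	// them to the S3 target from spec.output is omitted here.
	log.Printf("Reducer merged postings for %d distinct terms", len(index))
}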

Once reconcileReducing observes that every reduce Job has succeeded, it hands off to reconcileComplete for the controller’s final act; the main switch treats PhaseComplete as terminal, so this housekeeping runs exactly once, right before the phase is set.

// internal/controller/phases.go

func (r *LogIndexReconciler) reconcileComplete(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
	log := log.FromContext(ctx)
	log.Info("Reconciling Complete phase")

	// The process is finished, but we need to do some housekeeping.
	// Clean up the ConfigMap that held the file partitions.
	cmName := logIndex.Status.FileListConfigMap
	if cmName != "" {
		var cm corev1.ConfigMap
		if err := r.Get(ctx, types.NamespacedName{Name: cmName, Namespace: logIndex.Namespace}, &cm); err == nil {
			if err := r.Delete(ctx, &cm); err != nil {
				log.Error(err, "Failed to delete partitions ConfigMap")
				// Don't block completion for cleanup failure, just log it.
			}
		}
	}

	logIndex.Status.Phase = v1alpha1.PhaseComplete
	now := metav1.Now()
	logIndex.Status.CompletionTime = &now
	meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
		Type:    "Progressing",
		Status:  metav1.ConditionFalse,
		Reason:  "IndexingComplete",
		Message: "MapReduce pipeline finished successfully",
	})
	meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
		Type:    "Available",
		Status:  metav1.ConditionTrue,
		Reason:  "IndexingComplete",
		Message: "Index is ready for querying",
	})

	if err := r.Status().Update(ctx, logIndex); err != nil {
		return ctrl.Result{}, err
	}

	log.Info("LogIndex successfully reconciled")
	return ctrl.Result{}, nil // Terminal state, no requeue.
}

By building this operator, we transformed our indexing process from a manual, error-prone task into a fully automated, declarative, and observable system native to our Kubernetes platform. An engineer now simply defines a LogIndex resource in YAML and commits it to Git. ArgoCD picks it up, applies it to the cluster, and our operator handles the rest.

This implementation, however, is not without its limitations. The use of a shared filesystem volume (like NFS) for the shuffle phase between Map and Reduce can become a performance bottleneck and is a single point of failure. A more robust architecture might leverage a distributed key-value store or an in-memory data grid for this intermediate data exchange. Furthermore, the operator currently triggers a full re-index of the source data every time. A future enhancement would be to implement incremental indexing, where the operator checksums the source files and only processes new or modified data since the last successful run, dramatically reducing processing time and cost for subsequent updates. Finally, observability could be improved by having the operator scrape metrics from the job pods (e.g., records processed, bytes read) and expose them as Prometheus metrics for finer-grained monitoring of the pipeline’s health and performance.

