The operational burden of our log search platform had become untenable. We were ingesting terabytes of unstructured application logs daily, dumping them into an S3 bucket. The indexing process, critical for making this data searchable, was a collection of brittle, manually-triggered shell scripts wrapping kubectl. Each script launched a fleet of pods to download, parse, and index the data. When a pod failed—due to transient network issues, malformed log lines, or resource contention—the entire batch job would halt. The on-call engineer would then have to manually inspect logs, clean up orphaned pods and partial index files, adjust parameters, and restart the whole multi-hour process. This wasn’t DevOps; it was high-stakes digital plumbing.
Our objective shifted from merely running the indexing jobs to building a self-managing system that could orchestrate them. We needed a declarative API. An engineer should be able to define the desired state of an index—its source data, its schema, its retention policy—and a controller should be responsible for making that state a reality. This is the core promise of the Kubernetes operator pattern. We decided to build one.
The choice of MapReduce for the indexing logic itself was a pragmatic one. While newer frameworks like Spark or Flink are more powerful, they introduce significant cluster management overhead. Our problem was embarrassingly parallel: split a massive collection of log files (Map), process each chunk independently to create inverted index fragments, and then merge those fragments into final queryable shards (Reduce). A simple, container-based MapReduce implementation running as ephemeral Kubernetes Jobs was sufficient and, critically, stateless from the perspective of the cluster scheduler.
The foundation of our operator is the Custom Resource Definition (CRD). This defines the schema for our new API object, the LogIndex. It captures the user’s intent.
// api/v1alpha1/logindex_types.go
package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// S3SourceSpec defines the location of the source log files in S3.
type S3SourceSpec struct {
    // +kubebuilder:validation:Required
    Bucket string `json:"bucket"`

    // +kubebuilder:validation:Required
    Prefix string `json:"prefix"`

    // +kubebuilder:validation:Optional
    Region string `json:"region,omitempty"`
}

// SourceSpec defines the data source for the index.
type SourceSpec struct {
    // +kubebuilder:validation:Required
    S3 S3SourceSpec `json:"s3"`
}

// OutputSpec defines where the final index artifacts should be stored.
type OutputSpec struct {
    // +kubebuilder:validation:Required
    IndexName string `json:"indexName"`

    // +kubebuilder:validation:Required
    Target S3SourceSpec `json:"target"`
}

// MapReduceSpec defines the resource allocation for map and reduce jobs.
type MapReduceSpec struct {
    // Number of parallel map jobs to run.
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:default=5
    MapJobs int32 `json:"mapJobs"`

    // Number of parallel reduce jobs to run.
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:default=2
    ReduceJobs int32 `json:"reduceJobs"`
}

// LogIndexSpec defines the desired state of LogIndex
type LogIndexSpec struct {
    // +kubebuilder:validation:Required
    Source SourceSpec `json:"source"`

    // +kubebuilder:validation:Required
    Output OutputSpec `json:"output"`

    // +kubebuilder:validation:Required
    JobSpec MapReduceSpec `json:"jobSpec"`
}

// Phase indicates the current lifecycle stage of the LogIndex process.
type Phase string

const (
    PhasePending   Phase = "Pending"
    PhasePreparing Phase = "Preparing"
    PhaseMapping   Phase = "Mapping"
    PhaseReducing  Phase = "Reducing"
    PhaseComplete  Phase = "Complete"
    PhaseFailed    Phase = "Failed"
)

// LogIndexStatus defines the observed state of LogIndex
type LogIndexStatus struct {
    Phase          Phase              `json:"phase,omitempty"`
    Conditions     []metav1.Condition `json:"conditions,omitempty"`
    StartTime      *metav1.Time       `json:"startTime,omitempty"`
    CompletionTime *metav1.Time       `json:"completionTime,omitempty"`

    // A reference to the ConfigMap holding the file list for mappers.
    FileListConfigMap string `json:"fileListConfigMap,omitempty"`

    // Metrics about the indexing process.
    FilesIndexed int64 `json:"filesIndexed,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="The current status of the LogIndex"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// LogIndex is the Schema for the logindices API
type LogIndex struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   LogIndexSpec   `json:"spec,omitempty"`
    Status LogIndexStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// LogIndexList contains a list of LogIndex
type LogIndexList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []LogIndex `json:"items"`
}

func init() {
    SchemeBuilder.Register(&LogIndex{}, &LogIndexList{})
}
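With these types in place, an engineer's request is just a small manifest. The values below are illustrative only; the bucket names and prefixes are placeholders, and we assume the API group is logindex.my.domain based on the label prefix used later in the controller.

# config/samples/logindex_sample.yaml (illustrative values)
apiVersion: logindex.my.domain/v1alpha1
kind: LogIndex
metadata:
  name: app-logs-2024-06
spec:
  source:
    s3:
      bucket: acme-app-logs
      prefix: raw/2024/06/
      region: us-east-1
  output:
    indexName: app-logs-2024-06
    target:
      bucket: acme-log-indexes
      prefix: shards/2024/06/
  jobSpec:
    mapJobs: 10
    reduceJobs: 3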
A critical design decision here is the separation of spec and status. The spec is the user’s desired state, and reconciling it must be idempotent, because the controller may process the same spec many times. The status is the controller’s view of the world, which it continuously tries to reconcile with the spec. A common mistake is to have the controller modify the spec, which breaks the declarative model. The +kubebuilder:subresource:status annotation ensures that status updates go through a separate API endpoint, preventing accidental spec modifications.
The heart of the operator is the Reconcile method. This control loop is triggered by any change to a LogIndex resource or any of its owned resources (like the Jobs it creates).
// internal/controller/logindex_controller.go
func (r *LogIndexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

    var logIndex v1alpha1.LogIndex
    if err := r.Get(ctx, req.NamespacedName, &logIndex); err != nil {
        if errors.IsNotFound(err) {
            // Object was deleted, nothing to do.
            return ctrl.Result{}, nil
        }
        log.Error(err, "unable to fetch LogIndex")
        return ctrl.Result{}, err
    }

    // Main reconciliation logic is a state machine based on the status phase.
    switch logIndex.Status.Phase {
    case "", v1alpha1.PhasePending:
        return r.reconcilePending(ctx, &logIndex)
    case v1alpha1.PhasePreparing:
        return r.reconcilePreparing(ctx, &logIndex)
    case v1alpha1.PhaseMapping:
        return r.reconcileMapping(ctx, &logIndex)
    case v1alpha1.PhaseReducing:
        return r.reconcileReducing(ctx, &logIndex)
    case v1alpha1.PhaseComplete, v1alpha1.PhaseFailed:
        // Terminal states, nothing to do.
        return ctrl.Result{}, nil
    default:
        log.Info("Unknown phase, ignoring", "phase", logIndex.Status.Phase)
        return ctrl.Result{}, nil
    }
}
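That trigger-on-owned-resources behavior is not automatic; it comes from how the controller is registered with the manager. The wiring below is essentially what kubebuilder scaffolds, with Owns() calls added for the Jobs and ConfigMaps this controller creates:

// internal/controller/logindex_controller.go
// SetupWithManager registers the reconciler. For() watches LogIndex objects;
// Owns() makes changes to Jobs and ConfigMaps owned by a LogIndex re-trigger
// Reconcile for that parent resource.
func (r *LogIndexReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha1.LogIndex{}).
        Owns(&batchv1.Job{}).
        Owns(&corev1.ConfigMap{}).
        Complete(r)
}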
The first real state is Pending. Here, the controller needs to initialize the process.
// internal/controller/phases.go
func (r *LogIndexReconciler) reconcilePending(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    log.Info("Reconciling Pending phase")

    // Update status to mark the beginning of the process.
    logIndex.Status.Phase = v1alpha1.PhasePreparing
    logIndex.Status.StartTime = &metav1.Time{Time: time.Now()}

    // We use conditions for detailed status reporting
    meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
        Type:    "Progressing",
        Status:  metav1.ConditionTrue,
        Reason:  "PreparationStarted",
        Message: "Starting to prepare for MapReduce jobs",
    })

    if err := r.Status().Update(ctx, logIndex); err != nil {
        log.Error(err, "failed to update LogIndex status")
        return ctrl.Result{}, err
    }

    // Requeue immediately to enter the next state.
    return ctrl.Result{Requeue: true}, nil
}
The Preparing phase involves talking to an external service (S3) to discover the files to be processed. This is a potential point of failure. The pitfall here is doing this work synchronously within the reconcile loop every time. If listing millions of files takes a long time, it can block the operator. Our approach is to perform this discovery once, store the results in a ConfigMap, and reference it in the status.
// internal/controller/phases.go
func (r *LogIndexReconciler) reconcilePreparing(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    log.Info("Reconciling Preparing phase")

    // 1. Create S3 client (configuration via env vars or secrets)
    // s3Client := s3.New(...)

    // 2. List objects from S3. This code is simplified.
    // In a real-world project, this needs robust pagination and error handling.
    // objects, err := listAllS3Objects(ctx, s3Client, logIndex.Spec.Source.S3.Bucket, logIndex.Spec.Source.S3.Prefix)
    // if err != nil {
    //     log.Error(err, "failed to list S3 objects")
    //     // Set failed status and stop.
    //     return r.updateStatusToFailed(ctx, logIndex, "S3ListFailed", err.Error())
    // }

    // For demonstration, let's assume we got a list of files.
    objects := []string{"log-part-001.gz", "log-part-002.gz", "log-part-003.gz", "log-part-004.gz", "log-part-005.gz"}

    // 3. Split the file list for the mappers
    filePartitions := splitFiles(objects, int(logIndex.Spec.JobSpec.MapJobs))

    // 4. Create a ConfigMap to hold these partitions.
    // Using a ConfigMap avoids stuffing a potentially huge file list into the LogIndex status,
    // which is backed by etcd and has size limits.
    cm := r.buildPartitionsConfigMap(logIndex, filePartitions)
    if err := ctrl.SetControllerReference(logIndex, cm, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }
    err := r.Create(ctx, cm)
    if err != nil && !errors.IsAlreadyExists(err) {
        log.Error(err, "failed to create partitions ConfigMap")
        return ctrl.Result{}, err
    }

    // 5. Update status
    logIndex.Status.Phase = v1alpha1.PhaseMapping
    logIndex.Status.FileListConfigMap = cm.Name
    logIndex.Status.FilesIndexed = int64(len(objects))
    meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
        Type:    "Progressing",
        Status:  metav1.ConditionTrue,
        Reason:  "MapJobsCreation",
        Message: "File list prepared, creating Map jobs",
    })
    if err := r.Status().Update(ctx, logIndex); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{Requeue: true}, nil
}
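The snippet above leans on three helpers we haven't shown: listAllS3Objects (commented out), splitFiles, and buildPartitionsConfigMap. The sketch below is one plausible shape for them, assuming the aws-sdk-go-v2 S3 client and a placeholder module path; the only contract that really matters is that the ConfigMap keys follow the partition-<N> naming the mapper reads back.

// internal/controller/helpers.go (sketch)
package controller

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/s3"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "my.domain/log-indexer/api/v1alpha1" // module path is a placeholder
)

// listAllS3Objects pages through bucket/prefix and returns every object key.
func listAllS3Objects(ctx context.Context, client *s3.Client, bucket, prefix string) ([]string, error) {
    var keys []string
    p := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{
        Bucket: aws.String(bucket),
        Prefix: aws.String(prefix),
    })
    for p.HasMorePages() {
        page, err := p.NextPage(ctx)
        if err != nil {
            return nil, err
        }
        for _, obj := range page.Contents {
            keys = append(keys, *obj.Key)
        }
    }
    return keys, nil
}

// splitFiles distributes the file list round-robin across n partitions.
func splitFiles(files []string, n int) [][]string {
    partitions := make([][]string, n)
    for i, f := range files {
        partitions[i%n] = append(partitions[i%n], f)
    }
    return partitions
}

// buildPartitionsConfigMap stores each partition as a JSON array under the
// key "partition-<index>", which is what the mapper reads back.
func (r *LogIndexReconciler) buildPartitionsConfigMap(logIndex *v1alpha1.LogIndex, partitions [][]string) *corev1.ConfigMap {
    data := map[string]string{}
    for i, p := range partitions {
        b, _ := json.Marshal(p) // marshalling a []string cannot fail
        data[fmt.Sprintf("partition-%d", i)] = string(b)
    }
    return &corev1.ConfigMap{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%s-partitions", logIndex.Name),
            Namespace: logIndex.Namespace,
        },
        Data: data,
    }
}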
The ctrl.SetControllerReference call is non-negotiable. It establishes an ownership link. When the LogIndex custom resource is deleted, Kubernetes garbage collection will automatically delete the ConfigMap and any Jobs we create. Without it, we’d leak resources across the cluster.
Now we enter the Mapping phase. The controller’s job is to create the Map jobs and then monitor their progress.
// internal/controller/phases.go
func (r *LogIndexReconciler) reconcileMapping(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    log.Info("Reconciling Mapping phase")

    // Create map jobs if they don't exist
    for i := 0; i < int(logIndex.Spec.JobSpec.MapJobs); i++ {
        jobName := fmt.Sprintf("%s-map-%d", logIndex.Name, i)

        var job batchv1.Job
        err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: logIndex.Namespace}, &job)
        if err != nil && errors.IsNotFound(err) {
            log.Info("Creating Map job", "jobName", jobName)
            mapJob := r.buildMapJob(logIndex, i)
            if err := ctrl.SetControllerReference(logIndex, mapJob, r.Scheme); err != nil {
                return ctrl.Result{}, err
            }
            if err := r.Create(ctx, mapJob); err != nil {
                log.Error(err, "Failed to create Map job", "jobName", jobName)
                return ctrl.Result{}, err
            }
        } else if err != nil {
            return ctrl.Result{}, err
        }
    }

    // Check status of map jobs
    var childJobs batchv1.JobList
    if err := r.List(ctx, &childJobs, client.InNamespace(logIndex.Namespace), client.MatchingLabels{"logindex.my.domain/job-type": "map"}); err != nil {
        return ctrl.Result{}, err
    }

    succeeded := 0
    for _, job := range childJobs.Items {
        isOwner := false
        for _, owner := range job.GetOwnerReferences() {
            if owner.UID == logIndex.UID {
                isOwner = true
                break
            }
        }
        if !isOwner {
            continue // Don't count jobs from other LogIndex instances
        }
        if job.Status.Succeeded > 0 {
            succeeded++
        } else if job.Status.Failed > 0 {
            // A single map job failure fails the entire operation.
            // A more resilient strategy could involve retries, but for simplicity we fail fast.
            return r.updateStatusToFailed(ctx, logIndex, "MapJobFailed", fmt.Sprintf("Job %s failed", job.Name))
        }
    }

    if succeeded == int(logIndex.Spec.JobSpec.MapJobs) {
        log.Info("All map jobs completed successfully")
        logIndex.Status.Phase = v1alpha1.PhaseReducing
        meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
            Type:    "Progressing",
            Status:  metav1.ConditionTrue,
            Reason:  "ReduceJobsCreation",
            Message: "Map phase complete, creating Reduce jobs",
        })
        if err := r.Status().Update(ctx, logIndex); err != nil {
            return ctrl.Result{}, err
        }
        return ctrl.Result{Requeue: true}, nil
    }

    // Not all jobs finished, requeue after a short delay to check again.
    return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}
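Both the commented-out S3 error path and the map-failure branch above call updateStatusToFailed, which isn't shown in this post. A minimal version, matching the (ctrl.Result, error) signature it is returned through, might look like this:

// internal/controller/phases.go (sketch)
// updateStatusToFailed moves the LogIndex into the terminal Failed phase and
// records the reason as a condition so a kubectl describe shows what went wrong.
func (r *LogIndexReconciler) updateStatusToFailed(ctx context.Context, logIndex *v1alpha1.LogIndex, reason, message string) (ctrl.Result, error) {
    logIndex.Status.Phase = v1alpha1.PhaseFailed
    now := metav1.Now()
    logIndex.Status.CompletionTime = &now
    meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
        Type:    "Progressing",
        Status:  metav1.ConditionFalse,
        Reason:  reason,
        Message: message,
    })
    if err := r.Status().Update(ctx, logIndex); err != nil {
        return ctrl.Result{}, err
    }
    // Failed is a terminal state; do not requeue.
    return ctrl.Result{}, nil
}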
// buildMapJob creates the Job object for a mapper.
func (r *LogIndexReconciler) buildMapJob(logIndex *v1alpha1.LogIndex, partitionIndex int) *batchv1.Job {
    // ... Kubernetes Job spec construction ...
    job := &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%s-map-%d", logIndex.Name, partitionIndex),
            Namespace: logIndex.Namespace,
            Labels: map[string]string{
                "logindex.my.domain/job-type": "map",
                "logindex.my.domain/name":     logIndex.Name,
            },
        },
        Spec: batchv1.JobSpec{
            Template: corev1.PodTemplateSpec{
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "mapper",
                        Image: "my-registry/log-indexer:v1.0.0", // Our application image
                        Args:  []string{"map"},
                        Env: []corev1.EnvVar{
                            {Name: "PARTITION_INDEX", Value: strconv.Itoa(partitionIndex)},
                            {Name: "CONFIG_MAP_NAME", Value: fmt.Sprintf("%s-partitions", logIndex.Name)},
                        },
                        VolumeMounts: []corev1.VolumeMount{
                            // Mount for intermediate data
                        },
                    }},
                    RestartPolicy: corev1.RestartPolicyNever,
                },
            },
            BackoffLimit: ptr.To[int32](2), // Retry failed pods twice before failing the job
        },
    }
    // ... Volume definitions to mount the ConfigMap and shared storage for intermediate results ...
    return job
}
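The elided volume wiring is worth sketching, because it is where the shuffle design shows up. One hypothetical way to fill it in, consistent with the shared-volume approach discussed at the end (the claim name and mount paths below are our own placeholders), is a small helper:

// addMapJobVolumes is a hypothetical helper: it mounts the partitions ConfigMap
// read-only and a shared ReadWriteMany PVC (e.g. NFS-backed) at /mnt/shuffle for
// intermediate map output. The claim name is a placeholder.
func addMapJobVolumes(job *batchv1.Job, logIndex *v1alpha1.LogIndex) {
    job.Spec.Template.Spec.Volumes = []corev1.Volume{
        {
            Name: "partitions",
            VolumeSource: corev1.VolumeSource{
                ConfigMap: &corev1.ConfigMapVolumeSource{
                    LocalObjectReference: corev1.LocalObjectReference{
                        Name: fmt.Sprintf("%s-partitions", logIndex.Name),
                    },
                },
            },
        },
        {
            Name: "shuffle",
            VolumeSource: corev1.VolumeSource{
                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
                    ClaimName: "logindex-shuffle", // assumed pre-provisioned RWX claim
                },
            },
        },
    }
    job.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
        {Name: "partitions", MountPath: "/etc/partitions", ReadOnly: true},
        {Name: "shuffle", MountPath: "/mnt/shuffle"},
    }
}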
The mapper container itself is a Go application. Its main function dispatches on the map or reduce subcommand and runs the corresponding logic.
// cmd/indexer/main.go (runs inside the container)
package main

import (
    "log"
    "os"
    // "context", "encoding/json", "fmt", and the k8s.io/client-go packages are
    // needed once the ConfigMap-reading code below is uncommented.
)

func main() {
    if len(os.Args) < 2 {
        log.Fatal("expected 'map' or 'reduce' subcommand")
    }
    switch os.Args[1] {
    case "map":
        runMapper()
    case "reduce":
        runReducer()
    default:
        log.Fatal("unknown subcommand")
    }
}

func runMapper() {
    partitionIndex := os.Getenv("PARTITION_INDEX")
    configMapName := os.Getenv("CONFIG_MAP_NAME")
    _ = configMapName // used by the commented-out ConfigMap lookup below

    // 1. In-cluster Kubernetes client setup to read the ConfigMap
    // config, err := rest.InClusterConfig()
    // clientset, err := kubernetes.NewForConfig(config)

    // 2. Read the assigned file list from the ConfigMap's data field,
    //    using the partitionIndex as the key.
    // cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), configMapName, metav1.GetOptions{})
    // fileListJson := cm.Data[fmt.Sprintf("partition-%s", partitionIndex)]
    // var fileList []string
    // json.Unmarshal([]byte(fileListJson), &fileList)

    log.Printf("Mapper for partition %s starting process for %d files.", partitionIndex, 10 /* len(fileList) */)

    // 3. For each file in the list:
    //    a. Download from S3.
    //    b. Decompress if needed.
    //    c. Parse lines, tokenize words, and generate (term, docID) pairs.
    //    d. Write intermediate key-value pairs to a shared volume, e.g., /mnt/shuffle/map-output-<partitionIndex>.tmp

    log.Printf("Mapper for partition %s finished.", partitionIndex)
}
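The heart of step (c) is the emit. Stripped to its essentials, and using only the standard library (bufio, fmt, io, strings), a naive version might look like the following; real tokenization, normalization, and batching are deliberately left out:

// emitPairs is a deliberately naive sketch: it splits each log line on whitespace
// and writes one "term<TAB>docID" record per token to the intermediate output.
func emitPairs(r io.Reader, docID string, out io.Writer) error {
    scanner := bufio.NewScanner(r)
    w := bufio.NewWriter(out)
    for scanner.Scan() {
        for _, term := range strings.Fields(scanner.Text()) {
            if _, err := fmt.Fprintf(w, "%s\t%s\n", term, docID); err != nil {
                return err
            }
        }
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return w.Flush()
}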
The logic for reconcileReducing is structurally identical to reconcileMapping: create reducer Jobs, monitor their status, and once every reducer has succeeded, hand off to reconcileComplete, which moves the resource into its terminal Complete state. The reducer pod’s application logic reads all the intermediate files from the shared volume, groups them by term, merges the document postings, and writes the final index shards to their destination.
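For symmetry with the mapper, here is a bare-bones sketch of that reducer logic, again standard library only (bufio, fmt, io, os, path/filepath, sort, strings). It holds all postings in memory and ignores sharding terms across multiple reducers, which a production reducer could not afford to do:

// mergePostings reads every intermediate "term<TAB>docID" file from the shared
// shuffle volume, groups docIDs by term, and writes one postings line per term.
func mergePostings(shuffleDir string, out io.Writer) error {
    files, err := filepath.Glob(filepath.Join(shuffleDir, "map-output-*.tmp"))
    if err != nil {
        return err
    }
    postings := map[string][]string{}
    for _, f := range files {
        data, err := os.ReadFile(f)
        if err != nil {
            return err
        }
        for _, line := range strings.Split(string(data), "\n") {
            parts := strings.SplitN(line, "\t", 2)
            if len(parts) != 2 {
                continue
            }
            postings[parts[0]] = append(postings[parts[0]], parts[1])
        }
    }
    // Deterministic output order makes the shards easier to diff and test.
    terms := make([]string, 0, len(postings))
    for t := range postings {
        terms = append(terms, t)
    }
    sort.Strings(terms)
    w := bufio.NewWriter(out)
    for _, t := range terms {
        if _, err := fmt.Fprintf(w, "%s\t%s\n", t, strings.Join(postings[t], ",")); err != nil {
            return err
        }
    }
    return w.Flush()
}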
Upon successful completion of the reduce phase, the controller performs its final act.
// internal/controller/phases.go
func (r *LogIndexReconciler) reconcileComplete(ctx context.Context, logIndex *v1alpha1.LogIndex) (ctrl.Result, error) {
    log := log.FromContext(ctx)
    log.Info("Reconciling Complete phase")

    // The process is finished, but we need to do some housekeeping.
    // Clean up the ConfigMap that held the file partitions.
    cmName := logIndex.Status.FileListConfigMap
    if cmName != "" {
        var cm corev1.ConfigMap
        err := r.Get(ctx, types.NamespacedName{Name: cmName, Namespace: logIndex.Namespace}, &cm)
        if err == nil {
            if err := r.Delete(ctx, &cm); err != nil {
                log.Error(err, "Failed to delete partitions ConfigMap")
                // Don't block completion for cleanup failure, just log it.
            }
        }
    }

    logIndex.Status.Phase = v1alpha1.PhaseComplete
    now := metav1.Now()
    logIndex.Status.CompletionTime = &now
    meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
        Type:    "Progressing",
        Status:  metav1.ConditionFalse,
        Reason:  "IndexingComplete",
        Message: "MapReduce pipeline finished successfully",
    })
    meta.SetStatusCondition(&logIndex.Status.Conditions, metav1.Condition{
        Type:    "Available",
        Status:  metav1.ConditionTrue,
        Reason:  "IndexingComplete",
        Message: "Index is ready for querying",
    })
    if err := r.Status().Update(ctx, logIndex); err != nil {
        return ctrl.Result{}, err
    }

    log.Info("LogIndex successfully reconciled")
    return ctrl.Result{}, nil // Terminal state, no requeue.
}
By building this operator, we transformed our indexing process from a manual, error-prone task into a fully automated, declarative, and observable system native to our Kubernetes platform. An engineer now simply defines a LogIndex resource in YAML and commits it to Git. ArgoCD picks it up, applies it to the cluster, and our operator handles the rest.
This implementation, however, is not without its limitations. The use of a shared filesystem volume (like NFS) for the shuffle phase between Map and Reduce can become a performance bottleneck and is a single point of failure. A more robust architecture might leverage a distributed key-value store or an in-memory data grid for this intermediate data exchange. Furthermore, the operator currently triggers a full re-index of the source data every time. A future enhancement would be to implement incremental indexing, where the operator checksums the source files and only processes new or modified data since the last successful run, dramatically reducing processing time and cost for subsequent updates. Finally, observability could be improved by having the operator scrape metrics from the job pods (e.g., records processed, bytes read) and expose them as Prometheus metrics for finer-grained monitoring of the pipeline’s health and performance.