Implementing a gRPC-Controlled Kubernetes Operator with Couchbase State Management on AWS EKS


The operational challenge with distributed caching in a microservices environment isn’t populating the cache, but invalidating it correctly and efficiently. A common pattern involves a message broker fan-out, but this introduces complexities around delivery guarantees, message ordering, and consumer acknowledgment, often leading to stale data. Attempting to solve this with direct service-to-service HTTP calls creates a tightly coupled mesh that is brittle and hard to maintain. The real-world pain point is the lack of a central, reliable, and observable orchestrator for these high-frequency, ephemeral invalidation events.

Our initial concept was to build a Kubernetes-native solution. A Kubernetes Operator seemed like the right primitive. The standard approach is to define a Custom Resource Definition (CRD), say CacheInvalidationJob, and have the operator’s reconciliation loop act upon it. However, for high-throughput, programmatic invalidation triggers from our internal services, interacting with the Kubernetes API server to create, update, and delete thousands of CRs per minute is inefficient and adds significant load to etcd. It’s using a declarative system for an imperative task.

This led to a hybrid architectural decision. The operator would be driven by two inputs:

  1. Declarative: A CacheInvalidationJob CR for manual, kubectl-driven operations and GitOps workflows.
  2. Imperative: A high-performance gRPC endpoint exposed directly by the operator pod for service-to-service invalidation requests.

Furthermore, tracking the state of each invalidation job—which target pods have acknowledged, which failed, retry counts, completion timestamps—is too complex and voluminous for the status subresource of a CRD. In a real-world project, an operator pod can be rescheduled, and storing this transient state in memory is not an option. We required a durable, queryable, and distributed state store external to etcd. Couchbase was selected for this role due to its native JSON document support, powerful N1QL query language, and high-performance key-value operations, making it an ideal backend for storing and managing job states.

The entire system would run on AWS EKS for managed Kubernetes infrastructure, with the operator’s build and deployment pipeline automated through GitHub Actions.

Defining the Core Primitives: CRD and gRPC Proto

First, we define the declarative API via a CRD. This Go struct, using kubebuilder markers, will be translated into the CacheInvalidationJob CRD manifest. It’s designed to be simple, focusing on the what, not the how.

api/v1alpha1/cacheinvalidationjob_types.go:

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// CacheInvalidationJobSpec defines the desired state of CacheInvalidationJob
type CacheInvalidationJobSpec struct {
	// TargetSelector is a label selector to identify the pods whose caches need invalidation.
	// +kubebuilder:validation:Required
	TargetSelector metav1.LabelSelector `json:"targetSelector"`

	// CacheKeys is a list of keys or patterns to be invalidated.
	// +kubebuilder:validation:Required
	// +kubebuilder:validation:MinItems=1
	CacheKeys []string `json:"cacheKeys"`

	// InvalidationID is a unique identifier for this invalidation event,
	// allowing external systems to track its progress.
	// +kubebuilder:validation:Required
	InvalidationID string `json:"invalidationId"`
}

// CacheInvalidationJobStatus defines the observed state of CacheInvalidationJob
type CacheInvalidationJobStatus struct {
	// Phase indicates the current state of the job. E.g., Pending, Processing, Succeeded, Failed.
	Phase string `json:"phase,omitempty"`

	// Conditions represent the latest available observations of an object's state.
	Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// CacheInvalidationJob is the Schema for the cacheinvalidationjobs API
type CacheInvalidationJob struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   CacheInvalidationJobSpec   `json:"spec,omitempty"`
	Status CacheInvalidationJobStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// CacheInvalidationJobList contains a list of CacheInvalidationJob
type CacheInvalidationJobList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []CacheInvalidationJob `json:"items"`
}

func init() {
	SchemeBuilder.Register(&CacheInvalidationJob{}, &CacheInvalidationJobList{})
}

Next, the imperative API is defined using Protocol Buffers. This will be our high-throughput entry point.

proto/invalidator/v1/invalidator.proto:

syntax = "proto3";

package invalidator.v1;

option go_package = "github.com/your-org/invalidator-operator/gen/invalidator/v1;invalidatorv1";

import "google/protobuf/struct.proto";

service InvalidatorService {
  // SubmitInvalidation triggers a new cache invalidation job asynchronously.
  rpc SubmitInvalidation(SubmitInvalidationRequest) returns (SubmitInvalidationResponse);
}

message SubmitInvalidationRequest {
  // A unique identifier provided by the client for idempotency and tracking.
  string request_id = 1;

  // A map representing the label selector for target pods.
  // e.g., {"app": "user-service", "tier": "cache"}
  map<string, string> target_selector = 2;

  // The list of cache keys or patterns to invalidate.
  repeated string cache_keys = 3;
}

message SubmitInvalidationResponse {
  // The internal Job ID assigned by the operator for tracking in Couchbase.
  string job_id = 1;

  // Status indicates if the job was accepted for processing.
  string status = 2; // e.g., "ACCEPTED"
}
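
Before moving on, it is worth seeing the imperative path from the caller's side. The snippet below is a minimal sketch of a client service submitting an invalidation over gRPC. The generated package path follows the go_package option above; the in-cluster address, port, and plaintext transport are assumptions for illustration (the closing section discusses mTLS).

A hypothetical caller, e.g. examples/client/main.go:

package main

import (
	"context"
	"log"
	"time"

	invalidatorv1 "github.com/your-org/invalidator-operator/gen/invalidator/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// The service address is an assumption; point it at the operator's gRPC Service.
	conn, err := grpc.Dial("invalidator-operator.operators.svc:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to dial operator: %v", err)
	}
	defer conn.Close()

	client := invalidatorv1.NewInvalidatorServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	resp, err := client.SubmitInvalidation(ctx, &invalidatorv1.SubmitInvalidationRequest{
		RequestId:      "user-profile-update-42", // client-chosen idempotency key
		TargetSelector: map[string]string{"app": "user-service", "tier": "cache"},
		CacheKeys:      []string{"user:42", "session:42:*"},
	})
	if err != nil {
		log.Fatalf("invalidation rejected: %v", err)
	}
	log.Printf("job %s accepted with status %s", resp.JobId, resp.Status)
}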

The Operator’s Core: Reconciler and State Model

The heart of the operator is the Reconcile method. A common mistake is to pack too much business logic directly into it. Our approach separates concerns: the reconciler acts as a trigger, delegating the heavy lifting of state management and execution to a dedicated service that interacts with Couchbase.

The state of each job, whether created via CRD or gRPC, is stored in a Couchbase document. This provides durability and a single source of truth.

internal/state/models.go:

package state

import "time"

// JobStatus represents the lifecycle state of an invalidation job.
type JobStatus string

const (
	StatusPending    JobStatus = "PENDING"
	StatusProcessing JobStatus = "PROCESSING"
	StatusSucceeded  JobStatus = "SUCCEEDED"
	StatusFailed     JobStatus = "FAILED"
	StatusRetrying   JobStatus = "RETRYING"
)

// InvalidationJobDocument is the structure stored in Couchbase.
type InvalidationJobDocument struct {
	ID             string            `json:"id"`             // Unique job ID (e.g., UUID)
	Type           string            `json:"type"`           // Constant "invalidation-job" for N1QL queries
	Source         string            `json:"source"`         // "CRD" or "GRPC"
	ExternalID     string            `json:"externalId"`     // InvalidationID from CRD or request_id from gRPC
	TargetSelector map[string]string `json:"targetSelector"` // Stored as a map for easy querying
	CacheKeys      []string          `json:"cacheKeys"`
	Status         JobStatus         `json:"status"`
	Attempts       int               `json:"attempts"`
	CreatedAt      time.Time         `json:"createdAt"`
	LastModified   time.Time         `json:"lastModified"`
	TargetPods     []PodStatus       `json:"targetPods,omitempty"`
	Error          string            `json:"error,omitempty"`
}

// PodStatus tracks the invalidation status for an individual target pod.
type PodStatus struct {
	PodName       string    `json:"podName"`
	PodIP         string    `json:"podIp"`
	Status        string    `json:"status"` // e.g., "SUCCESS", "FAILED"
	LastAttemptAt time.Time `json:"lastAttemptAt"`
	ErrorMessage  string    `json:"errorMessage,omitempty"`
}

Integrating Couchbase for Robust State Management

Connecting to Couchbase from our Go operator requires a robust client that handles retries and connection management. We’ll initialize this client once at operator startup.

internal/couchbase/client.go:

package couchbase

import (
	"fmt"
	"log"
	"time"

	"github.com/couchbase/gocb/v2"

	"github.com/your-org/invalidator-operator/internal/state"
)

// Config holds the configuration for the Couchbase client.
type Config struct {
	ConnectionString string
	Username         string
	Password         string
	BucketName       string
}

// Client wraps the gocb.Cluster and provides access to a specific bucket/collection.
type Client struct {
	Cluster    *gocb.Cluster
	Bucket     *gocb.Bucket
	Collection *gocb.Collection
}

// NewClient creates and connects a new Couchbase client.
// In a real production system, you'd use a more robust retry mechanism.
func NewClient(config Config) (*Client, error) {
	log.Println("Connecting to Couchbase cluster...")
	cluster, err := gocb.Connect(config.ConnectionString, gocb.ClusterOptions{
		Authenticator: gocb.PasswordAuthenticator{
			Username: config.Username,
			Password: config.Password,
		},
	})
	if err != nil {
		return nil, err
	}

	bucket := cluster.Bucket(config.BucketName)
	err = bucket.WaitUntilReady(5*time.Second, nil)
	if err != nil {
		return nil, err
	}
	
	// Using the default collection for simplicity. In production, use dedicated scopes/collections.
	collection := bucket.DefaultCollection()

	log.Println("Successfully connected to Couchbase.")
	return &Client{
		Cluster:    cluster,
		Bucket:     bucket,
		Collection: collection,
	}, nil
}

// Close gracefully disconnects the client.
func (c *Client) Close() {
	if c.Cluster != nil {
		c.Cluster.Close(nil)
	}
}

// UpsertJob stores or updates an InvalidationJobDocument.
func (c *Client) UpsertJob(doc state.InvalidationJobDocument) error {
	// A common pitfall is not setting appropriate timeouts, which can cause
	// the operator to hang if the database is unresponsive.
	_, err := c.Collection.Upsert(doc.ID, doc, &gocb.UpsertOptions{
		Timeout: 2 * time.Second,
	})
	return err
}

// FindPendingJobs queries Couchbase for jobs that need processing.
// Using N1QL allows for flexible querying without scanning all documents.
func (c *Client) FindPendingJobs() ([]state.InvalidationJobDocument, error) {
	// Ensure a N1QL index on `status` (scoped to this document type) exists for performance:
	// CREATE INDEX idx_pending_jobs ON `your-bucket-name`(`status`) WHERE `type` = "invalidation-job"
	// The bucket name is backtick-quoted below so that names containing hyphens parse correctly.
	query := fmt.Sprintf(
		"SELECT b.* FROM `%s` b WHERE b.type = \"invalidation-job\" AND b.status IN [\"PENDING\", \"RETRYING\"]",
		c.Bucket.Name(),
	)

	rows, err := c.Cluster.Query(query, &gocb.QueryOptions{
		Timeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var jobs []state.InvalidationJobDocument
	for rows.Next() {
		var job state.InvalidationJobDocument
		if err := rows.Row(&job); err != nil {
			log.Printf("Error scanning row into job document: %v", err)
			continue
		}
		jobs = append(jobs, job)
	}

	return jobs, rows.Err()
}
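
Rather than relying on someone remembering to create that index by hand, the operator can attempt to create it at startup. Below is a minimal sketch of such a helper for internal/couchbase/client.go; it assumes Couchbase Server 7.1 or later, where CREATE INDEX supports IF NOT EXISTS (older servers would instead need to tolerate an "index already exists" error).

// EnsureIndexes creates the secondary index used by FindPendingJobs.
// Call this once at startup, after NewClient has connected.
func (c *Client) EnsureIndexes() error {
	stmt := fmt.Sprintf(
		"CREATE INDEX idx_pending_jobs IF NOT EXISTS ON `%s`(`status`) WHERE `type` = \"invalidation-job\"",
		c.Bucket.Name(),
	)
	res, err := c.Cluster.Query(stmt, &gocb.QueryOptions{Timeout: 10 * time.Second})
	if err != nil {
		return err
	}
	return res.Close()
}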

Revisiting the Reconciler with a Database-Driven Flow

Our reconciler’s logic now shifts. Instead of acting on every CR change, it periodically queries Couchbase for work. This decouples it from the Kubernetes API server’s event stream for job processing and makes the entire system more resilient.

internal/controller/cacheinvalidationjob_controller.go:

// ... imports and Reconciler struct definition ...
type CacheInvalidationJobReconciler struct {
	client.Client
	Scheme          *runtime.Scheme
	Log             logr.Logger
	CouchbaseClient *cb.Client            // Our Couchbase client
	JobProcessor    *processing.Processor // The service doing the actual work
}

// Reconcile is triggered by CR changes or periodically.
// Its main job is to ensure the state in Couchbase reflects the CRD,
// and to trigger the processor for pending work.
func (r *CacheInvalidationJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("cacheinvalidationjob", req.NamespacedName)

	// 1. Handle CRD creation/update: Create/update the corresponding Couchbase document.
	var jobCR invalpha1.CacheInvalidationJob
	if err := r.Get(ctx, req.NamespacedName, &jobCR); err != nil {
		if errors.IsNotFound(err) {
			// CR was deleted. We could add logic here to mark the Couchbase doc as "CANCELLED".
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}
	
	// Translate the CRD into a Couchbase document and upsert it.
	// The key is to make this operation idempotent: the CR's UID gives us a stable document ID.
	doc := state.InvalidationJobDocument{
		ID:             string(jobCR.UID), // Use the UID for a stable, unique document ID.
		Type:           "invalidation-job",
		Source:         "CRD",
		ExternalID:     jobCR.Spec.InvalidationID,
		TargetSelector: jobCR.Spec.TargetSelector.MatchLabels,
		CacheKeys:      jobCR.Spec.CacheKeys,
		Status:         state.StatusPending, // New jobs always start as pending.
		CreatedAt:      jobCR.CreationTimestamp.Time,
		LastModified:   time.Now().UTC(),
	}

	// A critical design choice: the reconciler only writes the initial state, and the
	// processor owns all transitions from PENDING onwards. Note that a blind upsert on
	// every requeue would reset an in-flight job's status; a production version should
	// use an insert-only write (or a CAS check) so existing documents are never clobbered.
	if err := r.CouchbaseClient.UpsertJob(doc); err != nil {
		log.Error(err, "failed to sync CRD to Couchbase")
		return ctrl.Result{RequeueAfter: 30 * time.Second}, err
	}

	// 2. Trigger the processor to look for work.
	// The processor itself is what queries Couchbase for PENDING jobs.
	// This makes our logic testable outside the reconciliation loop.
	go r.JobProcessor.ProcessPendingJobs()

	// We requeue periodically to ensure we don't miss jobs if the processor goroutine dies.
	return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *CacheInvalidationJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&invalpha1.CacheInvalidationJob{}).
		Complete(r)
}

The diagram below illustrates the flow for both CRD and gRPC triggers.

sequenceDiagram
    participant Client as gRPC Client
    participant CRD as User (kubectl)
    participant Operator as Operator Pod
    participant Reconciler as Reconciler Loop
    participant gRPCServer as gRPC Server
    participant Processor as Job Processor
    participant Couchbase as Couchbase Cluster
    participant K8sAPI as Kubernetes API Server

    CRD->>K8sAPI: Apply CacheInvalidationJob CR
    K8sAPI->>Reconciler: Event: CR created
    Reconciler->>Couchbase: Upsert Job Document (status: PENDING)
    Reconciler->>Processor: Trigger job processing (async)

    Client->>gRPCServer: SubmitInvalidation() call
    gRPCServer->>Couchbase: Insert Job Document (status: PENDING)
    gRPCServer->>Client: Return job_id
    
    Note over Processor: Periodically, or when triggered...
    Processor->>Couchbase: N1QL Query for PENDING jobs
    Couchbase-->>Processor: Returns list of pending jobs
    Processor->>Processor: For each job, discover target pods
    Processor->>K8sAPI: Get Pods with label selector
    K8sAPI-->>Processor: List of Pod IPs
    Processor->>Couchbase: Update Job Document (status: PROCESSING, add pod list)
    
    loop For each Pod IP
        Processor->>TargetPod: Send invalidation request (e.g., HTTP POST)
    end
    
    Processor->>Couchbase: Update pod statuses in Job Document
    Note over Processor: Based on outcomes, update final job status to SUCCEEDED/FAILED.
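
The Job Processor that the diagram (and the reconciler) refer to is where the actual fan-out happens. The listing below is a condensed sketch of what it might look like; the struct fields, the per-pod HTTP path /internal/cache/invalidate, and the port are assumptions, and error handling is trimmed for brevity.

internal/processing/processor.go (sketch):

package processing

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/your-org/invalidator-operator/internal/couchbase"
	"github.com/your-org/invalidator-operator/internal/state"
)

// Processor drains PENDING/RETRYING jobs from Couchbase and fans invalidation
// requests out to the matching pods.
type Processor struct {
	K8sClient client.Client
	CBClient  *couchbase.Client
	HTTPPort  int // port on which target pods expose their invalidation endpoint
	mu        sync.Mutex
}

// ProcessPendingJobs is safe to call from multiple goroutines: only one run
// proceeds at a time, which matches the reconciler's fire-and-forget trigger.
func (p *Processor) ProcessPendingJobs() {
	if !p.mu.TryLock() {
		return // another run is already in progress
	}
	defer p.mu.Unlock()

	jobs, err := p.CBClient.FindPendingJobs()
	if err != nil {
		log.Printf("processor: querying pending jobs failed: %v", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	for _, job := range jobs {
		p.processJob(ctx, job)
	}
}

func (p *Processor) processJob(ctx context.Context, job state.InvalidationJobDocument) {
	// Discover target pods using the label selector stored in the document.
	var pods corev1.PodList
	if err := p.K8sClient.List(ctx, &pods, client.MatchingLabels(job.TargetSelector)); err != nil {
		job.Status, job.Error = state.StatusFailed, err.Error()
		_ = p.CBClient.UpsertJob(job)
		return
	}

	// Mark the job as PROCESSING before fanning out.
	job.Status = state.StatusProcessing
	job.Attempts++
	job.LastModified = time.Now().UTC()
	_ = p.CBClient.UpsertJob(job)

	// Fire-and-forget HTTP call per pod, recording per-pod outcomes.
	payload, _ := json.Marshal(map[string][]string{"keys": job.CacheKeys})
	failed := 0
	job.TargetPods = nil
	for _, pod := range pods.Items {
		ps := state.PodStatus{PodName: pod.Name, PodIP: pod.Status.PodIP, LastAttemptAt: time.Now().UTC()}
		url := fmt.Sprintf("http://%s:%d/internal/cache/invalidate", pod.Status.PodIP, p.HTTPPort)
		resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
		switch {
		case err != nil:
			ps.Status, ps.ErrorMessage = "FAILED", err.Error()
			failed++
		case resp.StatusCode >= 300:
			ps.Status, ps.ErrorMessage = "FAILED", resp.Status
			resp.Body.Close()
			failed++
		default:
			ps.Status = "SUCCESS"
			resp.Body.Close()
		}
		job.TargetPods = append(job.TargetPods, ps)
	}

	// Persist the final state; RETRYING documents are picked up on the next run.
	if failed == 0 {
		job.Status = state.StatusSucceeded
	} else {
		job.Status = state.StatusRetrying
	}
	job.LastModified = time.Now().UTC()
	_ = p.CBClient.UpsertJob(job)
}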

Implementing the gRPC Server

The gRPC server runs in a separate goroutine within the main operator process. Its sole responsibility is to validate incoming requests and persist them to Couchbase as a PENDING job.

internal/grpc/server.go:

package grpc

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	invalidatorv1 "github.com/your-org/invalidator-operator/gen/invalidator/v1"
	"github.com/your-org/invalidator-operator/internal/couchbase"
	"github.com/your-org/invalidator-operator/internal/state"
)

type Server struct {
	invalidatorv1.UnimplementedInvalidatorServiceServer
	cbClient *couchbase.Client
}

func NewServer(cb *couchbase.Client) *Server {
	return &Server{cbClient: cb}
}

// Start runs the gRPC server in a goroutine.
func (s *Server) Start(listenAddr string) {
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	grpcServer := grpc.NewServer()
	invalidatorv1.RegisterInvalidatorServiceServer(grpcServer, s)

	log.Printf("gRPC server listening at %v", lis.Addr())
	go func() {
		if err := grpcServer.Serve(lis); err != nil {
			log.Fatalf("failed to serve gRPC: %v", err)
		}
	}()
}

func (s *Server) SubmitInvalidation(ctx context.Context, req *invalidatorv1.SubmitInvalidationRequest) (*invalidatorv1.SubmitInvalidationResponse, error) {
	// Basic validation. A production system would have more robust checks.
	if req.RequestId == "" || len(req.TargetSelector) == 0 || len(req.CacheKeys) == 0 {
		return nil, status.Error(codes.InvalidArgument, "request_id, target_selector, and cache_keys are required")
	}

	jobID := uuid.New().String()
	doc := state.InvalidationJobDocument{
		ID:             jobID,
		Type:           "invalidation-job",
		Source:         "GRPC",
		ExternalID:     req.RequestId,
		TargetSelector: req.TargetSelector,
		CacheKeys:      req.CacheKeys,
		Status:         state.StatusPending,
		CreatedAt:      time.Now().UTC(),
		LastModified:   time.Now().UTC(),
	}

	if err := s.cbClient.UpsertJob(doc); err != nil {
		log.Printf("ERROR: failed to persist gRPC job to Couchbase: %v", err)
		return nil, status.Error(codes.Internal, "failed to schedule invalidation job")
	}

	return &invalidatorv1.SubmitInvalidationResponse{
		JobId:  jobID,
		Status: "ACCEPTED",
	}, nil
}
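
Everything is wired together in the operator's entrypoint: the manager, the Couchbase client, the processor, the reconciler, and the gRPC listener all start from one main. The sketch below assumes the package layout used throughout this post; the environment variable names and ports are placeholders.

cmd/main.go (sketch):

package main

import (
	"log"
	"os"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"

	invalpha1 "github.com/your-org/invalidator-operator/api/v1alpha1"
	"github.com/your-org/invalidator-operator/internal/controller"
	cb "github.com/your-org/invalidator-operator/internal/couchbase"
	grpcserver "github.com/your-org/invalidator-operator/internal/grpc"
	"github.com/your-org/invalidator-operator/internal/processing"
)

var scheme = runtime.NewScheme()

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(invalpha1.AddToScheme(scheme))
}

func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
	if err != nil {
		log.Fatalf("unable to create manager: %v", err)
	}

	// Couchbase credentials come from the environment (mounted from a Secret).
	cbClient, err := cb.NewClient(cb.Config{
		ConnectionString: os.Getenv("COUCHBASE_CONNECTION_STRING"),
		Username:         os.Getenv("COUCHBASE_USERNAME"),
		Password:         os.Getenv("COUCHBASE_PASSWORD"),
		BucketName:       os.Getenv("COUCHBASE_BUCKET"),
	})
	if err != nil {
		log.Fatalf("unable to connect to Couchbase: %v", err)
	}
	defer cbClient.Close()

	processor := &processing.Processor{
		K8sClient: mgr.GetClient(),
		CBClient:  cbClient,
		HTTPPort:  8081, // assumed invalidation port on target pods
	}

	if err := (&controller.CacheInvalidationJobReconciler{
		Client:          mgr.GetClient(),
		Scheme:          mgr.GetScheme(),
		Log:             ctrl.Log.WithName("controllers").WithName("CacheInvalidationJob"),
		CouchbaseClient: cbClient,
		JobProcessor:    processor,
	}).SetupWithManager(mgr); err != nil {
		log.Fatalf("unable to set up controller: %v", err)
	}

	// The imperative gRPC entry point runs alongside the controller manager.
	grpcserver.NewServer(cbClient).Start(":9090")

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Fatalf("manager exited with error: %v", err)
	}
}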

The CI/CD Pipeline with GitHub Actions

Automating the build and deployment is crucial for operational maturity. Our GitHub Actions workflow will handle building the Go binary, creating a container image, pushing it to Amazon ECR, and deploying the operator manifests to EKS.

.github/workflows/deploy.yml:

name: Deploy Invalidator Operator

on:
  push:
    branches:
      - main
    paths:
      - '**.go'
      - 'Dockerfile'
      - 'go.mod'
      - 'go.sum'

env:
  AWS_REGION: us-east-1
  ECR_REPOSITORY: invalidator-operator
  EKS_CLUSTER_NAME: my-production-cluster

jobs:
  build-and-push:
    name: Build and Push Docker Image
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write # Required for OIDC authentication with AWS
    outputs:
      # Exported so the deploy job can reference the ECR registry hostname.
      registry: ${{ steps.login-ecr.outputs.registry }}

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/GitHubActionsECRAccessRole
          aws-region: ${{ env.AWS_REGION }}

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Set up Go
        uses: actions/setup-go@v4
        with:
          go-version: '1.21'

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Build and push Docker image
        id: docker_build
        uses: docker/build-push-action@v4
        with:
          context: .
          file: Dockerfile
          push: true
          tags: ${{ steps.login-ecr.outputs.registry }}/${{ env.ECR_REPOSITORY }}:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
          
  deploy-to-eks:
    name: Deploy to EKS
    runs-on: ubuntu-latest
    needs: build-and-push
    permissions:
      contents: read
      id-token: write # Required for OIDC authentication with AWS

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/GitHubActionsEKSAccessRole
          aws-region: ${{ env.AWS_REGION }}

      - name: Set up Kubeconfig
        run: aws eks update-kubeconfig --name ${{ env.EKS_CLUSTER_NAME }} --region ${{ env.AWS_REGION }}

      - name: Install kustomize
        uses: imranismail/setup-kustomize@v2

      - name: Update image tag in kustomization
        run: |
          cd config/manager
          kustomize edit set image controller=${{ needs.build-and-push.outputs.registry }}/${{ env.ECR_REPOSITORY }}:${{ github.sha }}

      - name: Deploy to EKS
        run: kustomize build config/default | kubectl apply -f -

A common pitfall in these pipelines is managing credentials. The workflow above avoids long-lived IAM user keys by using OIDC to assume an IAM role, which is the recommended best practice for security.

The system is now a cohesive unit: a Kubernetes operator providing both declarative and imperative APIs for a critical middleware function, backed by a robust distributed database for state, running on managed Kubernetes, and deployed via a secure, automated pipeline.

The architecture’s primary limitation is its reliance on polling Couchbase for pending jobs. While effective and simple, this introduces a small latency floor equal to the polling interval. A more advanced iteration could leverage Couchbase’s Eventing service or a Change Data Capture (CDC) stream to trigger the job processor reactively, moving towards a truly event-driven model. Additionally, the communication from the operator to target pods is currently a simple fire-and-forget HTTP call; a production-grade system might implement a more sophisticated mechanism, possibly involving a sidecar on each target pod that establishes a persistent gRPC stream with the operator for near-instantaneous invalidation commands. The current security posture of the gRPC endpoint is also minimal; mTLS authentication between client services and the operator would be a mandatory addition before deploying to a production environment.
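
For reference, enabling mutual TLS on the endpoint mostly amounts to constructing the gRPC server with TLS credentials that require and verify client certificates. The following is a minimal sketch of what such an addition to the server package might look like; the certificate file paths are placeholders, and in practice they would be mounted from Secrets or issued by cert-manager.

package grpc

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// newMTLSServer builds a gRPC server that only accepts connections from
// clients presenting a certificate signed by the given CA.
func newMTLSServer(certFile, keyFile, caFile string) (*grpc.Server, error) {
	serverCert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("failed to parse CA certificate %s", caFile)
	}
	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{serverCert},
		ClientAuth:   tls.RequireAndVerifyClientCert, // requiring client certs is what makes it mutual TLS
		ClientCAs:    caPool,
		MinVersion:   tls.VersionTLS12,
	}
	return grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig))), nil
}

Start would then construct the server via this helper instead of the bare grpc.NewServer(), and client services would dial with their own certificate and key using credentials.NewTLS on their side.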

