The operational challenge with distributed caching in a microservices environment isn’t populating the cache, but invalidating it correctly and efficiently. A common pattern involves a message broker fan-out, but this introduces complexities around delivery guarantees, message ordering, and consumer acknowledgment, often leading to stale data. Attempting to solve this with direct service-to-service HTTP calls creates a tightly coupled mesh that is brittle and hard to maintain. The real-world pain point is the lack of a central, reliable, and observable orchestrator for these high-frequency, ephemeral invalidation events.
Our initial concept was to build a Kubernetes-native solution. A Kubernetes Operator seemed like the right primitive. The standard approach is to define a Custom Resource Definition (CRD), say CacheInvalidationJob, and have the operator's reconciliation loop act upon it. However, for high-throughput, programmatic invalidation triggers from our internal services, interacting with the Kubernetes API server to create, update, and delete thousands of CRs per minute is inefficient and adds significant load to etcd. It's using a declarative system for an imperative task.
This led to a hybrid architectural decision. The operator would be driven by two inputs:
- Declarative: A CacheInvalidationJob CR for manual, kubectl-driven operations and GitOps workflows.
- Imperative: A high-performance gRPC endpoint exposed directly by the operator pod for service-to-service invalidation requests.
Furthermore, tracking the state of each invalidation job—which target pods have acknowledged, which failed, retry counts, completion timestamps—is too complex and voluminous for the status subresource of a CRD. In a real-world project, an operator pod can be rescheduled, and storing this transient state in memory is not an option. We required a durable, queryable, and distributed state store external to etcd. Couchbase was selected for this role due to its native JSON document support, powerful N1QL query language, and high-performance key-value operations, making it an ideal backend for storing and managing job states.
The entire system would run on AWS EKS for managed Kubernetes infrastructure, with the operator’s build and deployment pipeline automated through GitHub Actions.
Defining the Core Primitives: CRD and gRPC Proto
First, we define the declarative API via a CRD. This Go struct, using kubebuilder markers, will be translated into the CacheInvalidationJob CRD manifest. It's designed to be simple, focusing on the what, not the how.
api/v1alpha1/cacheinvalidationjob_types.go:
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// CacheInvalidationJobSpec defines the desired state of CacheInvalidationJob
type CacheInvalidationJobSpec struct {
// TargetSelector is a label selector to identify the pods whose caches need invalidation.
// +kubebuilder:validation:Required
TargetSelector metav1.LabelSelector `json:"targetSelector"`
// CacheKeys is a list of keys or patterns to be invalidated.
// +kubebuilder:validation:Required
// +kubebuilder:validation:MinItems=1
CacheKeys []string `json:"cacheKeys"`
// InvalidationID is a unique identifier for this invalidation event,
// allowing external systems to track its progress.
// +kubebuilder:validation:Required
InvalidationID string `json:"invalidationId"`
}
// CacheInvalidationJobStatus defines the observed state of CacheInvalidationJob
type CacheInvalidationJobStatus struct {
// Phase indicates the current state of the job. E.g., Pending, Processing, Succeeded, Failed.
Phase string `json:"phase,omitempty"`
// Conditions represent the latest available observations of an object's state.
Conditions []metav1.Condition `json:"conditions,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// CacheInvalidationJob is the Schema for the cacheinvalidationjobs API
type CacheInvalidationJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CacheInvalidationJobSpec `json:"spec,omitempty"`
Status CacheInvalidationJobStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// CacheInvalidationJobList contains a list of CacheInvalidationJob
type CacheInvalidationJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []CacheInvalidationJob `json:"items"`
}
func init() {
SchemeBuilder.Register(&CacheInvalidationJob{}, &CacheInvalidationJobList{})
}
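For reference, here is the kind of manifest a user or a GitOps pipeline would apply. The API group shown (cache.example.com) is a placeholder for whatever group you scaffolded with kubebuilder; the fields map one-to-one onto the spec above.
apiVersion: cache.example.com/v1alpha1   # placeholder group; use the group scaffolded for your project
kind: CacheInvalidationJob
metadata:
  name: invalidate-user-profiles
spec:
  invalidationId: "release-2024-04-17-001"
  targetSelector:
    matchLabels:
      app: user-service
      tier: cache
  cacheKeys:
    - "user:profile:*"
    - "user:session:*"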
Next, the imperative API is defined using Protocol Buffers. This will be our high-throughput entry point.
proto/invalidator/v1/invalidator.proto:
syntax = "proto3";
package invalidator.v1;
option go_package = "github.com/your-org/invalidator-operator/gen/invalidator/v1;invalidatorv1";
service InvalidatorService {
// SubmitInvalidation triggers a new cache invalidation job asynchronously.
rpc SubmitInvalidation(SubmitInvalidationRequest) returns (SubmitInvalidationResponse);
}
message SubmitInvalidationRequest {
// A unique identifier provided by the client for idempotency and tracking.
string request_id = 1;
// A map representing the label selector for target pods.
// e.g., {"app": "user-service", "tier": "cache"}
map<string, string> target_selector = 2;
// The list of cache keys or patterns to invalidate.
repeated string cache_keys = 3;
}
message SubmitInvalidationResponse {
// The internal Job ID assigned by the operator for tracking in Couchbase.
string job_id = 1;
// Status indicates if the job was accepted for processing.
string status = 2; // e.g., "ACCEPTED"
}
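From a client service's perspective, submitting an invalidation is a single unary call. The snippet below is a minimal sketch against the generated Go stubs; the in-cluster address (invalidator-operator.operators.svc:9090) and the insecure transport credentials are placeholders, and in production this channel would use mTLS, as noted at the end of this article.
package main

import (
	"context"
	"log"
	"time"

	invalidatorv1 "github.com/your-org/invalidator-operator/gen/invalidator/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Address of the operator's gRPC Service inside the cluster (placeholder).
	conn, err := grpc.Dial("invalidator-operator.operators.svc:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("failed to dial operator: %v", err)
	}
	defer conn.Close()

	client := invalidatorv1.NewInvalidatorServiceClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	resp, err := client.SubmitInvalidation(ctx, &invalidatorv1.SubmitInvalidationRequest{
		RequestId:      "order-service-cache-flush-42", // client-chosen idempotency key
		TargetSelector: map[string]string{"app": "user-service", "tier": "cache"},
		CacheKeys:      []string{"user:profile:*"},
	})
	if err != nil {
		log.Fatalf("invalidation rejected: %v", err)
	}
	log.Printf("job %s accepted with status %s", resp.JobId, resp.Status)
}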
The Operator’s Core: Reconciler and State Model
The heart of the operator is the Reconcile method. A common mistake is to pack too much business logic directly into it. Our approach separates concerns: the reconciler acts as a trigger, delegating the heavy lifting of state management and execution to a dedicated service that interacts with Couchbase.
The state of each job, whether created via CRD or gRPC, is stored in a Couchbase document. This provides durability and a single source of truth.
internal/state/models.go:
package state
import "time"
// JobStatus represents the lifecycle state of an invalidation job.
type JobStatus string
const (
StatusPending JobStatus = "PENDING"
StatusProcessing JobStatus = "PROCESSING"
StatusSucceeded JobStatus = "SUCCEEDED"
StatusFailed JobStatus = "FAILED"
StatusRetrying JobStatus = "RETRYING"
)
// InvalidationJobDocument is the structure stored in Couchbase.
type InvalidationJobDocument struct {
ID string `json:"id"` // Unique job ID (e.g., UUID)
Type string `json:"type"` // Constant "invalidation-job" for N1QL queries
Source string `json:"source"` // "CRD" or "GRPC"
ExternalID string `json:"externalId"` // InvalidationID from CRD or request_id from gRPC
TargetSelector map[string]string `json:"targetSelector"` // Stored as a map for easy querying
CacheKeys []string `json:"cacheKeys"`
Status JobStatus `json:"status"`
Attempts int `json:"attempts"`
CreatedAt time.Time `json:"createdAt"`
LastModified time.Time `json:"lastModified"`
TargetPods []PodStatus `json:"targetPods,omitempty"`
Error string `json:"error,omitempty"`
}
// PodStatus tracks the invalidation status for an individual target pod.
type PodStatus struct {
PodName string `json:"podName"`
PodIP string `json:"podIp"`
Status string `json:"status"` // e.g., "SUCCESS", "FAILED"
LastAttemptAt time.Time `json:"lastAttemptAt"`
ErrorMessage string `json:"errorMessage,omitempty"`
}
Integrating Couchbase for Robust State Management
Connecting to Couchbase from our Go operator requires a robust client that handles retries and connection management. We’ll initialize this client once at operator startup.
internal/couchbase/client.go:
package couchbase
import (
	"fmt"
	"log"
	"time"

	"github.com/couchbase/gocb/v2"

	"github.com/your-org/invalidator-operator/internal/state"
)
// Config holds the configuration for the Couchbase client.
type Config struct {
ConnectionString string
Username string
Password string
BucketName string
}
// Client wraps the gocb.Cluster and provides access to a specific bucket/collection.
type Client struct {
Cluster *gocb.Cluster
Bucket *gocb.Bucket
Collection *gocb.Collection
}
// NewClient creates and connects a new Couchbase client.
// In a real production system, you'd use a more robust retry mechanism.
func NewClient(config Config) (*Client, error) {
log.Println("Connecting to Couchbase cluster...")
cluster, err := gocb.Connect(config.ConnectionString, gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: config.Username,
Password: config.Password,
},
})
if err != nil {
return nil, err
}
bucket := cluster.Bucket(config.BucketName)
err = bucket.WaitUntilReady(5*time.Second, nil)
if err != nil {
return nil, err
}
// Using the default collection for simplicity. In production, use dedicated scopes/collections.
collection := bucket.DefaultCollection()
log.Println("Successfully connected to Couchbase.")
return &Client{
Cluster: cluster,
Bucket: bucket,
Collection: collection,
}, nil
}
// Close gracefully disconnects the client.
func (c *Client) Close() {
if c.Cluster != nil {
c.Cluster.Close(nil)
}
}
// UpsertJob stores or updates an InvalidationJobDocument.
func (c *Client) UpsertJob(doc state.InvalidationJobDocument) error {
// A common pitfall is not setting appropriate timeouts, which can cause
// the operator to hang if the database is unresponsive.
_, err := c.Collection.Upsert(doc.ID, doc, &gocb.UpsertOptions{
Timeout: 2 * time.Second,
})
return err
}
// FindPendingJobs queries Couchbase for jobs that need processing.
// Using N1QL allows for flexible querying without scanning all documents.
func (c *Client) FindPendingJobs() ([]state.InvalidationJobDocument, error) {
// Ensure you have a N1QL index on `type` and `status` for performance.
// CREATE INDEX idx_pending_jobs ON `your-bucket-name`(`status`) WHERE `type` = "invalidation-job"
	// Backtick-quote the bucket name so hyphenated bucket names parse correctly.
	query := fmt.Sprintf(
		"SELECT b.* FROM `%s` b WHERE b.type = \"invalidation-job\" AND b.status IN [\"PENDING\", \"RETRYING\"]",
		c.Bucket.Name(),
	)
rows, err := c.Cluster.Query(query, &gocb.QueryOptions{
Timeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
defer rows.Close()
var jobs []state.InvalidationJobDocument
for rows.Next() {
var job state.InvalidationJobDocument
if err := rows.Row(&job); err != nil {
log.Printf("Error scanning row into job document: %v", err)
continue
}
jobs = append(jobs, job)
}
return jobs, rows.Err()
}
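The processor also has to update job documents as pods report back, and those writes can race with each other (and with the reconciler). The sketch below shows one way to guard such updates with Couchbase CAS values; UpdateJobStatus is a hypothetical helper, not part of the client shown above, and it additionally uses the standard errors package.
// UpdateJobStatus mutates a job document using a get-modify-replace cycle guarded
// by the document's CAS value, so concurrent writers cannot silently overwrite
// each other. (Hypothetical helper; not part of the client shown above.)
func (c *Client) UpdateJobStatus(jobID string, mutate func(doc *state.InvalidationJobDocument)) error {
	for attempt := 0; attempt < 3; attempt++ {
		// Read the current document together with its CAS token.
		getRes, err := c.Collection.Get(jobID, &gocb.GetOptions{Timeout: 2 * time.Second})
		if err != nil {
			return err
		}

		var doc state.InvalidationJobDocument
		if err := getRes.Content(&doc); err != nil {
			return err
		}

		// Apply the caller-supplied mutation (e.g., set Status, append pod results).
		mutate(&doc)
		doc.LastModified = time.Now().UTC()

		// Replace only if nobody else has written the document in the meantime.
		_, err = c.Collection.Replace(jobID, doc, &gocb.ReplaceOptions{
			Cas:     getRes.Cas(),
			Timeout: 2 * time.Second,
		})
		if err == nil {
			return nil
		}
		if errors.Is(err, gocb.ErrCasMismatch) {
			continue // Someone else won the race; re-read and retry.
		}
		return err
	}
	return fmt.Errorf("gave up updating job %s after repeated CAS conflicts", jobID)
}
Wrapping every status transition in a helper like this keeps a slow writer from silently undoing a newer result.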
Revisiting the Reconciler with a Database-Driven Flow
Our reconciler’s logic now shifts. Instead of acting on every CR change, it periodically queries Couchbase for work. This decouples it from the Kubernetes API server’s event stream for job processing and makes the entire system more resilient.
internal/controller/cacheinvalidationjob_controller.go:
// ... imports and Reconciler struct definition ...
type CacheInvalidationJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger
CouchbaseClient *cb.Client // Our Couchbase client
JobProcessor *processing.Processor // The service doing the actual work
}
// Reconcile is triggered by CR changes or periodically.
// Its main job is to ensure the state in Couchbase reflects the CRD,
// and to trigger the processor for pending work.
func (r *CacheInvalidationJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("cacheinvalidationjob", req.NamespacedName)
// 1. Handle CRD creation/update: Create/update the corresponding Couchbase document.
var jobCR invalpha1.CacheInvalidationJob
if err := r.Get(ctx, req.NamespacedName, &jobCR); err != nil {
if errors.IsNotFound(err) {
// CR was deleted. We could add logic here to mark the Couchbase doc as "CANCELLED".
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Translate CRD to Couchbase document and upsert it.
// The key is to make this operation idempotent.
doc := state.InvalidationJobDocument{
ID: string(jobCR.UID), // Use UID for a stable, unique ID.
Type: "invalidation-job",
Source: "CRD",
ExternalID: jobCR.Spec.InvalidationID,
TargetSelector: jobCR.Spec.TargetSelector.MatchLabels,
CacheKeys: jobCR.Spec.CacheKeys,
Status: state.StatusPending, // Always start as pending
CreatedAt: jobCR.CreationTimestamp.Time,
LastModified: time.Now().UTC(),
}
	// A critical design choice: the reconciler only writes the initial state; the
	// processor owns all transitions from PENDING onwards. Note that Upsert as shown
	// overwrites the document on every requeue, so a production version should use an
	// insert-if-absent write (see the sketch after this block) to avoid resetting
	// processor-owned state.
if err := r.CouchbaseClient.UpsertJob(doc); err != nil {
log.Error(err, "failed to sync CRD to Couchbase")
return ctrl.Result{RequeueAfter: 30 * time.Second}, err
}
// 2. Trigger the processor to look for work.
// The processor itself is what queries Couchbase for PENDING jobs.
// This makes our logic testable outside the reconciliation loop.
go r.JobProcessor.ProcessPendingJobs()
// We requeue periodically to ensure we don't miss jobs if the processor goroutine dies.
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *CacheInvalidationJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&invalpha1.CacheInvalidationJob{}).
Complete(r)
}
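One refinement worth calling out: because the reconciler requeues every minute, the Upsert above would keep rewriting documents that the processor already owns. A minimal sketch of an insert-if-absent variant follows, assuming we add a hypothetical InsertJob method to the Couchbase client (it also relies on the standard errors package).
// InsertJob writes a job document only if it does not already exist. The
// reconciler can treat gocb.ErrDocumentExists as "already synced" and leave
// processor-owned state untouched. (Hypothetical addition to the client.)
func (c *Client) InsertJob(doc state.InvalidationJobDocument) error {
	_, err := c.Collection.Insert(doc.ID, doc, &gocb.InsertOptions{
		Timeout: 2 * time.Second,
	})
	if errors.Is(err, gocb.ErrDocumentExists) {
		// The document was created by a previous reconcile (or the gRPC path); nothing to do.
		return nil
	}
	return err
}
The reconciler would then call r.CouchbaseClient.InsertJob(doc) in place of UpsertJob and treat an existing document as already synced.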
The diagram below illustrates the flow for both CRD and gRPC triggers.
sequenceDiagram
    participant Client as gRPC Client
    participant CRD as User (kubectl)
    participant Operator as Operator Pod
    participant Reconciler as Reconciler Loop
    participant gRPCServer as gRPC Server
    participant Processor as Job Processor
    participant Couchbase as Couchbase Cluster
    participant K8sAPI as Kubernetes API Server

    CRD->>K8sAPI: Apply CacheInvalidationJob CR
    K8sAPI->>Reconciler: Event: CR created
    Reconciler->>Couchbase: Upsert Job Document (status: PENDING)
    Reconciler->>Processor: Trigger job processing (async)

    Client->>gRPCServer: SubmitInvalidation() call
    gRPCServer->>Couchbase: Insert Job Document (status: PENDING)
    gRPCServer->>Client: Return job_id

    Note over Processor: Periodically, or when triggered...
    Processor->>Couchbase: N1QL Query for PENDING jobs
    Couchbase-->>Processor: Returns list of pending jobs
    Processor->>Processor: For each job, discover target pods
    Processor->>K8sAPI: Get Pods with label selector
    K8sAPI-->>Processor: List of Pod IPs
    Processor->>Couchbase: Update Job Document (status: PROCESSING, add pod list)
    loop For each Pod IP
        Processor->>TargetPod: Send invalidation request (e.g., HTTP POST)
    end
    Processor->>Couchbase: Update pod statuses in Job Document
    Note over Processor: Based on outcomes, update final job status to SUCCEEDED/FAILED.
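The job processor itself is not shown in full, but its core fan-out step from the diagram, discovering target pods and pushing the invalidation to each one, condenses to roughly the sketch below. The Processor fields, the target port (8081), and the path (/internal/cache/invalidate) are assumptions about the target services; a production version would bound concurrency, honor per-pod timeouts, and persist per-pod results through the state store.
package processing

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/your-org/invalidator-operator/internal/state"
)

// Processor drives pending jobs to completion. Only the fan-out step is
// sketched here; the fields and wiring are assumptions, not the real implementation.
type Processor struct {
	k8sClient client.Client
}

// processJob fans an invalidation out to every pod matching the job's selector.
// Error handling, retries, and per-pod status persistence are elided.
func (p *Processor) processJob(ctx context.Context, job state.InvalidationJobDocument) error {
	// 1. Discover target pods via the label selector stored on the job.
	var pods corev1.PodList
	if err := p.k8sClient.List(ctx, &pods, client.MatchingLabels(job.TargetSelector)); err != nil {
		return fmt.Errorf("listing target pods: %w", err)
	}

	payload, err := json.Marshal(map[string][]string{"keys": job.CacheKeys})
	if err != nil {
		return err
	}

	httpClient := &http.Client{Timeout: 2 * time.Second}

	// 2. Push the invalidation to each running pod (assumed port and path).
	for _, pod := range pods.Items {
		if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning {
			continue
		}
		url := fmt.Sprintf("http://%s:8081/internal/cache/invalidate", pod.Status.PodIP)
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
		if err != nil {
			return err
		}
		req.Header.Set("Content-Type", "application/json")

		resp, err := httpClient.Do(req)
		if err != nil {
			log.Printf("invalidation call to pod %s failed: %v", pod.Name, err)
			continue // Recorded as a per-pod failure in the real implementation.
		}
		resp.Body.Close()
	}

	// 3. Persist the outcome back to Couchbase (per-pod statuses, final job status).
	return nil
}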
Implementing the gRPC Server
The gRPC server runs in a separate goroutine within the main operator process. Its sole responsibility is to validate incoming requests and persist them to Couchbase as a PENDING job.
internal/grpc/server.go:
package grpc
import (
	"context"
	"log"
	"net"
	"time"

	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	invalidatorv1 "github.com/your-org/invalidator-operator/gen/invalidator/v1"
	"github.com/your-org/invalidator-operator/internal/couchbase"
	"github.com/your-org/invalidator-operator/internal/state"
)
type Server struct {
	invalidatorv1.UnimplementedInvalidatorServiceServer
cbClient *couchbase.Client
}
func NewServer(cb *couchbase.Client) *Server {
return &Server{cbClient: cb}
}
// Start runs the gRPC server in a goroutine.
func (s *Server) Start(listenAddr string) {
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
invalidatorv1.RegisterInvalidatorServiceServer(grpcServer, s)
log.Printf("gRPC server listening at %v", lis.Addr())
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve gRPC: %v", err)
}
}()
}
func (s *Server) SubmitInvalidation(ctx context.Context, req *invalidatorv1.SubmitInvalidationRequest) (*invalidatorv1.SubmitInvalidationResponse, error) {
// Basic validation. A production system would have more robust checks.
if req.RequestId == "" || len(req.TargetSelector) == 0 || len(req.CacheKeys) == 0 {
return nil, status.Error(codes.InvalidArgument, "request_id, target_selector, and cache_keys are required")
}
jobID := uuid.New().String()
doc := state.InvalidationJobDocument{
ID: jobID,
Type: "invalidation-job",
Source: "GRPC",
ExternalID: req.RequestId,
TargetSelector: req.TargetSelector,
CacheKeys: req.CacheKeys,
Status: state.StatusPending,
CreatedAt: time.Now().UTC(),
LastModified: time.Now().UTC(),
}
if err := s.cbClient.UpsertJob(doc); err != nil {
log.Printf("ERROR: failed to persist gRPC job to Couchbase: %v", err)
return nil, status.Error(codes.Internal, "failed to schedule invalidation job")
}
return &invalidatorv1.SubmitInvalidationResponse{
JobId: jobID,
Status: "ACCEPTED",
}, nil
}
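For completeness, the wiring in main() ties the pieces together: one shared Couchbase client, the gRPC server started on its own listener, and the reconciler registered with the controller-runtime manager. This is a condensed sketch; the listen address, environment variable names, and import aliases are assumptions, and leader election, metrics, and health probes from the standard kubebuilder scaffold are omitted.
package main

import (
	"log"
	"os"

	ctrl "sigs.k8s.io/controller-runtime"

	invalpha1 "github.com/your-org/invalidator-operator/api/v1alpha1"
	"github.com/your-org/invalidator-operator/internal/controller"
	cb "github.com/your-org/invalidator-operator/internal/couchbase"
	grpcsrv "github.com/your-org/invalidator-operator/internal/grpc"
)

func main() {
	// Shared Couchbase client; credentials come from the environment (e.g., a mounted Secret).
	cbClient, err := cb.NewClient(cb.Config{
		ConnectionString: os.Getenv("COUCHBASE_CONNECTION_STRING"),
		Username:         os.Getenv("COUCHBASE_USERNAME"),
		Password:         os.Getenv("COUCHBASE_PASSWORD"),
		BucketName:       os.Getenv("COUCHBASE_BUCKET"),
	})
	if err != nil {
		log.Fatalf("unable to connect to Couchbase: %v", err)
	}
	defer cbClient.Close()

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		log.Fatalf("unable to start manager: %v", err)
	}
	// Register our API types with the manager's scheme.
	if err := invalpha1.AddToScheme(mgr.GetScheme()); err != nil {
		log.Fatalf("unable to add scheme: %v", err)
	}

	// The imperative entry point: gRPC server in its own goroutine (assumed port).
	grpcsrv.NewServer(cbClient).Start(":9090")

	// The declarative entry point: the reconciler.
	if err := (&controller.CacheInvalidationJobReconciler{
		Client:          mgr.GetClient(),
		Scheme:          mgr.GetScheme(),
		Log:             ctrl.Log.WithName("controllers").WithName("CacheInvalidationJob"),
		CouchbaseClient: cbClient,
		// JobProcessor wiring omitted for brevity.
	}).SetupWithManager(mgr); err != nil {
		log.Fatalf("unable to create controller: %v", err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Fatalf("problem running manager: %v", err)
	}
}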
The CI/CD Pipeline with GitHub Actions
Automating the build and deployment is crucial for operational maturity. Our GitHub Actions workflow will handle building the Go binary, creating a container image, pushing it to Amazon ECR, and deploying the operator manifests to EKS.
.github/workflows/deploy.yml:
name: Deploy Invalidator Operator
on:
push:
branches:
- main
paths:
- '**.go'
- 'Dockerfile'
- 'go.mod'
- 'go.sum'
env:
AWS_REGION: us-east-1
ECR_REPOSITORY: invalidator-operator
EKS_CLUSTER_NAME: my-production-cluster
jobs:
build-and-push:
name: Build and Push Docker Image
runs-on: ubuntu-latest
permissions:
contents: read
id-token: write # Required for OIDC authentication with AWS
    outputs:
      registry: ${{ steps.login-ecr.outputs.registry }}
    steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/GitHubActionsECRAccessRole
aws-region: ${{ env.AWS_REGION }}
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1
      - name: Set up Docker Buildx
        id: buildx
        uses: docker/setup-buildx-action@v2
- name: Build and push Docker image
id: docker_build
uses: docker/build-push-action@v4
with:
context: .
file: Dockerfile
          builder: ${{ steps.buildx.outputs.name }}
push: true
tags: ${{ steps.login-ecr.outputs.registry }}/${{ env.ECR_REPOSITORY }}:${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
deploy-to-eks:
name: Deploy to EKS
runs-on: ubuntu-latest
needs: build-and-push
permissions:
contents: read
id-token: write # Required for OIDC authentication with AWS
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/GitHubActionsEKSAccessRole
aws-region: ${{ env.AWS_REGION }}
- name: Set up Kubeconfig
run: aws eks update-kubeconfig --name ${{ env.EKS_CLUSTER_NAME }} --region ${{ env.AWS_REGION }}
- name: Install kustomize
uses: imranismail/setup-kustomize@v2
- name: Update image tag in kustomization
run: |
cd config/manager
kustomize edit set image controller=${{ needs.build-and-push.outputs.registry }}/${{ env.ECR_REPOSITORY }}:${{ github.sha }}
- name: Deploy to EKS
run: kustomize build config/default | kubectl apply -f -
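The kustomize edit set image step in the deploy job rewrites the images field of config/manager/kustomization.yaml. With the default kubebuilder scaffold the placeholder image name is controller, so after the edit the file looks roughly like this (the account ID and tag are illustrative):
# config/manager/kustomization.yaml after `kustomize edit set image`
resources:
- manager.yaml
images:
- name: controller
  newName: 123456789012.dkr.ecr.us-east-1.amazonaws.com/invalidator-operator
  newTag: <commit-sha>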
A common pitfall in these pipelines is managing credentials. The workflow above avoids long-lived IAM user keys by using OIDC to assume an IAM role, which is the recommended best practice for security.
The system is now a cohesive unit: a Kubernetes operator providing both declarative and imperative APIs for a critical middleware function, backed by a robust distributed database for state, running on managed Kubernetes, and deployed via a secure, automated pipeline.
The architecture’s primary limitation is its reliance on polling Couchbase for pending jobs. While effective and simple, this introduces a small latency floor equal to the polling interval. A more advanced iteration could leverage Couchbase’s Eventing service or a Change Data Capture (CDC) stream to trigger the job processor reactively, moving towards a truly event-driven model. Additionally, the communication from the operator to target pods is currently a simple fire-and-forget HTTP call; a production-grade system might implement a more sophisticated mechanism, possibly involving a sidecar on each target pod that establishes a persistent gRPC stream with the operator for near-instantaneous invalidation commands. The current security posture of the gRPC endpoint is also minimal; mTLS authentication between client services and the operator would be a mandatory addition before deploying to a production environment.