Implementing a Kubernetes Operator for Declarative Management of LangChain RAG Pipelines


Managing a multi-component application like a Retrieval-Augmented Generation (RAG) pipeline using disconnected deployment scripts or Helm charts is a recipe for operational fragility. A typical RAG setup involves a vector database, a data ingestion/indexing process, and an API service for querying. These components have dependencies and a specific lifecycle order. In a real-world project, simply deploying them isn’t enough; they must be managed as a single, coherent unit that can be updated, observed, and healed declaratively. The operational burden of coordinating updates—for instance, re-indexing when source data changes—quickly becomes untenable. This is a classic state management problem that generic Kubernetes resources like Deployments and Jobs don’t solve on their own.

The initial concept was to create a higher-level abstraction that encapsulates the entire RAG pipeline’s logic. Instead of operators manually running kubectl apply on three different manifests and then a kubectl create job, we needed a single source of truth. The Kubernetes Operator pattern is the canonical solution for this. By defining a Custom Resource Definition (CRD), we can create a new API object, RAGPipeline, in our cluster. An operator, which is a custom controller, will then watch for these objects and work to bring the cluster’s state in line with the spec defined in the object.

For the implementation, Go was the obvious choice for the operator itself due to its first-class support in the Kubernetes ecosystem with frameworks like Kubebuilder and Operator SDK. For the RAG components, we’ll containerize Python applications leveraging LangChain. We’ll use an in-cluster ChromaDB instance deployed as a StatefulSet for simplicity, a Kubernetes Job for the one-off indexing task, and a Deployment for the stateless LangChain query API. This approach isolates responsibilities: Go for the cloud-native orchestration and Python for the NLP-specific logic. A common mistake is trying to bake too much application-specific logic into the operator; the operator’s job is orchestration, not text embedding.

Custom Resource Definition: The RAGPipeline Contract

The foundation of the operator is the CRD. It defines the schema for our RAGPipeline resource. The spec section is the desired state, configured by the user, while the status section is the observed state, written to and controlled exclusively by the operator.

Here’s the Go definition for our CRD types. The pitfall here is to make the spec either too granular or too coarse. We aim for a balance that exposes necessary configurations without overwhelming the user.

api/v1alpha1/ragpipeline_types.go:

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// RAGPipelineSpec defines the desired state of RAGPipeline
type RAGPipelineSpec struct {
	// DataSourceURL is the URL to the text document to be indexed.
	// For this implementation, we assume a single plain text file.
	// +kubebuilder:validation:Required
	DataSourceURL string `json:"dataSourceURL"`

	// EmbeddingModel specifies the name of the sentence-transformer model to use for embeddings.
	// Defaults to "all-MiniLM-L6-v2".
	// +kubebuilder:default:="all-MiniLM-L6-v2"
	// +optional
	EmbeddingModel string `json:"embeddingModel"`

	// VectorStore defines the configuration for the vector store component.
	// +kubebuilder:validation:Required
	VectorStore VectorStoreSpec `json:"vectorStore"`

	// Indexer defines the configuration for the data indexing job.
	// +kubebuilder:validation:Required
	Indexer IndexerSpec `json:"indexer"`

	// QueryAPI defines the configuration for the RAG query service.
	// +kubebuilder:validation:Required
	QueryAPI QueryAPISpec `json:"queryAPI"`
}

// VectorStoreSpec defines the configuration for the vector store.
type VectorStoreSpec struct {
	// Image is the container image for the ChromaDB instance.
	// +kubebuilder:default:="chromadb/chroma:0.4.22"
	// +optional
	Image string `json:"image"`
	// PersistenceSize is the size of the PVC for the vector store.
	// +kubebuilder:default:="1Gi"
	// +optional
	PersistenceSize string `json:"persistenceSize"`
}

// IndexerSpec defines the configuration for the indexing job.
type IndexerSpec struct {
	// Image is the container image for the indexer job.
	// +kubebuilder:validation:Required
	Image string `json:"image"`
	// ChunkSize defines the size of text chunks for processing.
	// +kubebuilder:default:=1000
	// +optional
	ChunkSize int `json:"chunkSize"`
	// ChunkOverlap defines the overlap between text chunks.
	// +kubebuilder:default:=100
	// +optional
	ChunkOverlap int `json:"chunkOverlap"`
}

// QueryAPISpec defines the configuration for the query API.
type QueryAPISpec struct {
	// Image is the container image for the query API.
	// +kubebuilder:validation:Required
	Image string `json:"image"`
	// Replicas is the number of pods for the query API.
	// +kubebuilder:default:=1
	// +kubebuilder:validation:Minimum=0
	// +optional
	Replicas *int32 `json:"replicas"`
	// LLMProvider specifies the backend LLM service. For this example, 'Ollama'.
	// In a real-world project, this would be a more complex struct with auth, etc.
	// +kubebuilder:default:="Ollama"
	// +optional
	LLMProvider string `json:"llmProvider"`
	// LLMServiceURL is the endpoint for the LLM service (e.g., Ollama).
	// +kubebuilder:validation:Required
	LLMServiceURL string `json:"llmServiceURL"`
}


// RAGPipelineStatus defines the observed state of RAGPipeline
type RAGPipelineStatus struct {
	// Phase indicates the current state of the pipeline.
	// Possible values: Pending, VectorStoreReady, Indexing, Ready, Error.
	// +optional
	Phase string `json:"phase,omitempty"`

	// Conditions represent the latest available observations of the RAGPipeline's state.
	// +optional
	// +patchMergeKey=type
	// +patchStrategy=merge
	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
	
	// APIEndpoint is the cluster-internal address of the query API service.
	// +optional
	APIEndpoint string `json:"apiEndpoint,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// RAGPipeline is the Schema for the ragpipelines API
type RAGPipeline struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   RAGPipelineSpec   `json:"spec,omitempty"`
	Status RAGPipelineStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// RAGPipelineList contains a list of RAGPipeline
type RAGPipelineList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []RAGPipeline `json:"items"`
}

func init() {
	SchemeBuilder.Register(&RAGPipeline{}, &RAGPipelineList{})
}

With the CRD defined, we can start implementing the core logic in the controller’s Reconcile function.

The Reconciliation Loop: Orchestrating the Pipeline

The Reconcile method is the heart of the operator. It’s a level-triggered loop that is invoked whenever the RAGPipeline resource changes, or any of its owned resources change. Our logic must be idempotent; running it multiple times with the same input should produce the same result.

The flow of our reconciliation logic follows the dependency graph of the RAG pipeline.

graph TD
    A[Start Reconciliation] --> B{Vector Store Exists?};
    B -- No --> C[Create Vector Store StatefulSet];
    C --> S[Update Status: Pending];
    B -- Yes --> D{Vector Store Ready?};
    D -- No --> S;
    D -- Yes --> E[Update Status: VectorStoreReady];
    E --> F{Indexing Job Completed?};
    F -- No --> G{Indexing Job Exists?};
    G -- No --> H[Create Indexing Job];
    H --> I[Update Status: Indexing];
    G -- Yes --> I;
    F -- Yes --> J{API Deployment Exists?};
    J -- No --> K[Create API Deployment & Service];
    K --> L[Update Status: Ready];
    J -- Yes --> M{Spec Changed?};
    M -- Yes --> N[Update API Deployment];
    N --> L;
    M -- No --> L;
    L --> Z[End Reconciliation];
    I --> Z;
    S --> Z;

Here’s a significant portion of the controller implementation. Note the use of controllerutil.SetControllerReference to ensure that when our RAGPipeline CR is deleted, Kubernetes automatically garbage collects all the resources it created (the StatefulSet, Job, Deployment, etc.).

internal/controller/ragpipeline_controller.go:

package controller

import (
	// ... imports
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	ragv1alpha1 "github.com/your-repo/rag-operator/api/v1alpha1"
)

const (
	PhasePending          = "Pending"
	PhaseVectorStoreReady = "VectorStoreReady"
	PhaseIndexing         = "Indexing"
	PhaseReady            = "Ready"
	PhaseError            = "Error"
)

type RAGPipelineReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=rag.mlops.my.domain,resources=ragpipelines,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=rag.mlops.my.domain,resources=ragpipelines/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps,resources=statefulsets;deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services;persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete

func (r *RAGPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the RAGPipeline instance
	ragPipeline := &ragv1alpha1.RAGPipeline{}
	err := r.Get(ctx, req.NamespacedName, ragPipeline)
	if err != nil {
		if errors.IsNotFound(err) {
			logger.Info("RAGPipeline resource not found. Ignoring since object must be deleted.")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get RAGPipeline")
		return ctrl.Result{}, err
	}
	
	// === Reconcile Vector Store (ChromaDB StatefulSet) ===
	vectorStoreSS := &appsv1.StatefulSet{}
	vectorStoreName := ragPipeline.Name + "-vector-store"
	err = r.Get(ctx, types.NamespacedName{Name: vectorStoreName, Namespace: ragPipeline.Namespace}, vectorStoreSS)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating a new Vector Store StatefulSet")
		ss := r.vectorStoreStatefulSet(ragPipeline, vectorStoreName)
		if err := controllerutil.SetControllerReference(ragPipeline, ss, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, ss); err != nil {
			logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", ss.Namespace, "StatefulSet.Name", ss.Name)
			return ctrl.Result{}, err
		}
		// Update status and requeue
		ragPipeline.Status.Phase = PhasePending
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Check if the vector store is ready
	if vectorStoreSS.Status.ReadyReplicas < 1 {
		logger.Info("Vector Store StatefulSet not ready yet")
		ragPipeline.Status.Phase = PhasePending
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Second * 10}, err
	}

	if ragPipeline.Status.Phase != PhaseVectorStoreReady && ragPipeline.Status.Phase != PhaseIndexing && ragPipeline.Status.Phase != PhaseReady {
		ragPipeline.Status.Phase = PhaseVectorStoreReady
		if err := r.Status().Update(ctx, ragPipeline); err != nil {
			return ctrl.Result{}, err
		}
	}


	// === Reconcile Indexer (Kubernetes Job) ===
	indexerJob := &batchv1.Job{}
	indexerJobName := ragPipeline.Name + "-indexer"
	err = r.Get(ctx, types.NamespacedName{Name: indexerJobName, Namespace: ragPipeline.Namespace}, indexerJob)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating a new Indexer Job")
		job := r.indexerJob(ragPipeline, indexerJobName, vectorStoreName)
		if err := controllerutil.SetControllerReference(ragPipeline, job, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create new Job", "Job.Namespace", job.Namespace, "Job.Name", job.Name)
			return ctrl.Result{}, err
		}
		ragPipeline.Status.Phase = PhaseIndexing
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	} else if err != nil {
		return ctrl.Result{}, err
	}
	
	// Check if the job is completed
	if indexerJob.Status.Succeeded < 1 {
		if indexerJob.Status.Failed > 0 {
			logger.Error(fmt.Errorf("indexing job failed"), "Indexer job has failed", "Job.Name", indexerJobName)
			ragPipeline.Status.Phase = PhaseError
			err = r.Status().Update(ctx, ragPipeline)
			return ctrl.Result{}, err // Do not requeue a failed job
		}
		logger.Info("Indexer job is still running")
		ragPipeline.Status.Phase = PhaseIndexing
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Minute}, err
	}


	// === Reconcile Query API (Deployment and Service) ===
	queryAPIDeployment := &appsv1.Deployment{}
	queryAPIName := ragPipeline.Name + "-query-api"
	err = r.Get(ctx, types.NamespacedName{Name: queryAPIName, Namespace: ragPipeline.Namespace}, queryAPIDeployment)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating a new Query API Deployment")
		dep := r.queryAPIDeployment(ragPipeline, queryAPIName, vectorStoreName)
		if err := controllerutil.SetControllerReference(ragPipeline, dep, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, dep); err != nil {
			return ctrl.Result{}, err
		}
		
		logger.Info("Creating a new Query API Service")
		svc := r.queryAPIService(ragPipeline, queryAPIName)
		if err := controllerutil.SetControllerReference(ragPipeline, svc, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, svc); err != nil {
			return ctrl.Result{}, err
		}

		// Requeue to check deployment status
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Update the status to Ready
	ragPipeline.Status.Phase = PhaseReady
	ragPipeline.Status.APIEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8000", queryAPIName, ragPipeline.Namespace)
	if err := r.Status().Update(ctx, ragPipeline); err != nil {
		return ctrl.Result{}, err
	}

	logger.Info("Reconciliation finished successfully")
	return ctrl.Result{}, nil
}

// ... helper functions to build Kubernetes resources ...
// (vectorStoreStatefulSet, indexerJob, queryAPIDeployment, queryAPIService)

// Helper for Vector Store StatefulSet
func (r *RAGPipelineReconciler) vectorStoreStatefulSet(cr *ragv1alpha1.RAGPipeline, name string) *appsv1.StatefulSet {
	// In a real-world project, labels and annotations should be more robust
	labels := map[string]string{"app": name}
	pvcSize, _ := resource.ParseQuantity(cr.Spec.VectorStore.PersistenceSize)

	ss := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: cr.Namespace,
		},
		Spec: appsv1.StatefulSetSpec{
			ServiceName: name,
			Selector:    &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "chroma",
						Image: cr.Spec.VectorStore.Image,
						Ports: []corev1.ContainerPort{{ContainerPort: 8000}},
						VolumeMounts: []corev1.VolumeMount{{
							Name:      "chroma-data",
							MountPath: "/chroma/chroma",
						}},
					}},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
				ObjectMeta: metav1.ObjectMeta{Name: "chroma-data"},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{corev1.ResourceStorage: pvcSize},
					},
				},
			}},
		},
	}
	return ss
}

// Helper for Indexer Job
func (r *RAGPipelineReconciler) indexerJob(cr *ragv1alpha1.RAGPipeline, name, vectorStoreHost string) *batchv1.Job {
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: cr.Namespace,
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "indexer",
						Image: cr.Spec.Indexer.Image,
						Env: []corev1.EnvVar{
							{Name: "DATA_SOURCE_URL", Value: cr.Spec.DataSourceURL},
							{Name: "EMBEDDING_MODEL", Value: cr.Spec.EmbeddingModel},
							{Name: "CHROMA_HOST", Value: vectorStoreHost},
							{Name: "CHUNK_SIZE", Value: fmt.Sprintf("%d", cr.Spec.Indexer.ChunkSize)},
							{Name: "CHUNK_OVERLAP", Value: fmt.Sprintf("%d", cr.Spec.Indexer.ChunkOverlap)},
						},
					}},
					RestartPolicy: corev1.RestartPolicyNever,
				},
			},
			BackoffLimit: new(int32), // pointer to 0
		},
	}
	*job.Spec.BackoffLimit = 2 // Retry twice on failure
	return job
}
// SetupWithManager sets up the controller with the Manager.
func (r *RAGPipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&ragv1alpha1.RAGPipeline{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&batchv1.Job{}).
		Owns(&appsv1.Deployment{}).
		Owns(&corev1.Service{}).
		Complete(r)
}

Containerized Application Logic: The Python Components

The operator orchestrates containers, but it doesn’t build them. We need two separate Python applications: one for indexing and one for the query API.

1. The Indexer Application

This is a simple script that runs to completion. It downloads data, processes it using LangChain, and loads it into the ChromaDB instance orchestrated by the operator. It gets its configuration from environment variables, which the operator injects into the Job spec.

indexer/main.py:

import os
import requests
import logging

from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader
from langchain.embeddings import HuggingFaceEmbeddings

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration from environment variables
DATA_SOURCE_URL = os.environ["DATA_SOURCE_URL"]
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
CHROMA_HOST = os.environ["CHROMA_HOST"]
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", 1000))
CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", 100))
COLLECTION_NAME = "rag_collection"

def run_indexing():
    """
    Downloads data, chunks it, creates embeddings, and stores them in ChromaDB.
    """
    try:
        logging.info(f"Downloading data from {DATA_SOURCE_URL}")
        response = requests.get(DATA_SOURCE_URL)
        response.raise_for_status() # Raise an exception for bad status codes
        
        local_path = "/tmp/source_data.txt"
        with open(local_path, "w") as f:
            f.write(response.text)

        logging.info("Loading document...")
        loader = TextLoader(local_path)
        documents = loader.load()
        
        logging.info("Splitting document into chunks...")
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        docs = text_splitter.split_documents(documents)
        
        logging.info(f"Using embedding model: {EMBEDDING_MODEL}")
        embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
        
        logging.info(f"Connecting to ChromaDB at {CHROMA_HOST}:8000")
        vector_store = Chroma.from_documents(
            documents=docs, 
            embedding=embeddings, 
            collection_name=COLLECTION_NAME,
            # Critical: This connects to the remote ChromaDB instance
            client_settings={"chroma_api_impl": "rest", "chroma_server_host": CHROMA_HOST, "chroma_server_http_port": "8000"}
        )
        
        logging.info(f"Successfully indexed {len(docs)} chunks into ChromaDB collection '{COLLECTION_NAME}'.")

    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to download data: {e}")
        raise
    except Exception as e:
        logging.error(f"An error occurred during indexing: {e}")
        raise

if __name__ == "__main__":
    run_indexing()

2. The Query API Application

This is a long-running service, typically built with a web framework like FastAPI. It exposes an endpoint that takes a user query, retrieves relevant documents from ChromaDB, and passes them along with the query to an LLM.

api/main.py:

import os
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.chat_models import ChatOllama
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration from environment variables
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
CHROMA_HOST = os.environ["CHROMA_HOST"]
LLM_SERVICE_URL = os.environ["LLM_SERVICE_URL"]
COLLECTION_NAME = "rag_collection"

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

class QueryResponse(BaseModel):
    answer: str
    source_documents: list[dict]

# Initialize components on startup. A common mistake is to initialize these per-request,
# which is highly inefficient.
try:
    logging.info("Initializing components...")
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    
    logging.info(f"Connecting to ChromaDB at {CHROMA_HOST}:8000")
    vector_store = Chroma(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        client_settings={"chroma_api_impl": "rest", "chroma_server_host": CHROMA_HOST, "chroma_server_http_port": "8000"}
    )
    retriever = vector_store.as_retriever()
    
    logging.info(f"Connecting to LLM at {LLM_SERVICE_URL}")
    llm = ChatOllama(base_url=LLM_SERVICE_URL, model="llama2")

    template = """Answer the question based only on the following context:
    {context}

    Question: {question}
    """
    prompt = ChatPromptTemplate.from_template(template)

    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    logging.info("Initialization complete.")
except Exception as e:
    logging.error(f"Failed to initialize application: {e}")
    # This will cause the pod to crash and restart, which is often the desired behavior on startup failure.
    raise

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """
    Performs a RAG query against the indexed documents.
    """
    try:
        logging.info(f"Received query: '{request.query}'")
        
        # A more robust implementation would retrieve documents first to return them.
        # This simplified chain streams the answer directly.
        # For production, you'd likely want to separate retrieval and generation.
        answer = rag_chain.invoke(request.query)
        
        # In a real system, you'd properly retrieve and format source docs.
        # This is a placeholder.
        docs = retriever.get_relevant_documents(request.query)
        source_documents = [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in docs]

        return QueryResponse(answer=answer, source_documents=source_documents)
    except Exception as e:
        logging.error(f"Error processing query '{request.query}': {e}")
        raise HTTPException(status_code=500, detail="Internal server error while processing the query.")

@app.get("/healthz")
def health_check():
    return {"status": "ok"}

To deploy this, you would build Docker images for the indexer and the API, push them to a registry, and then reference them in the RAGPipeline custom resource YAML.

An example custom resource might look like this:

apiVersion: rag.mlops.my.domain/v1alpha1
kind: RAGPipeline
metadata:
  name: project-gutenberg-rag
spec:
  dataSourceURL: "https://www.gutenberg.org/files/1342/1342-0.txt" # Pride and Prejudice
  embeddingModel: "sentence-transformers/all-MiniLM-L6-v2"
  vectorStore:
    image: "chromadb/chroma:0.4.22"
    persistenceSize: "2Gi"
  indexer:
    image: "my-registry/rag-indexer:0.1.0"
    chunkSize: 1200
    chunkOverlap: 150
  queryAPI:
    image: "my-registry/rag-api:0.1.0"
    replicas: 2
    llmProvider: "Ollama"
    llmServiceURL: "http://ollama-service.default.svc.cluster.local:11434"

Applying this single manifest would trigger the operator to stand up the entire, fully functional RAG pipeline in the correct sequence.

The current design, while functional and declarative, has clear boundaries. The in-cluster, single-pod ChromaDB is a single point of failure and not suitable for production workloads demanding high availability or large-scale vector storage. Future work must focus on abstracting the vector store to support managed services like Pinecone, Weaviate, or a clustered Milvus deployment. The indexing job is monolithic; for terabytes of source data, this should be replaced by a distributed processing workflow, which the operator could orchestrate using a tool like Argo Workflows. Lastly, the operator’s observability is limited to status phases. A production-grade implementation would expose detailed Prometheus metrics on reconciliation loops, component health, and pipeline state transition latencies, providing the necessary insight for true SRE-driven operations.


  TOC