Managing a multi-component application like a Retrieval-Augmented Generation (RAG) pipeline using disconnected deployment scripts or Helm charts is a recipe for operational fragility. A typical RAG setup involves a vector database, a data ingestion/indexing process, and an API service for querying. These components have dependencies and a specific lifecycle order. In a real-world project, simply deploying them isn’t enough; they must be managed as a single, coherent unit that can be updated, observed, and healed declaratively. The operational burden of coordinating updates—for instance, re-indexing when source data changes—quickly becomes untenable. This is a classic state management problem that generic Kubernetes resources like Deployments and Jobs don’t solve on their own.
The initial concept was to create a higher-level abstraction that encapsulates the entire RAG pipeline’s logic. Instead of operators manually running kubectl apply on three different manifests and then a kubectl create job, we needed a single source of truth. The Kubernetes Operator pattern is the canonical solution for this. By defining a Custom Resource Definition (CRD), we can create a new API object, RAGPipeline, in our cluster. An operator, which is a custom controller, then watches for these objects and works to bring the cluster’s state in line with the spec defined in each object.
For the implementation, Go was the obvious choice for the operator itself due to its first-class support in the Kubernetes ecosystem through frameworks like Kubebuilder and the Operator SDK. For the RAG components, we’ll containerize Python applications leveraging LangChain. We’ll use an in-cluster ChromaDB instance deployed as a StatefulSet for simplicity, a Kubernetes Job for the one-off indexing task, and a Deployment for the stateless LangChain query API. This approach isolates responsibilities: Go for the cloud-native orchestration, Python for the NLP-specific logic. A common mistake is trying to bake too much application-specific logic into the operator; the operator’s job is orchestration, not text embedding.
Custom Resource Definition: The RAGPipeline Contract
The foundation of the operator is the CRD. It defines the schema for our RAGPipeline resource. The spec section is the desired state, configured by the user, while the status section is the observed state, written to and controlled exclusively by the operator.
Here’s the Go definition for our CRD types. The pitfall here is to make the spec either too granular or too coarse. We aim for a balance that exposes necessary configurations without overwhelming the user.
api/v1alpha1/ragpipeline_types.go:
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// RAGPipelineSpec defines the desired state of RAGPipeline
type RAGPipelineSpec struct {
	// DataSourceURL is the URL to the text document to be indexed.
	// For this implementation, we assume a single plain text file.
	// +kubebuilder:validation:Required
	DataSourceURL string `json:"dataSourceURL"`

	// EmbeddingModel specifies the name of the sentence-transformer model to use for embeddings.
	// Defaults to "all-MiniLM-L6-v2".
	// +kubebuilder:default:="all-MiniLM-L6-v2"
	// +optional
	EmbeddingModel string `json:"embeddingModel"`

	// VectorStore defines the configuration for the vector store component.
	// +kubebuilder:validation:Required
	VectorStore VectorStoreSpec `json:"vectorStore"`

	// Indexer defines the configuration for the data indexing job.
	// +kubebuilder:validation:Required
	Indexer IndexerSpec `json:"indexer"`

	// QueryAPI defines the configuration for the RAG query service.
	// +kubebuilder:validation:Required
	QueryAPI QueryAPISpec `json:"queryAPI"`
}

// VectorStoreSpec defines the configuration for the vector store.
type VectorStoreSpec struct {
	// Image is the container image for the ChromaDB instance.
	// +kubebuilder:default:="chromadb/chroma:0.4.22"
	// +optional
	Image string `json:"image"`

	// PersistenceSize is the size of the PVC for the vector store.
	// +kubebuilder:default:="1Gi"
	// +optional
	PersistenceSize string `json:"persistenceSize"`
}

// IndexerSpec defines the configuration for the indexing job.
type IndexerSpec struct {
	// Image is the container image for the indexer job.
	// +kubebuilder:validation:Required
	Image string `json:"image"`

	// ChunkSize defines the size of text chunks for processing.
	// +kubebuilder:default:=1000
	// +optional
	ChunkSize int `json:"chunkSize"`

	// ChunkOverlap defines the overlap between text chunks.
	// +kubebuilder:default:=100
	// +optional
	ChunkOverlap int `json:"chunkOverlap"`
}

// QueryAPISpec defines the configuration for the query API.
type QueryAPISpec struct {
	// Image is the container image for the query API.
	// +kubebuilder:validation:Required
	Image string `json:"image"`

	// Replicas is the number of pods for the query API.
	// +kubebuilder:default:=1
	// +kubebuilder:validation:Minimum=0
	// +optional
	Replicas *int32 `json:"replicas"`

	// LLMProvider specifies the backend LLM service. For this example, 'Ollama'.
	// In a real-world project, this would be a more complex struct with auth, etc.
	// +kubebuilder:default:="Ollama"
	// +optional
	LLMProvider string `json:"llmProvider"`

	// LLMServiceURL is the endpoint for the LLM service (e.g., Ollama).
	// +kubebuilder:validation:Required
	LLMServiceURL string `json:"llmServiceURL"`
}

// RAGPipelineStatus defines the observed state of RAGPipeline
type RAGPipelineStatus struct {
	// Phase indicates the current state of the pipeline.
	// Possible values: Pending, VectorStoreReady, Indexing, Ready, Error.
	// +optional
	Phase string `json:"phase,omitempty"`

	// Conditions represent the latest available observations of the RAGPipeline's state.
	// +optional
	// +patchMergeKey=type
	// +patchStrategy=merge
	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

	// APIEndpoint is the cluster-internal address of the query API service.
	// +optional
	APIEndpoint string `json:"apiEndpoint,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// RAGPipeline is the Schema for the ragpipelines API
type RAGPipeline struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   RAGPipelineSpec   `json:"spec,omitempty"`
	Status RAGPipelineStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// RAGPipelineList contains a list of RAGPipeline
type RAGPipelineList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []RAGPipeline `json:"items"`
}

func init() {
	SchemeBuilder.Register(&RAGPipeline{}, &RAGPipelineList{})
}
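A quick note on the Conditions field before moving on: the reconciler below drives only the coarse Phase string, which is enough for kubectl output but discards the reason behind each transition. Here is a hedged sketch of how the controller could also maintain conditions, using the standard helper from k8s.io/apimachinery; the "Ready" condition type and this convenience method are illustrative additions, not part of the implementation that follows.

package v1alpha1

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// SetReadyCondition is a hypothetical convenience method. meta.SetStatusCondition
// upserts by condition Type and only refreshes LastTransitionTime when the
// Status value actually changes, which keeps condition history meaningful.
func (p *RAGPipeline) SetReadyCondition(status metav1.ConditionStatus, reason, message string) {
	meta.SetStatusCondition(&p.Status.Conditions, metav1.Condition{
		Type:               "Ready", // illustrative condition type
		Status:             status,
		Reason:             reason,
		Message:            message,
		ObservedGeneration: p.Generation,
	})
}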
With the CRD defined, we can start implementing the core logic in the controller’s Reconcile function.
The Reconciliation Loop: Orchestrating the Pipeline
The Reconcile method is the heart of the operator. It’s a level-triggered loop that is invoked whenever the RAGPipeline resource changes, or any of its owned resources change. Our logic must be idempotent; running it multiple times with the same input should produce the same result.
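Idempotency is easy to state and easy to break. The implementation below uses an explicit get-then-create pattern for each owned resource, which handles creation but, as written, never reconciles drift in objects that already exist. controller-runtime’s controllerutil.CreateOrUpdate wraps the same check in a single call and also covers the "Spec Changed?" branch of the diagram below. A hedged sketch of that alternative, assuming a desired-state builder like the queryAPIDeployment helper referenced later:

// Sketch only: an alternative to the explicit get-then-create pattern below.
desired := r.queryAPIDeployment(ragPipeline, queryAPIName, vectorStoreName)
dep := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: queryAPIName, Namespace: ragPipeline.Namespace}}
op, err := controllerutil.CreateOrUpdate(ctx, r.Client, dep, func() error {
	// Re-assert the desired spec on every pass; CreateOrUpdate diffs against
	// the live object and only writes when something actually changed. In
	// production you would copy only the fields you own, to avoid fighting
	// API-server defaulting.
	dep.Spec = desired.Spec
	return controllerutil.SetControllerReference(ragPipeline, dep, r.Scheme)
})
if err != nil {
	return ctrl.Result{}, err
}
logger.Info("Query API Deployment reconciled", "operation", op)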
The flow of our reconciliation logic follows the dependency graph of the RAG pipeline.
graph TD
    A[Start Reconciliation] --> B{Vector Store Exists?};
    B -- No --> C[Create Vector Store StatefulSet];
    C --> S[Update Status: Pending];
    B -- Yes --> D{Vector Store Ready?};
    D -- No --> S;
    D -- Yes --> E[Update Status: VectorStoreReady];
    E --> F{Indexing Job Completed?};
    F -- No --> G{Indexing Job Exists?};
    G -- No --> H[Create Indexing Job];
    H --> I[Update Status: Indexing];
    G -- Yes --> I;
    F -- Yes --> J{API Deployment Exists?};
    J -- No --> K[Create API Deployment & Service];
    K --> L[Update Status: Ready];
    J -- Yes --> M{Spec Changed?};
    M -- Yes --> N[Update API Deployment];
    N --> L;
    M -- No --> L;
    L --> Z[End Reconciliation];
    I --> Z;
    S --> Z;
Here’s a significant portion of the controller implementation. Note the use of controllerutil.SetControllerReference to ensure that when our RAGPipeline CR is deleted, Kubernetes automatically garbage collects all the resources it created (the StatefulSet, Job, Deployment, etc.).
internal/controller/ragpipeline_controller.go:
package controller

import (
	"context"
	"fmt"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/log"

	ragv1alpha1 "github.com/your-repo/rag-operator/api/v1alpha1"
)

const (
	PhasePending          = "Pending"
	PhaseVectorStoreReady = "VectorStoreReady"
	PhaseIndexing         = "Indexing"
	PhaseReady            = "Ready"
	PhaseError            = "Error"
)

type RAGPipelineReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=rag.mlops.my.domain,resources=ragpipelines,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=rag.mlops.my.domain,resources=ragpipelines/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps,resources=statefulsets;deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services;persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete

func (r *RAGPipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// Fetch the RAGPipeline instance
	ragPipeline := &ragv1alpha1.RAGPipeline{}
	err := r.Get(ctx, req.NamespacedName, ragPipeline)
	if err != nil {
		if errors.IsNotFound(err) {
			logger.Info("RAGPipeline resource not found. Ignoring since object must be deleted.")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get RAGPipeline")
		return ctrl.Result{}, err
	}

	// === Reconcile Vector Store (ChromaDB StatefulSet) ===
	vectorStoreSS := &appsv1.StatefulSet{}
	vectorStoreName := ragPipeline.Name + "-vector-store"
	err = r.Get(ctx, types.NamespacedName{Name: vectorStoreName, Namespace: ragPipeline.Namespace}, vectorStoreSS)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating a new Vector Store StatefulSet")
		ss := r.vectorStoreStatefulSet(ragPipeline, vectorStoreName)
		if err := controllerutil.SetControllerReference(ragPipeline, ss, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, ss); err != nil {
			logger.Error(err, "Failed to create new StatefulSet", "StatefulSet.Namespace", ss.Namespace, "StatefulSet.Name", ss.Name)
			return ctrl.Result{}, err
		}
		// Update status and requeue
		ragPipeline.Status.Phase = PhasePending
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Check if the vector store is ready
	if vectorStoreSS.Status.ReadyReplicas < 1 {
		logger.Info("Vector Store StatefulSet not ready yet")
		ragPipeline.Status.Phase = PhasePending
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Second * 10}, err
	}

	if ragPipeline.Status.Phase != PhaseVectorStoreReady && ragPipeline.Status.Phase != PhaseIndexing && ragPipeline.Status.Phase != PhaseReady {
		ragPipeline.Status.Phase = PhaseVectorStoreReady
		if err := r.Status().Update(ctx, ragPipeline); err != nil {
			return ctrl.Result{}, err
		}
	}

	// === Reconcile Indexer (Kubernetes Job) ===
	indexerJob := &batchv1.Job{}
	indexerJobName := ragPipeline.Name + "-indexer"
	err = r.Get(ctx, types.NamespacedName{Name: indexerJobName, Namespace: ragPipeline.Namespace}, indexerJob)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating a new Indexer Job")
		job := r.indexerJob(ragPipeline, indexerJobName, vectorStoreName)
		if err := controllerutil.SetControllerReference(ragPipeline, job, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create new Job", "Job.Namespace", job.Namespace, "Job.Name", job.Name)
			return ctrl.Result{}, err
		}
		ragPipeline.Status.Phase = PhaseIndexing
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Second * 5}, err
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Check if the job is completed
	if indexerJob.Status.Succeeded < 1 {
		if indexerJob.Status.Failed > 0 {
			logger.Error(fmt.Errorf("indexing job failed"), "Indexer job has failed", "Job.Name", indexerJobName)
			ragPipeline.Status.Phase = PhaseError
			err = r.Status().Update(ctx, ragPipeline)
			return ctrl.Result{}, err // Do not requeue a failed job
		}
		logger.Info("Indexer job is still running")
		ragPipeline.Status.Phase = PhaseIndexing
		err = r.Status().Update(ctx, ragPipeline)
		return ctrl.Result{RequeueAfter: time.Minute}, err
	}

	// === Reconcile Query API (Deployment and Service) ===
	queryAPIDeployment := &appsv1.Deployment{}
	queryAPIName := ragPipeline.Name + "-query-api"
	err = r.Get(ctx, types.NamespacedName{Name: queryAPIName, Namespace: ragPipeline.Namespace}, queryAPIDeployment)
	if err != nil && errors.IsNotFound(err) {
		logger.Info("Creating a new Query API Deployment")
		dep := r.queryAPIDeployment(ragPipeline, queryAPIName, vectorStoreName)
		if err := controllerutil.SetControllerReference(ragPipeline, dep, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, dep); err != nil {
			return ctrl.Result{}, err
		}
		logger.Info("Creating a new Query API Service")
		svc := r.queryAPIService(ragPipeline, queryAPIName)
		if err := controllerutil.SetControllerReference(ragPipeline, svc, r.Scheme); err != nil {
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, svc); err != nil {
			return ctrl.Result{}, err
		}
		// Requeue to check deployment status
		return ctrl.Result{RequeueAfter: time.Second * 5}, nil
	} else if err != nil {
		return ctrl.Result{}, err
	}

	// Update the status to Ready
	ragPipeline.Status.Phase = PhaseReady
	ragPipeline.Status.APIEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8000", queryAPIName, ragPipeline.Namespace)
	if err := r.Status().Update(ctx, ragPipeline); err != nil {
		return ctrl.Result{}, err
	}

	logger.Info("Reconciliation finished successfully")
	return ctrl.Result{}, nil
}

// ... helper functions to build Kubernetes resources ...
// (vectorStoreStatefulSet, indexerJob, queryAPIDeployment, queryAPIService)

// Helper for Vector Store StatefulSet
func (r *RAGPipelineReconciler) vectorStoreStatefulSet(cr *ragv1alpha1.RAGPipeline, name string) *appsv1.StatefulSet {
	// In a real-world project, labels and annotations should be more robust
	labels := map[string]string{"app": name}
	// Ignoring the error here leans on the CRD default ("1Gi") for a parseable value;
	// user-supplied garbage would still slip through, so production code should validate.
	pvcSize, _ := resource.ParseQuantity(cr.Spec.VectorStore.PersistenceSize)
	ss := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: cr.Namespace,
		},
		Spec: appsv1.StatefulSetSpec{
			ServiceName: name,
			Selector:    &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "chroma",
						Image: cr.Spec.VectorStore.Image,
						Ports: []corev1.ContainerPort{{ContainerPort: 8000}},
						VolumeMounts: []corev1.VolumeMount{{
							Name:      "chroma-data",
							MountPath: "/chroma/chroma",
						}},
					}},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
				ObjectMeta: metav1.ObjectMeta{Name: "chroma-data"},
				Spec: corev1.PersistentVolumeClaimSpec{
					AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{corev1.ResourceStorage: pvcSize},
					},
				},
			}},
		},
	}
	return ss
}

// Helper for Indexer Job
func (r *RAGPipelineReconciler) indexerJob(cr *ragv1alpha1.RAGPipeline, name, vectorStoreHost string) *batchv1.Job {
	backoffLimit := int32(2) // Retry twice on failure before the Job is marked failed
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: cr.Namespace,
		},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "indexer",
						Image: cr.Spec.Indexer.Image,
						Env: []corev1.EnvVar{
							{Name: "DATA_SOURCE_URL", Value: cr.Spec.DataSourceURL},
							{Name: "EMBEDDING_MODEL", Value: cr.Spec.EmbeddingModel},
							{Name: "CHROMA_HOST", Value: vectorStoreHost},
							{Name: "CHUNK_SIZE", Value: fmt.Sprintf("%d", cr.Spec.Indexer.ChunkSize)},
							{Name: "CHUNK_OVERLAP", Value: fmt.Sprintf("%d", cr.Spec.Indexer.ChunkOverlap)},
						},
					}},
					RestartPolicy: corev1.RestartPolicyNever,
				},
			},
			BackoffLimit: &backoffLimit,
		},
	}
	return job
}

// SetupWithManager sets up the controller with the Manager.
func (r *RAGPipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&ragv1alpha1.RAGPipeline{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&batchv1.Job{}).
		Owns(&appsv1.Deployment{}).
		Owns(&corev1.Service{}).
		Complete(r)
}
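The elided Service builders hide a real gotcha. The StatefulSet sets ServiceName, and both the indexer and the query API resolve CHROMA_HOST by that same name, so a Service with exactly that name must exist or nothing can reach ChromaDB; the reconcile flow above creates the query API Service but never the vector store’s, and that is the first gap to close when adapting this code. Below is a hedged sketch of both builders under the same assumptions as the code above (port 8000 everywhere, app: <name> labels, intstr from k8s.io/apimachinery/pkg/util/intstr added to the imports). Reconcile would create the vector store Service right after the StatefulSet, with the same SetControllerReference treatment.

// Headless Service fronting the ChromaDB StatefulSet. Without it, the DNS name
// used for CHROMA_HOST never resolves inside the cluster.
func (r *RAGPipelineReconciler) vectorStoreService(cr *ragv1alpha1.RAGPipeline, name string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: cr.Namespace},
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone, // headless, matching the StatefulSet's ServiceName
			Selector:  map[string]string{"app": name},
			Ports:     []corev1.ServicePort{{Port: 8000, TargetPort: intstr.FromInt(8000)}},
		},
	}
}

// Service exposing the query API Deployment; the APIEndpoint written to the
// status in Reconcile assumes exactly this name and port.
func (r *RAGPipelineReconciler) queryAPIService(cr *ragv1alpha1.RAGPipeline, name string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: cr.Namespace},
		Spec: corev1.ServiceSpec{
			Selector: map[string]string{"app": name},
			Ports:    []corev1.ServicePort{{Port: 8000, TargetPort: intstr.FromInt(8000)}},
		},
	}
}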
Containerized Application Logic: The Python Components
The operator orchestrates containers, but it doesn’t build them. We need two separate Python applications: one for indexing and one for the query API.
1. The Indexer Application
This is a simple script that runs to completion. It downloads data, processes it using LangChain, and loads it into the ChromaDB instance orchestrated by the operator. It gets its configuration from environment variables, which the operator injects into the Job spec.
indexer/main.py:
import os
import logging

import chromadb
import requests
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader
from langchain.embeddings import HuggingFaceEmbeddings

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration from environment variables
DATA_SOURCE_URL = os.environ["DATA_SOURCE_URL"]
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
CHROMA_HOST = os.environ["CHROMA_HOST"]
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", 1000))
CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", 100))
COLLECTION_NAME = "rag_collection"


def run_indexing():
    """
    Downloads data, chunks it, creates embeddings, and stores them in ChromaDB.
    """
    try:
        logging.info(f"Downloading data from {DATA_SOURCE_URL}")
        response = requests.get(DATA_SOURCE_URL)
        response.raise_for_status()  # Raise an exception for bad status codes
        local_path = "/tmp/source_data.txt"
        with open(local_path, "w") as f:
            f.write(response.text)

        logging.info("Loading document...")
        loader = TextLoader(local_path)
        documents = loader.load()

        logging.info("Splitting document into chunks...")
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        docs = text_splitter.split_documents(documents)

        logging.info(f"Using embedding model: {EMBEDDING_MODEL}")
        embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

        logging.info(f"Connecting to ChromaDB at {CHROMA_HOST}:8000")
        # Critical: connect to the remote ChromaDB server managed by the operator,
        # not an embedded local instance.
        chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=8000)
        vector_store = Chroma.from_documents(
            documents=docs,
            embedding=embeddings,
            collection_name=COLLECTION_NAME,
            client=chroma_client,
        )
        logging.info(f"Successfully indexed {len(docs)} chunks into ChromaDB collection '{COLLECTION_NAME}'.")
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to download data: {e}")
        raise
    except Exception as e:
        logging.error(f"An error occurred during indexing: {e}")
        raise


if __name__ == "__main__":
    run_indexing()
2. The Query API Application
This is a long-running service, typically built with a web framework like FastAPI. It exposes an endpoint that takes a user query, retrieves relevant documents from ChromaDB, and passes them along with the query to an LLM.
api/main.py:
import os
import logging

import chromadb
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.chat_models import ChatOllama
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Configuration from environment variables
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
CHROMA_HOST = os.environ["CHROMA_HOST"]
LLM_SERVICE_URL = os.environ["LLM_SERVICE_URL"]
COLLECTION_NAME = "rag_collection"

app = FastAPI()


class QueryRequest(BaseModel):
    query: str


class QueryResponse(BaseModel):
    answer: str
    source_documents: list[dict]


# Initialize components on startup. A common mistake is to initialize these
# per-request, which is highly inefficient.
try:
    logging.info("Initializing components...")
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

    logging.info(f"Connecting to ChromaDB at {CHROMA_HOST}:8000")
    # Connect to the same remote ChromaDB server the indexer populated.
    chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=8000)
    vector_store = Chroma(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        client=chroma_client,
    )
    retriever = vector_store.as_retriever()

    logging.info(f"Connecting to LLM at {LLM_SERVICE_URL}")
    llm = ChatOllama(base_url=LLM_SERVICE_URL, model="llama2")

    template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
    prompt = ChatPromptTemplate.from_template(template)

    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )
    logging.info("Initialization complete.")
except Exception as e:
    logging.error(f"Failed to initialize application: {e}")
    # This will cause the pod to crash and restart, which is often the desired
    # behavior on startup failure.
    raise


@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    """
    Performs a RAG query against the indexed documents.
    """
    try:
        logging.info(f"Received query: '{request.query}'")
        # Note: this simplified handler runs retrieval twice, once inside the
        # chain and once below to surface source documents. A production
        # implementation would retrieve once and share the documents between
        # the prompt context and the response payload.
        answer = rag_chain.invoke(request.query)
        docs = retriever.get_relevant_documents(request.query)
        source_documents = [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in docs]
        return QueryResponse(answer=answer, source_documents=source_documents)
    except Exception as e:
        logging.error(f"Error processing query '{request.query}': {e}")
        raise HTTPException(status_code=500, detail="Internal server error while processing the query.")


@app.get("/healthz")
def health_check():
    return {"status": "ok"}
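That /healthz endpoint only earns its keep if the operator wires it into the pod spec. Here is a hedged sketch of the queryAPIDeployment helper referenced in the controller, showing the environment variables api/main.py reads plus a readiness probe against /healthz; the container name and probe timings are illustrative, and intstr is the same import noted earlier.

// Sketch of the elided queryAPIDeployment helper. It must inject the env vars
// api/main.py reads (CHROMA_HOST, LLM_SERVICE_URL, EMBEDDING_MODEL) and should
// gate traffic on the /healthz endpoint defined above.
func (r *RAGPipelineReconciler) queryAPIDeployment(cr *ragv1alpha1.RAGPipeline, name, vectorStoreHost string) *appsv1.Deployment {
	labels := map[string]string{"app": name}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: cr.Namespace},
		Spec: appsv1.DeploymentSpec{
			Replicas: cr.Spec.QueryAPI.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "query-api", // illustrative container name
						Image: cr.Spec.QueryAPI.Image,
						Ports: []corev1.ContainerPort{{ContainerPort: 8000}},
						Env: []corev1.EnvVar{
							{Name: "CHROMA_HOST", Value: vectorStoreHost},
							{Name: "LLM_SERVICE_URL", Value: cr.Spec.QueryAPI.LLMServiceURL},
							{Name: "EMBEDDING_MODEL", Value: cr.Spec.EmbeddingModel},
						},
						ReadinessProbe: &corev1.Probe{
							ProbeHandler: corev1.ProbeHandler{
								HTTPGet: &corev1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(8000)},
							},
							InitialDelaySeconds: 15, // embedding model load can be slow on first boot
							PeriodSeconds:       10,
						},
					}},
				},
			},
		},
	}
}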
To deploy this, you would build Docker images for the indexer and the API, push them to a registry, and then reference them in the RAGPipeline custom resource YAML.
An example custom resource might look like this:
apiVersion: rag.mlops.my.domain/v1alpha1
kind: RAGPipeline
metadata:
  name: project-gutenberg-rag
spec:
  dataSourceURL: "https://www.gutenberg.org/files/1342/1342-0.txt" # Pride and Prejudice
  embeddingModel: "sentence-transformers/all-MiniLM-L6-v2"
  vectorStore:
    image: "chromadb/chroma:0.4.22"
    persistenceSize: "2Gi"
  indexer:
    image: "my-registry/rag-indexer:0.1.0"
    chunkSize: 1200
    chunkOverlap: 150
  queryAPI:
    image: "my-registry/rag-api:0.1.0"
    replicas: 2
    llmProvider: "Ollama"
    llmServiceURL: "http://ollama-service.default.svc.cluster.local:11434"
Applying this single manifest would trigger the operator to stand up the entire, fully functional RAG pipeline in the correct sequence.
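And because the endpoint is published on the status subresource, downstream tooling can discover it instead of hardcoding service names. A minimal hedged sketch using the controller-runtime client; the "default" namespace and the generated AddToScheme function are assumptions based on the standard Kubebuilder scaffolding.

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"

	ragv1alpha1 "github.com/your-repo/rag-operator/api/v1alpha1"
)

func main() {
	// Register the RAGPipeline types so the client can decode them.
	scheme := runtime.NewScheme()
	_ = ragv1alpha1.AddToScheme(scheme) // generated by the Kubebuilder scaffolding

	c, err := client.New(config.GetConfigOrDie(), client.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	// Read back the pipeline created above and print where to send queries.
	rp := &ragv1alpha1.RAGPipeline{}
	key := types.NamespacedName{Namespace: "default", Name: "project-gutenberg-rag"} // assumed namespace
	if err := c.Get(context.Background(), key, rp); err != nil {
		panic(err)
	}
	fmt.Printf("phase=%s endpoint=%s\n", rp.Status.Phase, rp.Status.APIEndpoint)
}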
The current design, while functional and declarative, has clear boundaries. The in-cluster, single-pod ChromaDB is a single point of failure and not suitable for production workloads demanding high availability or large-scale vector storage. Future work must focus on abstracting the vector store to support managed services like Pinecone, Weaviate, or a clustered Milvus deployment. The indexing job is monolithic; for terabytes of source data, this should be replaced by a distributed processing workflow, which the operator could orchestrate using a tool like Argo Workflows. Lastly, the operator’s observability is limited to status phases. A production-grade implementation would expose detailed Prometheus metrics on reconciliation loops, component health, and pipeline state transition latencies, providing the necessary insight for true SRE-driven operations.