The core requirement was clear: build a real-time ingestion pipeline for a Retrieval-Augmented Generation (RAG) system. Unstructured text documents arrive on a Kafka topic, need to be converted into vector embeddings, and then indexed into ChromaDB. The non-negotiable constraint from the security team was a zero-trust network policy. All inter-service communication within our Kubernetes cluster must be mutually authenticated and encrypted using mTLS. Our platform team had already mandated Istio for this, promising it would be “transparent” to the applications. That promise turned out to be only partially true.
This is the log of that build, detailing the architectural decisions, the implementation snags, and the specific configurations required to make Kafka and ChromaDB operate correctly and securely within a strict Istio service mesh.
Initial Architecture and The Pain Point
The initial design was a standard microservices pattern, decoupled by Kafka.
graph TD
    A[External Document Source] -->|Produces messages| B(Kafka Topic: raw_documents)
    B --> C{Ingestion Service}
    C -->|HTTP Request| D{Embedding Service}
    D -->|HTTP Response with Vector| C
    C -->|Upsert Request| E(ChromaDB)
    subgraph Kubernetes Cluster with Istio
        C
        D
        E
    end
- Ingestion Service: A Python consumer that reads from the raw_documents Kafka topic (an example message is shown after this list). Its job is to orchestrate the process: fetch a message, call the Embedding Service to get a vector, and then write the result to ChromaDB.
- Embedding Service: A stateless Python service with a single endpoint. It accepts text and uses a sentence-transformer model to return a dense vector embedding.
- ChromaDB: The vector database, running as a stateful service within the cluster.
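For reference, each message on the raw_documents topic is a JSON object; the consumer shown later only requires an id and a content field (the values here are illustrative):

{"id": "doc-123", "content": "Full text of the document to embed..."}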
The fundamental challenge wasn’t the application logic itself, but forcing this entire flow to comply with an Istio PeerAuthentication policy set to STRICT mode cluster-wide. This meant any service-to-service communication not using Istio’s mTLS would be rejected outright. In a real-world project, assuming new components will work out of the box with a service mesh is a common and costly mistake. Kafka’s TCP-based binary protocol and ChromaDB’s HTTP/gRPC client behavior were the primary sources of friction.
Step 1: Baseline Service Implementation
First, we needed the services themselves. We used Python with FastAPI for the web services and kafka-python for the consumer. The initial code was written without any consideration for Istio, to establish a baseline.
Here’s the initial, naive implementation of the embedding-service.
embedding-service/app/main.py
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
import os

# --- Basic Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Environment Variable Loading ---
MODEL_NAME = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")

# A pitfall here is model download time on startup. In production, this model
# should be baked into the container image or mounted from a persistent volume.
logging.info(f"Loading embedding model: {MODEL_NAME}")
try:
    model = SentenceTransformer(MODEL_NAME)
    logging.info("Embedding model loaded successfully.")
except Exception as e:
    logging.error(f"Failed to load sentence-transformer model: {e}")
    # Fast failure is better than a zombie service.
    exit(1)

# --- Application Setup ---
app = FastAPI(
    title="Embedding Service",
    description="A service to generate vector embeddings from text.",
)

# --- Pydantic Models for Request/Response ---
class EmbeddingRequest(BaseModel):
    text: str = Field(..., min_length=1, description="The text to be embedded.")

class EmbeddingResponse(BaseModel):
    embedding: list[float]
    model: str = MODEL_NAME

@app.post("/embed", response_model=EmbeddingResponse)
def create_embedding(request: EmbeddingRequest):
    """
    Generates a vector embedding for the provided text.
    """
    try:
        logging.info(f"Generating embedding for text snippet of length {len(request.text)}")
        embedding = model.encode(request.text).tolist()
        return EmbeddingResponse(embedding=embedding)
    except Exception as e:
        # Avoid leaking internal error details to the client.
        logging.error(f"Error during embedding generation: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during embedding.")

@app.get("/health")
def health_check():
    """
    Simple health check endpoint.
    """
    return {"status": "ok", "model_loaded": model is not None}
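As the comment above notes, downloading the model on every pod start is slow. One way to avoid it, sketched here under the assumption that the image build has network access, is a small helper script (download_model.py is a hypothetical name) run during the image build so the sentence-transformers cache is baked into the image:

import os
from sentence_transformers import SentenceTransformer

# Hypothetical build-time helper: instantiating the model downloads it into the
# local cache, so the runtime SentenceTransformer(MODEL_NAME) call loads from disk.
MODEL_NAME = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
SentenceTransformer(MODEL_NAME)
print(f"Cached embedding model: {MODEL_NAME}")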
Next, the ingestion-service consumer.
ingestion-service/app/consumer.py
import json
import logging
import os
import time

import requests
import chromadb
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration from Environment Variables ---
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka-service:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "raw_documents")
EMBEDDING_SVC_URL = os.getenv("EMBEDDING_SVC_URL", "http://embedding-service:8000/embed")
CHROMADB_HOST = os.getenv("CHROMADB_HOST", "chromadb-service")
CHROMADB_PORT = int(os.getenv("CHROMADB_PORT", "8000"))
COLLECTION_NAME = "document_embeddings"

# --- Robust Connection Handling ---
def connect_to_kafka_with_retry(broker: str, retries: int = 5, delay: int = 10):
    for i in range(retries):
        try:
            logging.info(f"Attempting to connect to Kafka at {broker} (Attempt {i+1}/{retries})...")
            consumer = KafkaConsumer(
                KAFKA_TOPIC,
                bootstrap_servers=[broker],
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                group_id='ingestion-group',
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
            logging.info("Successfully connected to Kafka.")
            return consumer
        except NoBrokersAvailable:
            logging.warning(f"Could not connect to Kafka. Retrying in {delay} seconds...")
            time.sleep(delay)
    logging.error("Failed to connect to Kafka after multiple retries. Exiting.")
    raise SystemExit("Kafka connection failed.")

def get_chroma_client(host: str, port: int):
    logging.info(f"Connecting to ChromaDB at {host}:{port}")
    client = chromadb.HttpClient(host=host, port=port)
    return client

def process_message(message, chroma_collection):
    doc_id = message.value.get("id")
    content = message.value.get("content")
    if not doc_id or not content:
        logging.warning(f"Skipping malformed message: {message.value}")
        return
    try:
        # 1. Get embedding from the service
        logging.info(f"Requesting embedding for document ID: {doc_id}")
        response = requests.post(EMBEDDING_SVC_URL, json={"text": content}, timeout=10)
        response.raise_for_status()
        embedding = response.json()['embedding']

        # 2. Upsert into ChromaDB
        logging.info(f"Upserting document ID: {doc_id} into ChromaDB")
        chroma_collection.upsert(
            embeddings=[embedding],
            metadatas=[{"source": "kafka"}],
            ids=[str(doc_id)]
        )
        logging.info(f"Successfully processed and indexed document ID: {doc_id}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to get embedding for doc {doc_id}: {e}")
        # In a real system, this should trigger a retry mechanism or DLQ.
        # For now, we just fail and move on.
    except Exception as e:
        logging.error(f"An unexpected error occurred processing doc {doc_id}: {e}", exc_info=True)

def main():
    consumer = connect_to_kafka_with_retry(KAFKA_BROKER)
    chroma_client = get_chroma_client(CHROMADB_HOST, CHROMADB_PORT)

    # Ensure collection exists. get_or_create_collection is idempotent.
    logging.info(f"Ensuring ChromaDB collection '{COLLECTION_NAME}' exists.")
    collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME)

    logging.info("Consumer started. Waiting for messages...")
    for message in consumer:
        process_message(message, collection)

if __name__ == "__main__":
    main()
Step 2: Kubernetes Deployment and The First Failure
The deployment manifests were standard. We deployed Kafka and ChromaDB from Helm charts, and our custom services with Deployment and Service objects. The critical part was enabling Istio sidecar injection.
k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: rag-pipeline
  labels:
    istio-injection: enabled
k8s/embedding-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: embedding-service
  namespace: rag-pipeline
spec:
  replicas: 2
  selector:
    matchLabels:
      app: embedding-service
  template:
    metadata:
      labels:
        app: embedding-service
    spec:
      containers:
        - name: embedding-service
          image: your-repo/embedding-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: EMBEDDING_MODEL
              value: "all-MiniLM-L6-v2"
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "1"
              memory: "2Gi"
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: embedding-service
  namespace: rag-pipeline
spec:
  selector:
    app: embedding-service
  ports:
    - protocol: TCP
      port: 8000
      targetPort: 8000
(Similar YAMLs were created for the ingestion-service and ChromaDB).
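For illustration, here is roughly what the ChromaDB Service looks like (the name must match the consumer’s CHROMADB_HOST setting; the label and selector are illustrative and depend on how your chart deploys Chroma). Naming the port explicitly is a habit that pays off in Step 3:

apiVersion: v1
kind: Service
metadata:
  name: chromadb-service
  namespace: rag-pipeline
spec:
  selector:
    app: chromadb # illustrative label; match your ChromaDB pods
  ports:
    - name: http-chromadb # explicit protocol hint for Istio
      protocol: TCP
      port: 8000
      targetPort: 8000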
After deploying, we enforced the global mTLS policy.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: istio-system
spec:
  mtls:
    mode: STRICT
The result was immediate failure. The ingestion-service pod logs were filled with NoBrokersAvailable errors. The embedding-service pods started fine, but the ingestion-service couldn’t reach them or ChromaDB either. The logs from the Istio sidecar proxy on the consumer pod were key:

[C_TIMEOUT] connect timeout for "10.42.1.15:9092"
The problem: Istio’s sidecar proxy decides how to handle traffic from the declared protocol of the destination port, falling back to protocol detection when there is no hint, and in our case it ended up treating the Kafka port as HTTP. When the Kafka client tried to establish its long-lived TCP connection, the proxy misinterpreted the protocol, leading to timeouts and connection resets. The client was trying to speak the Kafka binary protocol, but the proxy was expecting HTTP. This is a classic pitfall when meshing non-HTTP services.
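To see how the sidecar actually classified a port, istioctl can dump the Envoy listener configuration; something along these lines (the pod name is a placeholder, and flags may vary by istioctl version) shows whether Envoy set up an HTTP connection manager or a plain TCP proxy for the Kafka port:

istioctl proxy-config listeners <ingestion-pod>.rag-pipeline --port 9092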
Step 3: Teaching Istio to Speak Kafka
To fix this, we had to explicitly tell Istio how to handle Kafka traffic. The protocol is TCP, not HTTP. This is done by modifying the Kubernetes Service object for Kafka: Istio uses the port name as a hint for protocol selection.
If your Kafka service is running within the cluster and managed by a Service object, the fix is to name the port correctly.
k8s/kafka-service-patch.yaml
apiVersion: v1
kind: Service
metadata:
  name: my-kafka-cluster-kafka-bootstrap # Name depends on your Helm release
  namespace: kafka
spec:
  ports:
    - name: tcp-kafka # The key is the 'tcp-' prefix.
      port: 9092
      protocol: TCP
      targetPort: 9092
By naming the port tcp-kafka, we signal to Istio’s proxy that it should treat traffic on this port as raw TCP and not attempt to parse it as HTTP. The proxy will then just forward the TCP packets, wrapped in its mTLS tunnel, without trying to understand the application-level protocol.
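If you prefer not to rely on the naming convention, recent Kubernetes and Istio versions also support declaring the protocol via the appProtocol field on the Service port (a sketch; check the protocol selection docs for your Istio release):

ports:
  - name: kafka
    appProtocol: tcp # explicit protocol declaration
    port: 9092
    protocol: TCP
    targetPort: 9092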
After applying this change, the ingestion-service could finally connect to Kafka. The consumer logs lit up with successful connection messages. One problem down.
Step 4: Securing HTTP and gRPC Traffic with AuthorizationPolicies
Next, we addressed the failures between the ingestion-service, embedding-service, and chromadb-service. Istio’s proxy handles HTTP traffic by default, and the sidecars already satisfied STRICT mTLS, but our zero-trust posture also demanded explicit authorization: any request not covered by an AuthorizationPolicy that allows it gets denied.
We needed to create policies based on the principle of least privilege. The ingestion-service should be the only service allowed to call the embedding-service.
k8s/auth-policies.yaml
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: allow-embedding-access
  namespace: rag-pipeline
spec:
  selector:
    matchLabels:
      app: embedding-service # This policy applies to the embedding-service pods
  action: ALLOW
  rules:
    - from:
        - source:
            # Only allow requests from pods with the 'ingestion-service' identity
            principals: ["cluster.local/ns/rag-pipeline/sa/ingestion-service-sa"]
      to:
        - operation:
            methods: ["POST"]
            paths: ["/embed"]
        - operation:
            methods: ["GET"]
            paths: ["/health"]
---
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: allow-chromadb-access
  namespace: rag-pipeline
spec:
  selector:
    matchLabels:
      app: chromadb # This policy applies to the ChromaDB pods
  action: ALLOW
  rules:
    - from:
        - source:
            # Only allow requests from the ingestion-service
            principals: ["cluster.local/ns/rag-pipeline/sa/ingestion-service-sa"]
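One nuance worth calling out: Istio only starts denying requests to a workload once at least one ALLOW policy selects it (or an explicit deny exists), so the deny-by-default posture ultimately comes from a baseline policy. If your mesh doesn’t already enforce one, a namespace-wide deny-all looks like this; an AuthorizationPolicy with an empty spec selects every workload in its namespace and allows nothing:

apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: deny-all
  namespace: rag-pipeline
spec: {}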
This required creating specific ServiceAccounts for our deployments, another production best practice.
k8s/ingestion-service-deployment-with-sa.yaml (snippet)
# ... inside the deployment spec ...
  template:
    spec:
      serviceAccountName: ingestion-service-sa
      containers:
      # ...
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: ingestion-service-sa
  namespace: rag-pipeline
After applying these policies, the full pipeline started working. A message produced to the Kafka topic was consumed, embedded, and indexed into ChromaDB. We verified this by checking the logs of all three services and querying ChromaDB directly. We also used istioctl to confirm traffic was being encrypted.
- istioctl proxy-status showed all sidecars were SYNCED.
- kubectl exec -it <ingestion-pod> -c istio-proxy -- openssl s_client -connect embedding-service:8000 showed the server certificate was being presented by Istio, proving the mTLS tunnel was active.
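For the end-to-end check, a throwaway producer along these lines is enough to push a test document (the broker address and payload values are illustrative; the value serializer must mirror the consumer’s JSON deserializer):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["my-kafka-cluster-kafka-bootstrap.kafka:9092"],
    # JSON-encode values to match the consumer's value_deserializer.
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("raw_documents", {"id": "doc-001", "content": "Sample document text for the pipeline."})
producer.flush()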
The Complete, Production-Ready Code
Here is the final state of the consumer code, now more resilient. The core logic hasn’t changed, but the surrounding structure for connection and configuration is more robust, which is essential for any real-world project.
ingestion-service/app/consumer_final.py
import json
import logging
import os
import time
from typing import Optional

import requests
import chromadb
from chromadb.api.models.Collection import Collection
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# --- Centralized Configuration ---
class AppConfig:
    KAFKA_BROKER: str = os.getenv("KAFKA_BROKER", "my-kafka-cluster-kafka-bootstrap.kafka:9092")
    KAFKA_TOPIC: str = os.getenv("KAFKA_TOPIC", "raw_documents")
    KAFKA_GROUP_ID: str = os.getenv("KAFKA_GROUP_ID", "ingestion-group-v1")
    EMBEDDING_SVC_URL: str = os.getenv("EMBEDDING_SVC_URL", "http://embedding-service.rag-pipeline.svc.cluster.local:8000/embed")
    CHROMADB_HOST: str = os.getenv("CHROMADB_HOST", "chromadb-service.rag-pipeline.svc.cluster.local")
    CHROMADB_PORT: int = int(os.getenv("CHROMADB_PORT", "8000"))
    COLLECTION_NAME: str = "document_embeddings"
    RETRY_COUNT: int = 5
    RETRY_DELAY_SECONDS: int = 10

# --- Standardized Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Service Connector Classes ---
class KafkaConnector:
    def __init__(self, config: AppConfig):
        self.config = config
        self.consumer: Optional[KafkaConsumer] = None

    def connect(self) -> KafkaConsumer:
        for i in range(self.config.RETRY_COUNT):
            try:
                logger.info(f"Connecting to Kafka at {self.config.KAFKA_BROKER} [Attempt {i+1}]")
                self.consumer = KafkaConsumer(
                    self.config.KAFKA_TOPIC,
                    bootstrap_servers=[self.config.KAFKA_BROKER],
                    auto_offset_reset='earliest',
                    enable_auto_commit=True,
                    group_id=self.config.KAFKA_GROUP_ID,
                    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                    consumer_timeout_ms=1000  # To allow periodic shutdown checks
                )
                logger.info("Successfully connected to Kafka.")
                return self.consumer
            except NoBrokersAvailable:
                logger.warning(f"Kafka broker not available. Retrying in {self.config.RETRY_DELAY_SECONDS}s...")
                time.sleep(self.config.RETRY_DELAY_SECONDS)
        logger.critical("Could not connect to Kafka after all retries. Shutting down.")
        raise SystemExit("Fatal: Kafka connection failed.")

class ChromaDBConnector:
    def __init__(self, config: AppConfig):
        self.config = config
        self.client: Optional[chromadb.HttpClient] = None

    def get_collection(self) -> Collection:
        logger.info(f"Connecting to ChromaDB at {self.config.CHROMADB_HOST}:{self.config.CHROMADB_PORT}")
        self.client = chromadb.HttpClient(host=self.config.CHROMADB_HOST, port=self.config.CHROMADB_PORT)
        # This operation is idempotent and crucial for startup.
        logger.info(f"Ensuring collection '{self.config.COLLECTION_NAME}' exists.")
        return self.client.get_or_create_collection(name=self.config.COLLECTION_NAME)

# --- Main Application Logic ---
class IngestionPipeline:
    def __init__(self, config: AppConfig):
        self.config = config
        self.http_session = requests.Session()  # Use sessions for connection pooling

    def get_embedding(self, text: str, doc_id: str) -> Optional[list[float]]:
        try:
            response = self.http_session.post(
                self.config.EMBEDDING_SVC_URL,
                json={"text": text},
                timeout=15
            )
            response.raise_for_status()
            return response.json()['embedding']
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP error getting embedding for doc {doc_id}: {e}")
            return None

    def upsert_to_chroma(self, collection: Collection, doc_id: str, embedding: list[float]):
        try:
            collection.upsert(
                ids=[str(doc_id)],
                embeddings=[embedding],
                metadatas=[{"source": "kafka", "processed_at": int(time.time())}]
            )
            logger.info(f"Successfully upserted doc {doc_id} to ChromaDB.")
        except Exception as e:
            # This could be a transient network error or a data validation issue.
            logger.error(f"Failed to upsert doc {doc_id} to ChromaDB: {e}", exc_info=True)

    def run(self):
        kafka_connector = KafkaConnector(self.config)
        consumer = kafka_connector.connect()
        chroma_connector = ChromaDBConnector(self.config)
        collection = chroma_connector.get_collection()
        logger.info("Pipeline started. Consuming messages...")
        # consumer_timeout_ms makes the iterator stop when the topic is idle,
        # so re-enter it in a loop instead of letting the process exit.
        while True:
            for message in consumer:
                message_value = message.value
                doc_id = message_value.get("id")
                content = message_value.get("content")
                if not doc_id or not content:
                    logger.warning(f"Skipping malformed message at offset {message.offset}: {message_value}")
                    continue
                logger.info(f"Processing message for doc ID {doc_id} from partition {message.partition}, offset {message.offset}")
                embedding = self.get_embedding(content, doc_id)
                if embedding:
                    self.upsert_to_chroma(collection, doc_id, embedding)
                else:
                    # A production system MUST have a DLQ strategy here.
                    logger.error(f"Skipping doc {doc_id} due to embedding failure.")

def main():
    config = AppConfig()
    pipeline = IngestionPipeline(config)
    pipeline.run()

if __name__ == "__main__":
    main()
Lingering Issues and Future Work
This implementation establishes a secure and functional baseline, but it is far from a complete production system. The current design has several limitations that would need to be addressed in the next iteration. The error handling in the ingestion-service is naive; a message that fails embedding or insertion is simply logged and dropped, silently losing data. A robust dead-letter queue (DLQ) mechanism is not optional; it is a requirement to prevent that kind of loss (a minimal sketch follows below). Furthermore, the embedding-service is a potential bottleneck; it should be scaled horizontally based on request latency or queue depth, and Istio’s load-balancing capabilities should be fine-tuned. Finally, while we have security, we lack deep observability. Instrumenting the applications with OpenTelemetry to generate traces would allow us to correlate a slow ChromaDB upsert with a specific Kafka message, providing invaluable debugging context that Istio metrics alone cannot offer.
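As a starting point for that DLQ, the failure branches in IngestionPipeline could hand the raw message to a small publisher like the sketch below; the topic name and wiring are assumptions, not part of the current deployment:

import json
import logging
from kafka import KafkaProducer

logger = logging.getLogger(__name__)

class DeadLetterQueue:
    """Publishes messages that failed processing to a separate Kafka topic."""

    def __init__(self, broker: str, topic: str = "raw_documents.dlq"):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=[broker],
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        )

    def publish(self, original_message: dict, reason: str):
        # Keep the original payload intact and attach the failure reason so the
        # message can be replayed once the underlying issue is fixed.
        self.producer.send(self.topic, {"original": original_message, "reason": reason})
        self.producer.flush()
        logger.warning(f"Routed message to DLQ ({reason}).")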