Implementing a Secure Real-Time Vector Ingestion Pipeline Using Kafka, ChromaDB, and Istio


The core requirement was clear: build a real-time ingestion pipeline for a Retrieval-Augmented Generation (RAG) system. Unstructured text documents arrive on a Kafka topic, need to be converted into vector embeddings, and then indexed into ChromaDB. The non-negotiable constraint from the security team was a zero-trust network policy. All inter-service communication within our Kubernetes cluster must be mutually authenticated and encrypted using mTLS. Our platform team had already mandated Istio for this, promising it would be “transparent” to the applications. That promise turned out to be only partially true.

This is the log of that build, detailing the architectural decisions, the implementation snags, and the specific configurations required to make Kafka and ChromaDB operate correctly and securely within a strict Istio service mesh.

Initial Architecture and The Pain Point

The initial design was a standard microservices pattern, decoupled by Kafka.

graph TD
    A[External Document Source] -->|Produces messages| B(Kafka Topic: raw_documents)
    B --> C{Ingestion Service}
    C -->|HTTP Request| D{Embedding Service}
    D -->|HTTP Response with Vector| C
    C -->|Upsert Request| E(ChromaDB)

    subgraph Kubernetes Cluster with Istio
        C
        D
        E
    end
  1. Ingestion Service: A Python consumer that reads from the raw_documents Kafka topic. Its job is to orchestrate the process: fetch a message, call the Embedding Service to get a vector, and then write the result to ChromaDB.
  2. Embedding Service: A stateless Python service with a single endpoint. It accepts text and uses a sentence-transformer model to return a dense vector embedding.
  3. ChromaDB: The vector database, running as a stateful service within the cluster.

The fundamental challenge wasn’t the application logic itself, but forcing this entire flow to comply with an Istio PeerAuthentication policy set to STRICT mode cluster-wide. This meant any service-to-service communication not using Istio’s mTLS would be rejected outright. In a real-world project, assuming new components will work out-of-the-box with a service mesh is a common and costly mistake. Kafka’s TCP-based binary protocol and ChromaDB’s HTTP/gRPC client behavior were the primary sources of friction.

Step 1: Baseline Service Implementation

First, we needed the services themselves. We used Python with FastAPI for the web services and kafka-python for the consumer. The initial code was written without any consideration for Istio, to establish a baseline.

Here’s the initial, naive implementation of the embedding-service.

embedding-service/app/main.py

import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from sentence_transformers import SentenceTransformer
import os

# --- Basic Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Environment Variable Loading ---
MODEL_NAME = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
# A pitfall here is model download time on startup. In production, this model
# should be baked into the container image or mounted from a persistent volume.
logging.info(f"Loading embedding model: {MODEL_NAME}")
try:
    model = SentenceTransformer(MODEL_NAME)
    logging.info("Embedding model loaded successfully.")
except Exception as e:
    logging.error(f"Failed to load sentence-transformer model: {e}")
    # Fast failure is better than a zombie service.
    exit(1)

# --- Application Setup ---
app = FastAPI(
    title="Embedding Service",
    description="A service to generate vector embeddings from text.",
)

# --- Pydantic Models for Request/Response ---
class EmbeddingRequest(BaseModel):
    text: str = Field(..., min_length=1, description="The text to be embedded.")

class EmbeddingResponse(BaseModel):
    embedding: list[float]
    model: str = MODEL_NAME

@app.post("/embed", response_model=EmbeddingResponse)
def create_embedding(request: EmbeddingRequest):
    """
    Generates a vector embedding for the provided text.
    """
    try:
        logging.info(f"Generating embedding for text snippet of length {len(request.text)}")
        embedding = model.encode(request.text).tolist()
        return EmbeddingResponse(embedding=embedding)
    except Exception as e:
        # Avoid leaking internal error details to the client.
        logging.error(f"Error during embedding generation: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during embedding.")

@app.get("/health")
def health_check():
    """
    Simple health check endpoint.
    """
    return {"status": "ok", "model_loaded": model is not None}

Next, the ingestion-service consumer.

ingestion-service/app/consumer.py

import json
import logging
import os
import time
import requests
import chromadb
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration from Environment Variables ---
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "kafka-service:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "raw_documents")
EMBEDDING_SVC_URL = os.getenv("EMBEDDING_SVC_URL", "http://embedding-service:8000/embed")
CHROMADB_HOST = os.getenv("CHROMADB_HOST", "chromadb-service")
CHROMADB_PORT = int(os.getenv("CHROMADB_PORT", "8000"))
COLLECTION_NAME = "document_embeddings"

# --- Robust Connection Handling ---
def connect_to_kafka_with_retry(broker: str, retries: int = 5, delay: int = 10):
    for i in range(retries):
        try:
            logging.info(f"Attempting to connect to Kafka at {broker} (Attempt {i+1}/{retries})...")
            consumer = KafkaConsumer(
                KAFKA_TOPIC,
                bootstrap_servers=[broker],
                auto_offset_reset='earliest',
                enable_auto_commit=True,
                group_id='ingestion-group',
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
            logging.info("Successfully connected to Kafka.")
            return consumer
        except NoBrokersAvailable:
            logging.warning(f"Could not connect to Kafka. Retrying in {delay} seconds...")
            time.sleep(delay)
    logging.error("Failed to connect to Kafka after multiple retries. Exiting.")
    raise SystemExit("Kafka connection failed.")

def get_chroma_client(host: str, port: int):
    logging.info(f"Connecting to ChromaDB at {host}:{port}")
    client = chromadb.HttpClient(host=host, port=port)
    return client

def process_message(message, chroma_collection):
    doc_id = message.value.get("id")
    content = message.value.get("content")

    if not doc_id or not content:
        logging.warning(f"Skipping malformed message: {message.value}")
        return

    try:
        # 1. Get embedding from the service
        logging.info(f"Requesting embedding for document ID: {doc_id}")
        response = requests.post(EMBEDDING_SVC_URL, json={"text": content}, timeout=10)
        response.raise_for_status()
        embedding = response.json()['embedding']
        
        # 2. Upsert into ChromaDB
        logging.info(f"Upserting document ID: {doc_id} into ChromaDB")
        chroma_collection.upsert(
            embeddings=[embedding],
            metadatas=[{"source": "kafka"}],
            ids=[str(doc_id)]
        )
        logging.info(f"Successfully processed and indexed document ID: {doc_id}")

    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to get embedding for doc {doc_id}: {e}")
        # In a real system, this should trigger a retry mechanism or DLQ.
        # For now, we just fail and move on.
    except Exception as e:
        logging.error(f"An unexpected error occurred processing doc {doc_id}: {e}", exc_info=True)


def main():
    consumer = connect_to_kafka_with_retry(KAFKA_BROKER)
    chroma_client = get_chroma_client(CHROMADB_HOST, CHROMADB_PORT)
    
    # Ensure collection exists. create_collection is idempotent.
    logging.info(f"Ensuring ChromaDB collection '{COLLECTION_NAME}' exists.")
    collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME)

    logging.info("Consumer started. Waiting for messages...")
    for message in consumer:
        process_message(message, collection)

if __name__ == "__main__":
    main()

Step 2: Kubernetes Deployment and The First Failure

The deployment manifests were standard. We deployed Kafka and ChromaDB from Helm charts and our custom services with Deployment and Service objects. The critical part was enabling Istio sidecar injection.

k8s/namespace.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: rag-pipeline
  labels:
    istio-injection: enabled

k8s/embedding-service-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: embedding-service
  namespace: rag-pipeline
spec:
  replicas: 2
  selector:
    matchLabels:
      app: embedding-service
  template:
    metadata:
      labels:
        app: embedding-service
    spec:
      containers:
      - name: embedding-service
        image: your-repo/embedding-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: EMBEDDING_MODEL
          value: "all-MiniLM-L6-v2"
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "1"
            memory: "2Gi"
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 15
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: embedding-service
  namespace: rag-pipeline
spec:
  selector:
    app: embedding-service
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

(Similar YAMLs were created for the ingestion-service and ChromaDB).

After deploying, we enforced the global mTLS policy.

apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: istio-system
spec:
  mtls:
    mode: STRICT

The result was immediate failure. The ingestion-service pod logs were filled with NoBrokersAvailable errors. The embedding-service pods started fine, but the ingestion-service couldn’t reach them or ChromaDB either. The logs from the Istio sidecar proxy on the consumer pod were key:
[C_TIMEOUT] connect timeout for "10.42.1.15:9092"

The problem: Istio’s sidecar proxy, by default, treats all egress traffic as HTTP. When the Kafka client tried to establish its long-lived TCP connection, the proxy misinterpreted the protocol, leading to timeouts and connection resets. The client was trying to speak the Kafka binary protocol, but the proxy was expecting HTTP. This is a classic pitfall when meshing non-HTTP services.

Step 3: Teaching Istio to Speak Kafka

To fix this, we had to explicitly tell Istio how to handle Kafka traffic. The protocol is TCP, not HTTP. This is done by modifying the Kubernetes Service object for Kafka. Istio uses the port name as a hint for protocol selection.

If your Kafka service is running within the cluster and managed by a Service object, the fix is to name the port correctly.

k8s/kafka-service-patch.yaml

apiVersion: v1
kind: Service
metadata:
  name: my-kafka-cluster-kafka-bootstrap # Name depends on your Helm release
  namespace: kafka
spec:
  ports:
    - name: tcp-kafka # The key is the 'tcp-' prefix.
      port: 9092
      protocol: TCP
      targetPort: 9092

By naming the port tcp-kafka, we signal to Istio’s proxy that it should treat traffic on this port as raw TCP and not attempt to parse it as HTTP. The proxy will then just forward the TCP packets, wrapped in its mTLS tunnel, without trying to understand the application-level protocol.

After applying this change, the ingestion-service could finally connect to Kafka. The consumer logs lit up with successful connection messages. One problem down.

Step 4: Securing HTTP and gRPC traffic with AuthorizationPolicies

Next, we addressed the failures between the ingestion-service, embedding-service, and chromadb-service. Although Istio’s proxy handles HTTP traffic by default, our STRICT mTLS policy meant that even if the protocol was understood, an explicit authorization was still required. Without an AuthorizationPolicy, all traffic is denied by default in a zero-trust environment.

We needed to create policies based on the principle of least privilege. The ingestion-service should be the only service allowed to call the embedding-service.

k8s/auth-policies.yaml

apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: allow-embedding-access
  namespace: rag-pipeline
spec:
  selector:
    matchLabels:
      app: embedding-service # This policy applies to the embedding-service pods
  action: ALLOW
  rules:
  - from:
    - source:
        # Only allow requests from pods with the 'ingestion-service' identity
        principals: ["cluster.local/ns/rag-pipeline/sa/ingestion-service-sa"]
    to:
    - operation:
        methods: ["POST"]
        paths: ["/embed"]
    - operation:
        methods: ["GET"]
        paths: ["/health"]
---
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: allow-chromadb-access
  namespace: rag-pipeline
spec:
  selector:
    matchLabels:
      app: chromadb # This policy applies to the ChromaDB pods
  action: ALLOW
  rules:
  - from:
    - source:
        # Only allow requests from the ingestion-service
        principals: ["cluster.local/ns/rag-pipeline/sa/ingestion-service-sa"]

This required creating specific ServiceAccounts for our deployments, another production best practice.

k8s/ingestion-service-deployment-with-sa.yaml (snippet)

# ... inside the deployment spec ...
template:
  spec:
    serviceAccountName: ingestion-service-sa
    containers:
    # ...
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: ingestion-service-sa
  namespace: rag-pipeline

After applying these policies, the full pipeline started working. A message produced to the Kafka topic was consumed, embedded, and indexed into ChromaDB. We verified this by checking the logs of all three services and querying ChromaDB directly. We also used istioctl to confirm traffic was being encrypted.

istioctl proxy-status showed all sidecars were SYNCED.
kubectl exec -it <ingestion-pod> -c istio-proxy -- openssl s_client -connect embedding-service:8000 showed the server certificate was being presented by Istio, proving the mTLS tunnel was active.

The Complete, Production-Ready Code

Here is the final state of the consumer code, now more resilient. The core logic hasn’t changed, but the surrounding structure for connection and configuration is more robust, which is essential for any real-world project.

ingestion-service/app/consumer_final.py

import json
import logging
import os
import time
from typing import Optional

import requests
import chromadb
from chromadb.api.models.Collection import Collection
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

# --- Centralized Configuration ---
class AppConfig:
    KAFKA_BROKER: str = os.getenv("KAFKA_BROKER", "my-kafka-cluster-kafka-bootstrap.kafka:9092")
    KAFKA_TOPIC: str = os.getenv("KAFKA_TOPIC", "raw_documents")
    KAFKA_GROUP_ID: str = os.getenv("KAFKA_GROUP_ID", "ingestion-group-v1")
    EMBEDDING_SVC_URL: str = os.getenv("EMBEDDING_SVC_URL", "http://embedding-service.rag-pipeline.svc.cluster.local:8000/embed")
    CHROMADB_HOST: str = os.getenv("CHROMADB_HOST", "chromadb-service.rag-pipeline.svc.cluster.local")
    CHROMADB_PORT: int = int(os.getenv("CHROMADB_PORT", "8000"))
    COLLECTION_NAME: str = "document_embeddings"
    RETRY_COUNT: int = 5
    RETRY_DELAY_SECONDS: int = 10

# --- Standardized Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Service Connector Classes ---
class KafkaConnector:
    def __init__(self, config: AppConfig):
        self.config = config
        self.consumer: Optional[KafkaConsumer] = None

    def connect(self) -> KafkaConsumer:
        for i in range(self.config.RETRY_COUNT):
            try:
                logger.info(f"Connecting to Kafka at {self.config.KAFKA_BROKER} [Attempt {i+1}]")
                self.consumer = KafkaConsumer(
                    self.config.KAFKA_TOPIC,
                    bootstrap_servers=[self.config.KAFKA_BROKER],
                    auto_offset_reset='earliest',
                    enable_auto_commit=True,
                    group_id=self.config.KAFKA_GROUP_ID,
                    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                    consumer_timeout_ms=1000 # To allow periodic shutdown checks
                )
                logger.info("Successfully connected to Kafka.")
                return self.consumer
            except NoBrokersAvailable:
                logger.warning(f"Kafka broker not available. Retrying in {self.config.RETRY_DELAY_SECONDS}s...")
                time.sleep(self.config.RETRY_DELAY_SECONDS)
        logger.critical("Could not connect to Kafka after all retries. Shutting down.")
        raise SystemExit("Fatal: Kafka connection failed.")

class ChromaDBConnector:
    def __init__(self, config: AppConfig):
        self.config = config
        self.client: Optional[chromadb.HttpClient] = None

    def get_collection(self) -> Collection:
        logger.info(f"Connecting to ChromaDB at {self.config.CHROMADB_HOST}:{self.config.CHROMADB_PORT}")
        self.client = chromadb.HttpClient(host=self.config.CHROMADB_HOST, port=self.config.CHROMADB_PORT)
        # This operation is idempotent and crucial for startup.
        logger.info(f"Ensuring collection '{self.config.COLLECTION_NAME}' exists.")
        return self.client.get_or_create_collection(name=self.config.COLLECTION_NAME)

# --- Main Application Logic ---
class IngestionPipeline:
    def __init__(self, config: AppConfig):
        self.config = config
        self.http_session = requests.Session() # Use sessions for connection pooling

    def get_embedding(self, text: str, doc_id: str) -> Optional[list[float]]:
        try:
            response = self.http_session.post(
                self.config.EMBEDDING_SVC_URL,
                json={"text": text},
                timeout=15
            )
            response.raise_for_status()
            return response.json()['embedding']
        except requests.exceptions.RequestException as e:
            logger.error(f"HTTP error getting embedding for doc {doc_id}: {e}")
            return None

    def upsert_to_chroma(self, collection: Collection, doc_id: str, embedding: list[float]):
        try:
            collection.upsert(
                ids=[str(doc_id)],
                embeddings=[embedding],
                metadatas=[{"source": "kafka", "processed_at": int(time.time())}]
            )
            logger.info(f"Successfully upserted doc {doc_id} to ChromaDB.")
        except Exception as e:
            # This could be a transient network error or a data validation issue.
            logger.error(f"Failed to upsert doc {doc_id} to ChromaDB: {e}", exc_info=True)

    def run(self):
        kafka_connector = KafkaConnector(self.config)
        consumer = kafka_connector.connect()

        chroma_connector = ChromaDBConnector(self.config)
        collection = chroma_connector.get_collection()

        logger.info("Pipeline started. Consuming messages...")
        for message in consumer:
            message_value = message.value
            doc_id = message_value.get("id")
            content = message_value.get("content")

            if not doc_id or not content:
                logger.warning(f"Skipping malformed message at offset {message.offset}: {message_value}")
                continue
            
            logger.info(f"Processing message for doc ID {doc_id} from partition {message.partition}, offset {message.offset}")
            
            embedding = self.get_embedding(content, doc_id)
            if embedding:
                self.upsert_to_chroma(collection, doc_id, embedding)
            else:
                # A production system MUST have a DLQ strategy here.
                logger.error(f"Skipping doc {doc_id} due to embedding failure.")

def main():
    config = AppConfig()
    pipeline = IngestionPipeline(config)
    pipeline.run()

if __name__ == "__main__":
    main()

Lingering Issues and Future Work

This implementation establishes a secure and functional baseline, but it is far from a complete production system. The current design has several limitations that would need to be addressed in the next iteration. The error handling in the ingestion-service is naive; a message that repeatedly fails embedding or insertion will be retried indefinitely, blocking the partition. A robust dead-letter queue (DLQ) mechanism is not optional, it’s a requirement to prevent pipeline stalls. Furthermore, the embedding-service is a potential bottleneck; it should be scaled horizontally based on request latency or queue depth, and Istio’s load balancing capabilities should be fine-tuned. Finally, while we have security, we lack deep observability. Instrumenting the applications with OpenTelemetry to generate traces would allow us to correlate a slow ChromaDB upsert with a specific Kafka message, providing invaluable debugging context that Istio metrics alone cannot offer.


  TOC