The initial RAG pipeline was deceptive in its simplicity. A Python service would accept a query, generate an embedding, retrieve the top-K candidates from ChromaDB, and then a Python-based cross-encoder would rerank them for relevance. For a proof-of-concept, it worked. In a production environment with concurrent requests and the need for frequent, atomic updates to the underlying document collections, it failed completely. The two primary failure modes were a severe throughput bottleneck in the CPU-bound reranking step and race conditions during index updates, leading to inconsistent search results.
Throwing more Python workers at the problem yielded diminishing returns due to the Global Interpreter Lock (GIL) and did nothing to solve the state management issue. The system required a fundamental re-architecture, focusing on offloading the performance-critical component to a lower-level language and introducing a robust coordination layer for our distributed workers. Our stack of choice became a combination of Celery for task distribution, ChromaDB for vector storage, etcd for distributed consensus and state management, and Zig for a high-performance, memory-safe reranking module.
This isn’t a story about building a chatbot; it’s a post-mortem on building the distributed backend infrastructure required to make one reliable and scalable.
The Core Problem: Reranking Bottleneck and State Chaos
Our performance profiles consistently pointed to the reranking function. A typical implementation using a library like sentence-transformers in Python is effective but computationally expensive. For a single query, it might be acceptable, but under a load of 50 requests per second, the P99 latency skyrocketed.
Simultaneously, our operations team struggled with updating the document collections in ChromaDB. The process involved creating a new collection, ingesting documents, and then switching the application to point to the new collection name. This switch was managed via a simple environment variable, requiring a coordinated restart of all Celery workers. This was slow, error-prone, and caused service degradation. A worker might still be processing a request using the old collection name while new workers were already using the new one. We needed an atomic, system-wide switch without service interruption.
The architectural decisions stemmed directly from these two pain points:
- Performance: Replace the Python reranker with a native library. We evaluated Rust, Go, and Zig. Zig was chosen for its unparalleled C ABI compatibility, which makes Foreign Function Interface (FFI) calls from Python via ctypes remarkably clean, and for its explicit memory management, giving us full control over performance without a garbage collector’s unpredictability.
- Coordination: Introduce a distributed consensus store. We needed atomic compare-and-swap operations, distributed locks, and the ability for workers to watch for configuration changes. etcd, with its foundation on the Raft algorithm, was the canonical choice for this role; a brief sketch of these primitives follows this list.
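To make those requirements concrete, here is a minimal sketch of the three etcd primitives the design leans on, using the python-etcd3 client; the keys and values are illustrative, not the production configuration.
# etcd_primitives_sketch.py -- minimal sketch of the etcd features we rely on,
# using the python-etcd3 client; keys and values are illustrative.
import etcd3

client = etcd3.client(host='localhost', port=2379)

# 1. Atomic compare-and-swap: only write the new value if the key still holds the old one.
swapped = client.replace('/config/rag/active_collection',
                         'default-collection-v1',   # expected current value
                         'new-collection-v2')       # new value
print(f"compare-and-swap applied: {swapped}")

# 2. Distributed lock backed by a lease: released automatically if the holder dies.
with client.lock('collection_swap_example', ttl=30):
    pass  # critical section goes here

# 3. Watch: run a callback whenever a configuration key changes.
def on_config_change(watch_response):
    for event in watch_response.events:
        print("config changed:", event.key, event.value)

watch_id = client.add_watch_callback('/config/rag/active_collection', on_config_change)
# ... later, when shutting down:
client.cancel_watch(watch_id)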
The High-Performance Reranker in Zig
The first step was to isolate and rewrite the bottleneck. We designed a simple Zig library that would accept a query and a list of documents, then return a re-ordered list of indices based on a mock relevance score. In a real-world project, this mock logic would be replaced with a lightweight model inference or a more complex scoring algorithm.
The critical part is the C ABI export. This allows the Zig compiler to produce a standard shared library (.so on Linux, .dylib on macOS) that any C-compatible language can load.
Here’s the directory structure for the Zig component:
reranker_zig/
├── build.zig
└── src/
└── main.zig
The core logic resides in src/main.zig. We need to define data structures that can be safely passed across the FFI boundary. Raw pointers and lengths are the lingua franca of FFI.
// reranker_zig/src/main.zig
const std = @import("std");
// This struct will be passed from Python.
// It must be `extern` so its field layout matches the ctypes.Structure on the Python side.
// `docs` is a C pointer (`[*c][*c]const u8`) to an array of C-style (null-terminated) strings.
const InputDocs = extern struct {
query: [*c]const u8,
docs: [*c][*c]const u8,
num_docs: usize,
};
// A struct to hold a document's original index and its calculated score.
const ScoredIndex = struct {
index: u32,
score: f32,
};
// This is the primary function we will call from Python.
// It must be exported with a C calling convention.
export fn rerank_documents(input: *const InputDocs, output_indices: [*c]u32) void {
const allocator = std.heap.c_allocator;
// A common mistake is to forget that C strings are null-terminated.
// We must convert them to Zig slices with a known length.
const query_slice = std.mem.sliceTo(input.query, 0);
var scored_indices = std.ArrayList(ScoredIndex).init(allocator);
defer scored_indices.deinit();
// In a real implementation, this loop would contain complex logic.
// Here, we simulate a scoring function for demonstration.
// For example, score is higher if the document contains the query.
for (0..input.num_docs) |i| {
const doc_slice = std.mem.sliceTo(input.docs[i], 0);
var score: f32 = 0.0;
        if (std.mem.indexOf(u8, doc_slice, query_slice) != null) {
score = 1.0;
} else {
score = @as(f32, @floatFromInt(i)) / @as(f32, @floatFromInt(input.num_docs));
}
scored_indices.append(.{ .index = @intCast(i), .score = score }) catch |err| {
// In production code, proper error handling is paramount.
            // Here we log and return early, but propagating an error code to the caller would be more robust.
std.log.err("Failed to allocate memory: {}", .{err});
return;
};
}
// Sort the documents by score in descending order.
std.sort.block(ScoredIndex, scored_indices.items, {}, struct {
fn lessThan(context: void, a: ScoredIndex, b: ScoredIndex) bool {
_ = context;
return a.score > b.score;
}
}.lessThan);
// Write the sorted indices to the output buffer provided by the Python caller.
for (scored_indices.items, 0..) |item, i| {
output_indices[i] = item.index;
}
}
The build script build.zig
is straightforward. It compiles the source into a dynamic shared library.
// reranker_zig/build.zig
const std = @import("std");
pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});
const lib = b.addSharedLibrary(.{
.name = "reranker",
.root_source_file = .{ .path = "src/main.zig" },
.target = target,
.optimize = optimize,
    });
    // main.zig uses std.heap.c_allocator, so the library must link against libc.
    lib.linkLibC();
    b.installArtifact(lib);
}
To build it, you run zig build from the reranker_zig directory. This produces zig-out/lib/libreranker.so.
Integrating the Zig Library into Python
With the native library built, we need a Python wrapper to handle the FFI calls. The ctypes library is standard for this. A pragmatic senior engineer knows that the biggest pitfall with FFI is memory management: Python’s garbage collector has no awareness of memory allocated by external libraries.
Our Python wrapper class will be responsible for:
- Loading the shared library.
- Converting Python data types (strings, lists) into C-compatible pointers.
- Defining the function signatures to match the Zig exports.
- Calling the Zig function.
- Interpreting the results.
# app/reranker.py
import ctypes
import os
from typing import List
class ZigReranker:
"""
A Python wrapper for the Zig-based reranking shared library.
This class handles the complexity of type conversion and FFI calls.
"""
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ZigReranker, cls).__new__(cls)
return cls._instance
def __init__(self, library_path: str = None):
if hasattr(self, '_initialized'):
return
if library_path is None:
# Assumes the library is in a known location relative to this file.
# In a real deployment, this path would be configured.
lib_dir = os.path.join(os.path.dirname(__file__), '..', 'reranker_zig/zig-out/lib')
library_path = os.path.join(lib_dir, 'libreranker.so')
if not os.path.exists(library_path):
raise FileNotFoundError(f"Shared library not found at {library_path}. Did you run 'zig build'?")
self.lib = ctypes.CDLL(library_path)
self._define_structures()
self._define_function_prototypes()
self._initialized = True
def _define_structures(self):
"""Define C-compatible structs that mirror the Zig structs."""
class InputDocs(ctypes.Structure):
_fields_ = [
("query", ctypes.c_char_p),
("docs", ctypes.POINTER(ctypes.c_char_p)),
("num_docs", ctypes.c_size_t),
]
self.InputDocs = InputDocs
def _define_function_prototypes(self):
"""Define the argument and return types for the Zig function."""
self.lib.rerank_documents.argtypes = [
ctypes.POINTER(self.InputDocs),
ctypes.POINTER(ctypes.c_uint32)
]
self.lib.rerank_documents.restype = None
def rerank(self, query: str, documents: List[str]) -> List[int]:
"""
Calls the external Zig function to rerank documents.
Handles all data conversion to and from C types.
"""
num_docs = len(documents)
if num_docs == 0:
return []
# Convert Python strings to bytes for C compatibility.
query_bytes = query.encode('utf-8')
doc_bytes_list = [d.encode('utf-8') for d in documents]
# Create a C-style array of char pointers.
c_docs_array = (ctypes.c_char_p * num_docs)(*doc_bytes_list)
# Create the input structure instance.
input_struct = self.InputDocs(
query=ctypes.c_char_p(query_bytes),
            docs=ctypes.cast(c_docs_array, ctypes.POINTER(ctypes.c_char_p)),
num_docs=num_docs,
)
# Allocate memory for the output array where Zig will write the results.
output_array = (ctypes.c_uint32 * num_docs)()
# The core FFI call.
self.lib.rerank_documents(ctypes.byref(input_struct), output_array)
# Convert the C array result back to a Python list.
return list(output_array)
# Singleton instance for reuse across the application.
reranker_instance = ZigReranker()
This wrapper is crucial. It abstracts away the unsafe parts of FFI, providing a clean Python interface to our Celery tasks.
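Before wiring it into Celery, a quick standalone smoke test confirms the FFI round trip; the documents and expected output below are purely illustrative.
# Quick smoke test for the Zig-backed reranker (illustrative strings only).
from app.reranker import reranker_instance

docs = [
    "etcd is a distributed key-value store.",
    "Celery is a task queue for Python.",
    "Zig compiles to native shared libraries.",
]
order = reranker_instance.rerank("task queue", docs)
print(order)                      # e.g. [1, 2, 0] -- indices into `docs`, best match first
print([docs[i] for i in order])   # the documents in reranked order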
Celery Task Orchestration and State Management with etcd
Now we wire the reranker into our Celery application and use etcd to manage its state.
Our docker-compose.yml sets up the required services:
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
etcd:
image: bitnami/etcd:3.5
ports:
- "2379:2379"
- "2380:2380"
environment:
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
chromadb:
image: chromadb/chroma:0.4.22
ports:
- "8000:8000"
worker:
build: .
volumes:
- .:/app
depends_on:
- redis
- etcd
- chromadb
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- ETCD_HOST=etcd
- CHROMA_HOST=chromadb
Our Celery application needs to connect to etcd on startup to fetch its configuration. We use the worker_process_init signal for this. Each worker process will maintain its own connection to etcd and a “view” of the current state.
# app/celery_app.py
import os
import etcd3
from celery import Celery
from celery.signals import worker_process_init
from .reranker import reranker_instance # Our Zig wrapper
# Application state, populated at worker startup.
# This is a global dictionary, but it's local to each worker process.
APP_STATE = {
'etcd_client': None,
'active_collection_name': 'default-collection-v1' # Default fallback
}
celery_app = Celery(
'rag_tasks',
broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
include=['app.tasks']
)
@worker_process_init.connect
def init_worker(**kwargs):
"""
This function is called when a Celery worker process starts.
We use it to establish a connection to etcd and fetch initial config.
"""
print("Initializing worker process...")
try:
host = os.environ.get('ETCD_HOST', 'localhost')
client = etcd3.client(host=host, port=2379)
# Verify connection
client.status()
APP_STATE['etcd_client'] = client
print("Successfully connected to etcd.")
# Fetch the active collection name from etcd
# The key `/config/rag/active_collection` stores the name of the ChromaDB collection to use.
collection_name_bytes, _ = client.get('/config/rag/active_collection')
if collection_name_bytes:
APP_STATE['active_collection_name'] = collection_name_bytes.decode('utf-8')
print(f"Fetched active collection name: {APP_STATE['active_collection_name']}")
else:
print(f"Key '/config/rag/active_collection' not found in etcd. Using default: {APP_STATE['active_collection_name']}")
# We should probably seed this value in a real system.
client.put('/config/rag/active_collection', APP_STATE['active_collection_name'])
except Exception as e:
print(f"FATAL: Could not connect to etcd or fetch initial config: {e}")
# In a real system, you might want the worker to exit if it can't get its config.
# For this example, it will proceed with the default.
The main processing task now uses both the reranker and the state from etcd.
# app/tasks.py
import os
import chromadb
from .celery_app import celery_app, APP_STATE
from .reranker import reranker_instance
import logging
# In a real app, the client would be managed more robustly.
chroma_client = chromadb.HttpClient(host=os.environ.get('CHROMA_HOST', 'localhost'), port=8000)
logger = logging.getLogger(__name__)
@celery_app.task(name='tasks.process_query')
def process_query(query: str, top_k: int = 10):
"""
The main RAG processing task.
1. Fetches active collection name from its process-local state.
2. Queries ChromaDB.
3. Reranks results using the Zig module.
"""
collection_name = APP_STATE.get('active_collection_name')
if not collection_name:
raise ValueError("Active collection name is not configured.")
try:
collection = chroma_client.get_collection(name=collection_name)
# 1. Retrieve initial candidates from ChromaDB
results = collection.query(
query_texts=[query],
n_results=top_k
)
docs = results['documents'][0]
if not docs:
return []
# 2. Rerank using the high-performance Zig module
# The reranker returns indices into the original `docs` list.
reranked_indices = reranker_instance.rerank(query, docs)
# 3. Reorder the documents based on the new ranking
final_docs = [docs[i] for i in reranked_indices]
return final_docs
except Exception as e:
logger.error(f"Error processing query '{query}' for collection '{collection_name}': {e}", exc_info=True)
# Celery's default behavior is to retry. We can customize this.
raise
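Invoking the pipeline from a client process is then a standard Celery call; a minimal sketch (the query string is illustrative):
# Minimal client-side sketch: enqueue a query and wait for the reranked documents.
from app.tasks import process_query

async_result = process_query.delay("how do distributed locks work?", top_k=10)
reranked_docs = async_result.get(timeout=30)  # blocks until a worker finishes the task
for doc in reranked_docs:
    print(doc)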
Distributed Locking for Safe Index Updates
The second problem was safe index updates. We can create an administrative task that uses an etcd distributed lock to ensure only one worker at a time can perform a hot-swap of the active collection. An etcd lease is key here; if the worker holding the lock crashes, the lease expires, and the lock is automatically released, preventing a permanent deadlock.
sequenceDiagram
    participant Admin
    participant Celery
    participant WorkerA
    participant etcd
    participant WorkerB
    Admin->>Celery: Enqueue `switch_collection` task
    Celery->>WorkerA: Assign task
    WorkerA->>etcd: Attempt to acquire lock `/locks/collection_swap` with 60s lease
    etcd-->>WorkerA: Lock acquired (lease granted)
    Note right of WorkerA: Worker B attempts same task, but lock fails
    Celery->>WorkerB: Assign another task
    WorkerB->>etcd: Attempt to acquire lock
    etcd-->>WorkerB: Lock unavailable
    WorkerA->>etcd: Atomically update key `/config/rag/active_collection` to `new-collection-v2`
    etcd-->>WorkerA: Update successful
    Note over WorkerA,WorkerB: All workers will see this change on their next task or via a watch.
    WorkerA->>etcd: Release lock
    etcd-->>WorkerA: Lock released
Here’s the implementation of that administrative task:
# app/tasks.py (continued)
import time
@celery_app.task(name='tasks.switch_active_collection')
def switch_active_collection(new_collection_name: str, lock_timeout: int = 60):
"""
Atomically switches the active ChromaDB collection name in etcd.
Uses a distributed lock to prevent race conditions.
"""
etcd_client = APP_STATE.get('etcd_client')
if not etcd_client:
raise RuntimeError("etcd client not initialized in this worker.")
lock_name = '/locks/rag/collection_swap'
# The `etcd3.Lock` object uses a lease internally.
lock = etcd_client.lock(lock_name, ttl=lock_timeout)
logger.info(f"Attempting to acquire lock '{lock_name}' to switch collection to '{new_collection_name}'")
    # Non-blocking acquire attempt: a timeout of 0 returns immediately if the lock is held elsewhere.
    if lock.acquire(timeout=0):
try:
logger.info("Lock acquired. Proceeding with collection switch.")
# Read the current value to prevent unnecessary writes.
current_collection_bytes, _ = etcd_client.get('/config/rag/active_collection')
current_collection = current_collection_bytes.decode('utf-8') if current_collection_bytes else ''
if current_collection == new_collection_name:
logger.warning(f"Collection is already set to '{new_collection_name}'. No action taken.")
return {"status": "noop", "collection": new_collection_name}
# This is the atomic update operation.
etcd_client.put('/config/rag/active_collection', new_collection_name)
# In a real system, we'd wait a moment to allow propagation
# and potentially notify other systems.
time.sleep(2)
logger.info(f"Successfully switched active collection to '{new_collection_name}'")
return {"status": "success", "collection": new_collection_name}
finally:
# Crucial to release the lock.
lock.release()
logger.info("Lock released.")
else:
logger.warning(f"Could not acquire lock '{lock_name}'. Another process is likely performing an update.")
# We can choose to retry this task later.
raise Exception("Failed to acquire distributed lock for collection switch.")
With this in place, we can trigger a collection swap with a simple Python script, without downtime or inconsistent states. Workers don’t need to be restarted, but note that the code above only reads the key at process startup: to actually pick up the new collection name, workers must either re-read /config/rag/active_collection at the start of each task or register a watch on that key to update their in-process state in real time. Both the admin script and the watch-based variant are sketched below.
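A minimal sketch of the admin script, assuming an ingestion step into a new ChromaDB collection followed by the swap task; the collection name and documents are illustrative, and the script is assumed to run on a host with the compose port mappings.
# admin_swap.py -- illustrative sketch: build the new collection, then enqueue the atomic swap.
import chromadb
from app.tasks import switch_active_collection

client = chromadb.HttpClient(host='localhost', port=8000)

# 1. Create and populate the new collection (names and documents are placeholders).
new_name = 'documents-v2'
collection = client.get_or_create_collection(name=new_name)
collection.add(
    ids=['doc-1', 'doc-2'],
    documents=['First updated document.', 'Second updated document.'],
)

# 2. Enqueue the administrative task; the worker that wins the etcd lock performs the switch.
result = switch_active_collection.delay(new_name)
print(result.get(timeout=120))
And a sketch of the watch-based variant, written as a helper that could be called once per worker process (for example at the end of init_worker), assuming the etcd client and state dictionary from app/celery_app.py are passed in.
# Sketch: real-time propagation via an etcd watch. Intended to be called once per
# worker process, e.g. at the end of init_worker in app/celery_app.py.
def register_collection_watch(etcd_client, app_state):
    def on_change(watch_response):
        for event in watch_response.events:
            # python-etcd3 delivers PutEvent/DeleteEvent objects; we only care about puts.
            new_value = event.value.decode('utf-8') if event.value else None
            if new_value:
                app_state['active_collection_name'] = new_value
                print(f"Active collection updated via etcd watch: {new_value}")

    # Returns a watch id that can later be passed to etcd_client.cancel_watch().
    return etcd_client.add_watch_callback('/config/rag/active_collection', on_change)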
The integration of these technologies solves the initial problems effectively. The Zig module moved the reranking latency from being the dominant factor to a negligible one. etcd provided the coordination backbone that was missing, transforming our fleet of dumb workers into a cohesive, state-aware system.
The FFI boundary between Python and Zig, while performant, is not free. There is a serialization cost to marshalling data into C-compatible structures. For our workload, this cost was minor compared to the massive gains from the native computation. However, for a system passing gigabytes of data per second across this boundary, a different architecture involving shared memory or rewriting the entire worker in a compiled language might be necessary. Furthermore, the current error handling is rudimentary; a production-grade FFI interface should pass error codes back from Zig to Python to handle allocation failures or other runtime issues gracefully. The system’s resilience is now tightly coupled to etcd’s availability, adding a critical piece of infrastructure that must be monitored and maintained.
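As an illustration of that last point, one hypothetical hardening step would be to change the Zig export to return an i32 status code instead of void; the Python side would then declare the return type and raise on failure. A sketch of the Python half only, under that assumption:
# Hypothetical sketch: assumes the Zig export is modified to return an i32 status
# code (0 = success, non-zero = failure) instead of void.
import ctypes

class NativeRerankError(RuntimeError):
    """Raised when the native reranker reports a non-zero status code."""

def declare_checked_prototype(lib: ctypes.CDLL, input_docs_type) -> None:
    # Same argument types as before; only the return type changes.
    lib.rerank_documents.argtypes = [
        ctypes.POINTER(input_docs_type),
        ctypes.POINTER(ctypes.c_uint32),
    ]
    lib.rerank_documents.restype = ctypes.c_int32

def call_checked(lib, input_struct, output_array) -> None:
    status = lib.rerank_documents(ctypes.byref(input_struct), output_array)
    if status != 0:
        raise NativeRerankError(f"rerank_documents failed with status code {status}")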