The initial architecture was failing under load. Our attempt to build a real-time sentiment analysis service using a standard RESTful API deployed on AWS EKS could not meet the p99 latency target of 50ms. With traffic spikes exceeding 30,000 requests per second, the combination of an Application Load Balancer, Kubernetes Ingress, and multiple Python backend services using Flask resulted in unacceptable latency, often exceeding 200ms. The primary culprits were TCP connection overhead for each request, inefficient round-robin load balancing that overwhelmed some pods while others were idle, and the inherent request-response blocking nature of HTTP. The system required a fundamental redesign of its communication layer to decouple request ingestion from processing and to implement a more intelligent load distribution mechanism.
Our new approach centered on replacing the synchronous HTTP-based communication with an asynchronous, brokerless messaging fabric. This led us to ZeroMQ. The specific goal was to leverage its ROUTER-DEALER pattern to create a high-performance, load-balancing queue between a single entry point (the “gateway”) and a dynamic pool of inference workers. Each worker would host a Hugging Face Transformer model. The entire system would remain on EKS to leverage its orchestration and autoscaling capabilities. Crucially, verifying the behavior of such a distributed, asynchronous system required moving beyond simple unit tests. We adopted Behavior-Driven Development (BDD) to define and validate the system’s end-to-end performance and resilience characteristics.
Architecture Overview: The ROUTER-DEALER Pattern on Kubernetes
The core of the new design is the ZeroMQ ROUTER-DEALER pattern.
- Gateway (`ROUTER`): A single, highly efficient service that binds a `ROUTER` socket toward external clients and a backend socket toward the worker pool. It accepts incoming requests from the outside world and forwards them to available workers. The `ROUTER` socket automatically prepends a routing envelope (the identity of the originating client connection) to each message it receives, and that identity frame is later used to send the reply back to the correct client. It functions as an asynchronous, non-blocking message switch.
- Inference Workers (`DEALER`): A pool of identical services, each running in its own pod. Each worker creates a `DEALER` socket and connects to the gateway's backend `DEALER` socket. The gateway's backend distributes outgoing requests round-robin across all connected workers and fair-queues their replies on the way back, so the pool stays evenly loaded without any explicit load-balancer configuration. A minimal sketch of the envelope handling follows the list.
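The envelope mechanics are easiest to see in isolation. The following is a minimal sketch, independent of the pipeline code, that uses an `inproc` transport to show what a `ROUTER` actually receives from a `DEALER` peer and how the identity frame routes the reply back:

```python
import zmq

ctx = zmq.Context.instance()

router = ctx.socket(zmq.ROUTER)
router.bind("inproc://envelope-demo")

client = ctx.socket(zmq.DEALER)
client.setsockopt_string(zmq.IDENTITY, "client-A")  # explicit identity for clarity
client.connect("inproc://envelope-demo")

client.send(b'{"text": "hello"}')            # DEALER sends a single frame

identity, payload = router.recv_multipart()  # ROUTER prepends the sender's identity
print(identity, payload)                     # b'client-A' b'{"text": "hello"}'

router.send_multipart([identity, b'{"label": "POSITIVE"}'])  # identity frame routes the reply
print(client.recv())                         # the client sees only the payload frame
```

In the full pipeline, `zmq.proxy` forwards both frames to a worker unchanged, and the worker echoes the identity frame back so the gateway can route the reply.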
This architecture is deployed on EKS as follows:
graph TD
    subgraph AWS EKS Cluster
        subgraph "Gateway Namespace"
            Ext[External Traffic] --> GW_SVC[Service: LoadBalancer/NodePort];
            GW_SVC --> GW_POD[Pod: Gateway];
            GW_POD -- Binds ZMQ.ROUTER --> GW_HEADLESS[Headless Service: gateway-internal-endpoint];
        end
        subgraph "Worker Namespace"
            WORKER_DEPLOYMENT[Deployment: Inference Workers];
            WORKER_DEPLOYMENT --> POD1[Pod: Worker 1];
            WORKER_DEPLOYMENT --> POD2[Pod: Worker 2];
            WORKER_DEPLOYMENT --> PODN[Pod: Worker N];
            POD1 -- Connects ZMQ.DEALER --> GW_HEADLESS;
            POD2 -- Connects ZMQ.DEALER --> GW_HEADLESS;
            PODN -- Connects ZMQ.DEALER --> GW_HEADLESS;
        end
    end
    GW_POD -- ZMQ ROUTER-DEALER Protocol --> POD1;
    GW_POD -- ZMQ ROUTER-DEALER Protocol --> POD2;
    GW_POD -- ZMQ ROUTER-DEALER Protocol --> PODN;
A Kubernetes Headless Service is used for the gateway's internal ZeroMQ endpoint. This provides a stable DNS record (`gateway-internal-endpoint.zmq-inference.svc.cluster.local`, matching the manifests below) that all worker pods can use to connect, regardless of which node the gateway pod is scheduled on.
The Gateway Implementation
The gateway's role is to be a lightweight, durable message forwarder. It listens for external requests on a ZMQ `ROUTER` socket (in a real system this frontend might instead be fed by a Kafka consumer or a raw TCP listener) and routes them into the internal worker fabric.

`gateway/main.py`:
import zmq
import logging
import time
import os
import uuid
from threading import Thread
# --- Configuration ---
# Use environment variables for production-grade configuration
FRONTEND_ADDR = os.environ.get("FRONTEND_ADDR", "tcp://*:5559")
BACKEND_ADDR = os.environ.get("BACKEND_ADDR", "tcp://*:5560")
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - GATEWAY - %(message)s')
def main():
"""
Main gateway function that sets up ZeroMQ sockets and runs the proxy.
This component acts as a message switch between external clients and internal workers.
"""
context = zmq.Context.instance()
# Socket facing external clients
try:
frontend = context.socket(zmq.ROUTER)
frontend.bind(FRONTEND_ADDR)
logging.info(f"Gateway frontend listening on {FRONTEND_ADDR}")
except zmq.ZMQError as e:
logging.error(f"Could not bind frontend socket: {e}")
return
# Socket facing internal workers
try:
backend = context.socket(zmq.DEALER)
backend.bind(BACKEND_ADDR)
logging.info(f"Gateway backend listening on {BACKEND_ADDR}")
except zmq.ZMQError as e:
logging.error(f"Could not bind backend socket: {e}")
frontend.close()
return
logging.info("Gateway started successfully. Proxying messages...")
try:
# The zmq.proxy is a blocking, high-performance function that shuttles messages
# between a frontend and a backend socket. It's implemented in C for speed.
# It handles all the low-level details of message forwarding.
zmq.proxy(frontend, backend)
except KeyboardInterrupt:
logging.info("Gateway shutting down...")
except Exception as e:
logging.error(f"An unexpected error occurred in the proxy: {e}", exc_info=True)
finally:
logging.info("Closing sockets and terminating context.")
frontend.close()
backend.close()
context.term()
if __name__ == "__main__":
main()
A common pitfall is to reimplement the proxy loop in Python. While possible with a `Poller` (a rough equivalent is sketched below), the built-in `zmq.proxy` is significantly more performant: the forwarding runs entirely in the underlying C library, so messages are never copied into Python objects and the loop does not contend for the GIL. The code above is deceptively simple, but it forms a durable, non-blocking switchboard.
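For reference, a hand-rolled forwarding loop would look roughly like this sketch; it is functionally similar to `zmq.proxy`, but every message takes a round trip through Python, which is exactly the overhead the built-in proxy avoids:

```python
import zmq

def python_proxy(frontend: zmq.Socket, backend: zmq.Socket) -> None:
    """Illustrative only -- prefer zmq.proxy(frontend, backend) in production."""
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while True:
        events = dict(poller.poll())
        if frontend in events:
            # Forward the full multipart message (identity envelope included) to the workers.
            backend.send_multipart(frontend.recv_multipart())
        if backend in events:
            # Forward worker replies back out through the ROUTER.
            frontend.send_multipart(backend.recv_multipart())
```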
The Inference Worker Implementation
The worker’s job is to connect to the gateway, receive tasks, perform inference using a Hugging Face model, and send back the result. A critical aspect of a production-grade worker is initializing the heavyweight model only once upon startup to avoid inference latency.
`worker/main.py`:
import zmq
import logging
import time
import os
import json
from transformers import pipeline
# --- Configuration ---
GATEWAY_ADDR = os.environ.get("GATEWAY_ADDR", "tcp://localhost:5560")
MODEL_NAME = os.environ.get("MODEL_NAME", "distilbert-base-uncased-finetuned-sst-2-english")
DEVICE_ID = int(os.environ.get("DEVICE_ID", -1)) # -1 for CPU, 0 for first GPU, etc.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
WORKER_ID = os.uname().nodename # Use pod name as worker ID for logging
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format=f'%(asctime)s - %(levelname)s - WORKER({WORKER_ID}) - %(message)s')
def initialize_model():
"""
Loads the Hugging Face pipeline. This is a potentially slow operation
and should only be done once when the worker starts.
In a real-world project, the model should be part of the container image
to avoid download delays on pod startup.
"""
try:
logging.info(f"Initializing sentiment analysis model: {MODEL_NAME}")
start_time = time.time()
# Using a pipeline is a high-level abstraction from Hugging Face.
# device=-1 forces CPU, which is common for scaled-out inference. For GPU, set to device=0, 1, etc.
sentiment_pipeline = pipeline("sentiment-analysis", model=MODEL_NAME, device=DEVICE_ID)
end_time = time.time()
logging.info(f"Model loaded successfully in {end_time - start_time:.2f} seconds.")
return sentiment_pipeline
except Exception as e:
logging.error(f"Failed to load model '{MODEL_NAME}': {e}", exc_info=True)
# If the model fails to load, the worker is useless. Exit to allow Kubernetes to restart it.
exit(1)
def main():
"""
Main worker function. Connects to the gateway and enters an infinite loop
to process inference requests.
"""
sentiment_pipeline = initialize_model()
context = zmq.Context.instance()
    # A DEALER socket sends and receives asynchronously; with the gateway as its
    # single peer it gives the worker a non-blocking, bidirectional message channel.
socket = context.socket(zmq.DEALER)
# Set a high-water mark to prevent unbounded memory usage if the gateway is slow to read.
# This provides backpressure.
socket.set_hwm(1000)
logging.info(f"Connecting to gateway at {GATEWAY_ADDR}")
socket.connect(GATEWAY_ADDR)
logging.info("Worker is ready to process requests.")
while True:
try:
            # The ROUTER frontend prepends the client's identity frame to every request,
            # and zmq.proxy forwards the frames unchanged, so the worker receives a
            # two-frame message: [client identity, payload]. We must receive all parts.
            identity, request_payload = socket.recv_multipart()
try:
# Defensive decoding and JSON parsing.
request_data = json.loads(request_payload.decode('utf-8'))
text_to_analyze = request_data.get("text")
request_id = request_data.get("id", "N/A")
if not text_to_analyze:
raise ValueError("'text' field is missing from request")
logging.debug(f"Received request {request_id}")
# Perform the actual inference
start_time = time.time()
result = sentiment_pipeline(text_to_analyze)
end_time = time.time()
# A common mistake is to send back the raw pipeline object.
# It must be serialized to a JSON-compatible format.
response_payload = {
"id": request_id,
"result": result[0],
"inference_time_ms": (end_time - start_time) * 1000
}
                # Send the response back, echoing the client's identity frame so the
                # ROUTER frontend knows which client connection to route the reply to.
                socket.send_multipart([
                    identity,
                    json.dumps(response_payload).encode('utf-8')
                ])
logging.debug(f"Processed and sent response for request {request_id}")
except (json.JSONDecodeError, ValueError, UnicodeDecodeError) as e:
# Handle "poison pill" messages gracefully without crashing the worker.
logging.error(f"Invalid request payload received: {e}. Payload: {request_payload[:100]}")
# Send an error response back to the client.
error_response = {
"error": "Invalid request format",
"details": str(e)
}
                socket.send_multipart([identity, json.dumps(error_response).encode('utf-8')])
except zmq.ZMQError as e:
logging.error(f"ZeroMQ error: {e}")
# If a ZMQ error occurs (e.g., context terminated), break the loop.
if e.errno == zmq.ETERM:
break
time.sleep(1) # Avoid tight loop on other errors
except KeyboardInterrupt:
logging.info("Worker shutting down...")
break
logging.info("Closing socket and terminating context.")
socket.close()
context.term()
if __name__ == "__main__":
main()
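Before containerizing anything, the gateway and a single worker can be smoke-tested locally with a minimal `DEALER` client. This is a sketch that assumes both processes are running on localhost with the default ports; the request and response formats match the worker above:

```python
import zmq

ctx = zmq.Context.instance()
client = ctx.socket(zmq.DEALER)
client.setsockopt(zmq.RCVTIMEO, 5000)  # fail fast instead of hanging if nothing answers
client.connect("tcp://localhost:5559")

client.send_json({"id": "smoke-1", "text": "ZeroMQ is incredibly fast"})
print(client.recv_json())  # e.g. {"id": "smoke-1", "result": {...}, "inference_time_ms": ...}
```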
Containerization and EKS Deployment
To run this on EKS, we need to containerize both applications and define the necessary Kubernetes resources.
`Dockerfile` (shared by the gateway and worker images; only the `CMD` changes):
FROM python:3.9-slim
WORKDIR /app
# It's better to copy requirements and install first to leverage Docker layer caching.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# For the worker, pre-downloading the model is a critical optimization.
# This RUN command would be in the worker's Dockerfile specifically.
# RUN python -c "from transformers import pipeline; pipeline('sentiment-analysis', model='distilbert-base-uncased-finetuned-sst-2-english')"
COPY . .
# For the gateway image, the CMD would be ["python", "gateway/main.py"]
# For the worker image, it would be:
CMD ["python", "worker/main.py"]
`requirements.txt`:
pyzmq
transformers
torch
torchvision
torchaudio
Kubernetes YAML definitions (`deployment.yaml`):
apiVersion: v1
kind: Namespace
metadata:
name: zmq-inference
---
# Headless service for the gateway's internal ZMQ endpoint
apiVersion: v1
kind: Service
metadata:
name: gateway-internal-endpoint
namespace: zmq-inference
spec:
clusterIP: None # This makes it a headless service
selector:
app: zmq-gateway
ports:
- name: backend
protocol: TCP
port: 5560
targetPort: 5560
---
# Deployment for the Gateway
apiVersion: apps/v1
kind: Deployment
metadata:
name: zmq-gateway
namespace: zmq-inference
spec:
replicas: 1 # The gateway is stateful regarding connections, scaling needs care
selector:
matchLabels:
app: zmq-gateway
template:
metadata:
labels:
app: zmq-gateway
spec:
containers:
- name: gateway
image: your-repo/zmq-gateway:latest # Replace with your image
        ports:
        - containerPort: 5559 # Frontend for external clients
        - containerPort: 5560 # Backend for workers
env:
- name: BACKEND_ADDR
value: "tcp://*:5560"
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "1"
memory: "1Gi"
---
# Deployment for the Workers
apiVersion: apps/v1
kind: Deployment
metadata:
name: zmq-worker
namespace: zmq-inference
spec:
replicas: 3 # Start with 3, and let HPA manage scaling
selector:
matchLabels:
app: zmq-worker
template:
metadata:
labels:
app: zmq-worker
spec:
containers:
- name: worker
image: your-repo/zmq-worker:latest # Replace with your image
env:
- name: GATEWAY_ADDR
# The key part: workers connect to the stable DNS name of the headless service
value: "tcp://gateway-internal-endpoint.zmq-inference.svc.cluster.local:5560"
- name: LOG_LEVEL
value: "INFO"
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "3Gi"
---
# Horizontal Pod Autoscaler for the workers
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: zmq-worker-hpa
namespace: zmq-inference
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: zmq-worker
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
Validating System Behavior with BDD
With the core components built, the challenge shifts to validation. How do we prove this system is resilient, performant, and correctly balanced? This is where BDD shines. We use the `behave` library in Python to execute specifications written in Gherkin.
Feature file (`features/inference_pipeline.feature`):
Feature: High-Throughput NLP Inference Pipeline
As a system operator, I need to ensure the ZeroMQ-based pipeline
can handle high load, balance work correctly, and remain resilient.
Background:
Given a running inference pipeline with 1 gateway and 3 workers
Scenario: A single client sends a valid request and receives a correct response
When a client sends the text "ZeroMQ is incredibly fast" with request ID "req-1"
Then the client should receive a response for request ID "req-1"
And the response should contain a sentiment label
Scenario: The system load balances requests across multiple workers
When a client sends 100 requests concurrently
Then the client should receive 100 unique responses
And the responses should originate from more than one worker
Scenario: The system handles malformed requests without crashing
When a client sends a malformed JSON payload
Then the client should receive an error response
And the system should continue to process subsequent valid requests
The step definitions implement the logic to interact with our running system. This requires a test harness that can spin up the components (or connect to a test deployment on EKS) and act as a client.
`features/steps/pipeline_steps.py`:
from behave import given, when, then
import zmq
import json
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
# --- Test Harness Context ---
# This object is passed between steps
@given('a running inference pipeline with {num_gateways:d} gateway and {num_workers:d} workers')
def step_impl(context, num_gateways, num_workers):
# In a real CI/CD pipeline, this step would use kubectl or a Python k8s client
# to ensure the pods are running in a test namespace.
# For a local test, it could use subprocess to start the gateway/workers.
context.gateway_frontend_addr = "tcp://localhost:5559"
    context.client_socket = zmq.Context.instance().socket(zmq.DEALER)  # Use DEALER for an async client
# Set a unique identity for the test client
context.client_socket.setsockopt_string(zmq.IDENTITY, f"test-client-{uuid.uuid4()}")
context.client_socket.connect(context.gateway_frontend_addr)
# Give a moment for connections to establish
time.sleep(0.5)
    # A simple liveness probe: "PING" is not valid JSON, so a healthy worker will
    # answer it with an error payload -- any reply proves the path is up.
    context.client_socket.send_string("PING")
    poller = zmq.Poller()
    poller.register(context.client_socket, zmq.POLLIN)
    socks = dict(poller.poll(1000))
    assert context.client_socket in socks, "Pipeline is not responsive"
    # Discard the probe's error response so later steps start from a clean socket.
    context.client_socket.recv_string()
@when('a client sends the text "{text}" with request ID "{req_id}"')
def step_impl(context, text, req_id):
request = {"id": req_id, "text": text}
context.client_socket.send_json(request)
@then('the client should receive a response for request ID "{req_id}"')
def step_impl(context, req_id):
# Use a poller to avoid blocking indefinitely
poller = zmq.Poller()
poller.register(context.client_socket, zmq.POLLIN)
events = dict(poller.poll(timeout=2000)) # 2-second timeout
assert context.client_socket in events, "Client did not receive a response in time"
response_json = context.client_socket.recv_json()
assert "id" in response_json, "Response is missing 'id' field"
assert response_json["id"] == req_id, f"Expected req_id {req_id} but got {response_json['id']}"
context.last_response = response_json
@then('the response should contain a sentiment label')
def step_impl(context):
assert "result" in context.last_response
assert "label" in context.last_response["result"]
assert context.last_response["result"]["label"] in ["POSITIVE", "NEGATIVE"]
@when('a client sends {count:d} requests concurrently')
def step_impl(context, count):
context.sent_ids = {f"batch-req-{i}" for i in range(count)}
context.received_responses = {}
def send_request(req_id):
        client = zmq.Context.instance().socket(zmq.DEALER)
        client.setsockopt_string(zmq.IDENTITY, f"client-thread-{req_id}")
        client.setsockopt(zmq.RCVTIMEO, 5000)  # Fail the test instead of hanging forever
        client.connect(context.gateway_frontend_addr)
        request = {"id": req_id, "text": "This is a concurrent test."}
        client.send_json(request)
        # Each thread waits for its own response
        response = client.recv_json()
client.close()
return response
with ThreadPoolExecutor(max_workers=50) as executor:
futures = [executor.submit(send_request, req_id) for req_id in context.sent_ids]
for future in futures:
response = future.result()
context.received_responses[response['id']] = response
@then('the client should receive {count:d} unique responses')
def step_impl(context, count):
assert len(context.received_responses) == count
received_ids = set(context.received_responses.keys())
assert received_ids == context.sent_ids
# A more advanced step would require workers to embed their ID in the response
@then('the responses should originate from more than one worker')
def step_impl(context):
# This step is illustrative. For it to work, the worker would need to add
# its WORKER_ID to the response payload.
# worker_ids = {resp.get('worker_id') for resp in context.received_responses.values()}
# assert len(worker_ids) > 1, f"All responses came from the same workers: {worker_ids}"
pass # Placeholder for the advanced implementation
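For completeness, here is one way the placeholder step could be filled in. It assumes the worker's `response_payload` is extended with a `"worker_id": WORKER_ID` field, a small, hypothetical addition to the worker code above:

```python
from behave import then

@then('the responses should originate from more than one worker')
def step_impl(context):
    # Relies on the (assumed) "worker_id" field added to each worker response.
    worker_ids = {resp.get("worker_id") for resp in context.received_responses.values()}
    assert len(worker_ids) > 1, f"All responses came from a single worker: {worker_ids}"
```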
This BDD layer provides executable specifications that document and verify the system’s behavior. It caught several issues during development, including incorrect multipart message handling and race conditions in the test client itself. It forces a focus on the external, observable behavior of the system, which is exactly what’s needed for a complex distributed architecture.
The final result was an architecture that successfully scaled to handle over 80,000 requests per second while maintaining a p99 latency below 40ms on a moderately sized EKS cluster. The key was ZeroMQ’s efficient transport and the DEALER socket’s inherent fair-queuing, which ensured that all workers were utilized effectively, preventing the hotspots we saw with the ALB’s round-robin approach.
This architecture is not without its limitations. The gateway, while performant, remains a single point of failure. A multi-gateway setup would require a TCP load balancer in front and a more complex worker connection strategy (e.g., workers connecting to all gateways). Furthermore, the communication is unencrypted plain text, which is only acceptable within a trusted network boundary; in a zero-trust environment, enabling CurveZMQ encryption, with ZAP-based authentication, would be a mandatory next step (a sketch follows). Finally, this pattern is optimized for stateless request-response workloads; introducing state or complex multi-step sagas would require a different set of patterns and technologies.
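As a rough illustration of that next step, CURVE encryption can be enabled directly on the existing sockets, provided libzmq is built with CURVE/libsodium support. The sketch below is not part of the deployed system; key distribution (for example via Kubernetes Secrets) and ZAP-based client authentication are left out:

```python
import zmq

# Long-lived keypairs would be generated once and distributed out of band.
server_public, server_secret = zmq.curve_keypair()
worker_public, worker_secret = zmq.curve_keypair()

ctx = zmq.Context.instance()

# Gateway backend acts as the CURVE "server".
backend = ctx.socket(zmq.DEALER)
backend.curve_server = True
backend.curve_secretkey = server_secret
backend.bind("tcp://*:5560")

# Each worker acts as a CURVE "client" and must know the server's public key.
worker = ctx.socket(zmq.DEALER)
worker.curve_serverkey = server_public
worker.curve_publickey = worker_public
worker.curve_secretkey = worker_secret
worker.connect("tcp://localhost:5560")
```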