The primary bottleneck in our real-time personalization service was a synchronous call to a Keras model. The model inference, running on a separate Python service, consistently took between 250ms and 600ms. This latency was directly exposed to the client through our Kong API Gateway, violating our P99 latency SLO of 200ms. Simply increasing the number of model service instances provided diminishing returns and failed to address the core problem: synchronous blocking I/O for a long-running task at the edge. The system couldn’t handle traffic spikes, leading to request timeouts and a degraded user experience.
Our initial concept was to break this synchronous coupling. The gateway should not wait for the inference to complete. Instead, it should accept the request, guarantee its eventual processing, and immediately return a success acknowledgment to the client. This pointed directly to an event-driven architecture, where the gateway acts as a fast ingestion point, publishing inference jobs to a durable message queue. A separate pool of backend workers could then consume these jobs at their own pace.
This architectural shift presented its own set of technology choices and implementation challenges. We were already committed to Kong as our API Gateway. Replacing it was not an option. The natural path forward was to leverage its extensibility through custom plugins. For the message bus, Apache Kafka was the obvious choice due to its high-throughput capabilities and data durability, which are critical for preventing data loss during traffic surges. Our backend services are predominantly written in Ruby, so the consumer fleet would be built using the ruby-kafka library. Redis was brought in to serve two distinct, critical roles: first, as a high-performance rate limiter at the Kong plugin level to protect the entire downstream system from abuse, and second, as an inference cache and state store for the Ruby consumers to avoid redundant model computations. Keras, via a simple TF-Serving-like HTTP endpoint, would remain the core ML component, but now isolated from the client-facing request path.
The architecture crystallized into the following flow:
- A client sends a POST request to Kong.
- A custom Lua plugin in Kong intercepts the request in the access phase.
- The plugin checks a rate limit against Redis.
- If the limit is not exceeded, it serializes the request body and publishes it as a message to a Kafka topic.
- Upon successful publish acknowledgment from Kafka, the plugin immediately terminates the request lifecycle and sends an HTTP 202 Accepted response back to the client.
- A pool of Ruby consumers, running as a separate service, listens to the Kafka topic.
- Upon receiving a message, a Ruby worker first checks Redis for a cached result for the given input.
- If a cache miss occurs, it calls the Keras model service, gets the inference result, and stores it in Redis before completing its work.
- The Kafka consumer commits the message offset.
This design decouples the ingestion layer from the processing layer, turning a slow, synchronous endpoint into a fast, asynchronous one.
sequenceDiagram
    participant Client
    participant Kong Gateway
    participant Custom Plugin (Lua)
    participant Redis
    participant Kafka
    participant Ruby Consumer
    participant Keras Service

    Client->>+Kong Gateway: POST /inference
    Kong Gateway->>+Custom Plugin (Lua): access phase
    Custom Plugin (Lua)->>+Redis: INCR rate_limit_key
    Redis-->>-Custom Plugin (Lua): (integer) count
    alt Rate Limit Not Exceeded
        Custom Plugin (Lua)->>+Kafka: Produce(request_payload)
        Kafka-->>-Custom Plugin (Lua): Ack
        Custom Plugin (Lua)->>+Kong Gateway: kong.response.exit(202)
        Kong Gateway-->>-Client: HTTP 202 Accepted
    else Rate Limit Exceeded
        Custom Plugin (Lua)->>+Kong Gateway: kong.response.exit(429)
        Kong Gateway-->>-Client: HTTP 429 Too Many Requests
    end
    deactivate Kong Gateway
    loop Async Processing
        Ruby Consumer->>+Kafka: Consume(request_payload)
        Ruby Consumer->>+Redis: GET inference_cache_key
        alt Cache Miss
            Redis-->>-Ruby Consumer: nil
            Ruby Consumer->>+Keras Service: POST /predict
            Keras Service-->>-Ruby Consumer: { "result": ... }
            Ruby Consumer->>+Redis: SETEX inference_cache_key 3600 result
        else Cache Hit
            Redis-->>-Ruby Consumer: cached_result
        end
        Ruby Consumer->>Kafka: Commit Offset
    end
Part 1: The Custom Kong Plugin
The core of this architecture lies within Kong. A custom plugin, written in Lua, is required to implement this non-standard request handling logic. The plugin needs dependencies for interacting with Kafka and Redis, which in the OpenResty environment are typically lua-resty-kafka and lua-resty-redis. Note that lua-resty-redis ships with OpenResty, but lua-resty-kafka does not: it must be installed into the Kong image (for example with luarocks install lua-resty-kafka) before the plugin will load.
The plugin’s directory structure is standard for Kong:
kong/plugins/async-inference/
├── handler.lua
└── schema.lua
The schema.lua file defines the configuration parameters for our plugin, such as Kafka broker addresses, the target topic, and Redis connection details. This makes the plugin reusable and configurable via Kong's Admin API.
schema.lua
-- kong/plugins/async-inference/schema.lua
local typedefs = require "kong.db.schema.typedefs"
return {
name = "async-inference",
fields = {
{ consumer = typedefs.no_consumer },
{ route = typedefs.no_route },
{ service = typedefs.no_service },
{
config = {
type = "record",
fields = {
{ kafka_brokers = {
type = "array",
required = true,
default = { { host = "kafka", port = 9092 } },
elements = {
type = "record",
fields = {
{ host = { type = "string", required = true } },
{ port = { type = "integer", required = true, default = 9092 } },
}
}
}
},
{ kafka_topic = { type = "string", required = true, default = "inference-jobs" } },
{ redis_host = { type = "string", required = true, default = "redis" } },
{ redis_port = { type = "integer", required = true, default = 6379 } },
{ redis_timeout = { type = "integer", default = 2000 } }, -- ms
{ rate_limit_per_minute = { type = "integer", default = 1000 } },
},
},
},
},
}
The main logic resides in handler.lua. We hook into the access phase of the Kong request lifecycle. This phase runs before Kong proxies the request to an upstream service, giving us full control.
handler.lua
-- kong/plugins/async-inference/handler.lua
local cjson = require "cjson"
local redis = require "resty.redis"
local producer = require "resty.kafka.producer"
local AsyncInferenceHandler = {
PRIORITY = 1000,
VERSION = "0.1.0",
}
-- Helper function to connect to Redis
local function connect_redis(conf)
local red = redis:new()
red:set_timeout(conf.redis_timeout)
local ok, err = red:connect(conf.redis_host, conf.redis_port)
if not ok then
kong.log.err("failed to connect to redis: ", err)
return nil, err
end
return red, nil
end
-- Check rate limit against Redis
local function check_rate_limit(conf)
local red, err = connect_redis(conf)
if not red then
-- Fail open if Redis is down. A critical production decision.
-- Alternatively, could fail closed by returning an error.
kong.log.err("cannot connect to redis for rate limiting: ", err)
return true
end
local client_ip = kong.client.get_ip()
local current_minute = math.floor(ngx.now() / 60)
local key = "rate_limit:async_inference:" .. client_ip .. ":" .. current_minute
local res, err = red:incr(key)
if err then
kong.log.err("failed to incr rate limit key: ", err)
return true -- Fail open
end
if res == 1 then
-- This is the first request in this minute, set expiry
red:expire(key, 60)
end
-- Return the connection to the cosocket keepalive pool for reuse
red:set_keepalive(0, 100)
if res > conf.rate_limit_per_minute then
return false
end
return true
end
function AsyncInferenceHandler:access(conf)
-- Step 1: Enforce rate limiting
local limit_ok = check_rate_limit(conf)
if not limit_ok then
return kong.response.exit(429, { message = "Rate limit exceeded" })
end
-- Step 2: Read the request body
-- This is a blocking operation, but necessary.
local body, err = kong.request.get_raw_body()
if err then
kong.log.err("failed to get request body: ", err)
return kong.response.exit(500, { message = "Internal Server Error: Cannot read body" })
end
if not body or body == "" then
return kong.response.exit(400, { message = "Request body cannot be empty" })
end
-- We could add validation here (e.g. is it valid JSON?)
-- For now, we pass it through as a raw string.
-- Step 3: Produce to Kafka
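-- Note: with producer_type = "async", lua-resty-kafka buffers the message and
-- flushes it to the brokers from a background timer, so p:send() returns as
-- soon as the message is buffered, before any broker acknowledgment. Use
-- producer_type = "sync" if the 202 should only be sent after a confirmed write.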
local p, err = producer:new(conf.kafka_brokers, { producer_type = "async" })
if not p then
kong.log.err("failed to create kafka producer: ", err)
return kong.response.exit(500, { message = "Internal Server Error: Kafka producer failure" })
end
-- The key could be something meaningful for partitioning, like a user ID from a header
local message_key = kong.request.get_header("X-User-ID") or tostring(ngx.now()) -- key must be a string
local ok, err = p:send(conf.kafka_topic, message_key, body)
if not ok then
kong.log.err("failed to send message to kafka: ", err)
return kong.response.exit(502, { message = "Bad Gateway: Could not queue request" })
end
-- Step 4: Short-circuit the request and return 202 Accepted
-- This is the crucial step that prevents Kong from proxying upstream.
local job_id = ngx.req.get_headers()["X-Request-ID"] or ngx.md5(body .. ngx.now())
return kong.response.exit(202, { message = "Request accepted for processing", job_id = job_id })
end
return AsyncInferenceHandler
A key decision in this code is to “fail open” if Redis is unavailable. In a real-world project, this would depend on business requirements. Failing closed (denying the request) protects the system, while failing open prioritizes availability at the risk of overwhelming the backend.
Part 2: The Ruby Kafka Consumer
The consumer is a long-running Ruby process responsible for the actual work. It uses the ruby-kafka gem for robust consumer group management and redis-rb for caching.
Gemfile
source 'https://rubygems.org'
gem 'ruby-kafka', '~> 1.5'
gem 'redis', '~> 4.6'
gem 'httparty', '~> 0.20'
gem 'oj', '~> 3.13' # A faster JSON parser
gem 'logging', '~> 2.3'
The consumer logic needs to be idempotent and resilient. If the process crashes after processing a message but before committing the offset, Kafka will re-deliver the message. Our use of a Redis cache helps make the processing idempotent; re-processing a message will result in a cache hit and avoid calling the Keras model again.
consumer.rb
# consumer.rb
require 'kafka'
require 'redis'
require 'httparty'
require 'oj'
require 'logging'
require 'digest'
# --- Configuration ---
KAFKA_BROKERS = ENV.fetch('KAFKA_BROKERS', 'kafka:9092').split(',')
KAFKA_TOPIC = ENV.fetch('KAFKA_TOPIC', 'inference-jobs')
KAFKA_GROUP_ID = ENV.fetch('KAFKA_GROUP_ID', 'inference-workers')
REDIS_URL = ENV.fetch('REDIS_URL', 'redis://redis:6379/0')
KERAS_API_ENDPOINT = ENV.fetch('KERAS_API_ENDPOINT', 'http://keras-model:5000/predict')
CACHE_TTL_SECONDS = ENV.fetch('CACHE_TTL_SECONDS', 3600).to_i
# --- Setup Logging ---
Logging.color_scheme('bright',
:levels => {
:info => :green,
:warn => :yellow,
:error => :red,
:fatal => [:white, :on_red]
},
:date => :blue,
:logger => :cyan,
:message => :white
)
Logging.appenders.stdout(
'stdout',
:layout => Logging.layouts.pattern(
:pattern => '[%d] %-5l %c: %m\n',
:color_scheme => 'bright'
)
)
LOGGER = Logging.logger[self]
LOGGER.add_appenders 'stdout'
LOGGER.level = :info
# --- Service Connections ---
begin
KAFKA = Kafka.new(KAFKA_BROKERS, client_id: KAFKA_GROUP_ID, logger: LOGGER)
REDIS = Redis.new(url: REDIS_URL)
# Test Redis connection
REDIS.ping
LOGGER.info "Successfully connected to Kafka and Redis."
rescue Kafka::ConnectionError => e
LOGGER.fatal "Failed to connect to Kafka: #{e.message}"
exit 1
rescue Redis::BaseConnectionError => e
LOGGER.fatal "Failed to connect to Redis: #{e.message}"
exit 1
end
class InferenceProcessor
def initialize(kafka_consumer)
@consumer = kafka_consumer
@consumer.subscribe(KAFKA_TOPIC)
end
def run
LOGGER.info "Consumer started. Listening for messages on topic '#{KAFKA_TOPIC}'..."
@consumer.each_message(automatically_mark_as_processed: false) do |message|
process_message(message)
end
end
private
def process_message(message)
LOGGER.info "Received message from partition #{message.partition} at offset #{message.offset}"
# Generate a cache key based on the content of the message body
# This ensures that identical inputs map to the same cache entry
cache_key = "inference_cache:#{Digest::SHA256.hexdigest(message.value)}"
# 1. Check Redis Cache
cached_result = REDIS.get(cache_key)
if cached_result
LOGGER.info "Cache HIT for key #{cache_key}. Skipping model inference."
# Even on a cache hit, commit the offset so the message is not re-delivered
commit_offset(message)
return
end
LOGGER.info "Cache MISS for key #{cache_key}. Processing..."
# 2. Perform Model Inference (if cache miss)
begin
request_body = Oj.load(message.value) # Assuming body is JSON
response = HTTParty.post(
KERAS_API_ENDPOINT,
body: Oj.dump(request_body),
headers: { 'Content-Type' => 'application/json' },
timeout: 10 # Generous timeout
)
unless response.success?
raise "Keras service returned non-success status: #{response.code}"
end
inference_result = response.body
LOGGER.info "Successfully received inference from Keras service."
# 3. Store result in Redis Cache
REDIS.setex(cache_key, CACHE_TTL_SECONDS, inference_result)
LOGGER.info "Stored result in Redis with TTL #{CACHE_TTL_SECONDS}s."
# 4. Commit Kafka offset
commit_offset(message)
rescue Oj::ParseError => e
LOGGER.error "Failed to parse message value as JSON. Skipping message. Value: #{message.value}"
# This is an unrecoverable message, so we commit to avoid a poison pill scenario.
commit_offset(message)
rescue StandardError => e
# For transient errors (e.g., network issues, Keras service down),
# we log the error but DO NOT commit the offset. Kafka will re-deliver the message.
LOGGER.error "Failed to process message at offset #{message.offset}: #{e.message}. Will retry."
# A more robust solution would involve a dead-letter queue after N retries.
# For now, we rely on Kafka's redelivery.
end
end
def commit_offset(message)
@consumer.mark_message_as_processed(message)
@consumer.commit_offsets
LOGGER.info "Committed offset #{message.offset} for partition #{message.partition}."
end
end
# --- Main Execution ---
consumer = KAFKA.consumer(group_id: KAFKA_GROUP_ID)

# Ask the consumer loop to stop cleanly on shutdown signals; each_message
# returns after the in-flight message is handled. (We avoid logging inside the
# trap handler because the logging gem takes a mutex, which is not allowed in
# trap context.)
trap('TERM') { consumer.stop }
trap('INT')  { consumer.stop }

processor = InferenceProcessor.new(consumer)
processor.run
LOGGER.info 'Consumer stopped. Shutting down.'
This consumer code explicitly handles error scenarios. A JSON parsing error is considered unrecoverable, so the offset is committed to prevent the consumer from getting stuck on a “poison pill” message. Any other error (like the Keras service being down) is considered transient, and the offset is not committed, allowing Kafka to re-deliver the message for a later retry attempt.
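To make the dead-letter idea concrete, here is a minimal sketch of how it could slot into InferenceProcessor, called from the rescue StandardError branch in place of the bare log-and-retry. The inference-jobs-dlq topic, the Redis attempt counter, and the MAX_ATTEMPTS threshold are illustrative assumptions, not part of the code above.
# Hypothetical addition to InferenceProcessor: after repeated transient failures,
# park the message on a dead-letter topic instead of retrying forever.
DLQ_TOPIC    = ENV.fetch('KAFKA_DLQ_TOPIC', 'inference-jobs-dlq')
MAX_ATTEMPTS = ENV.fetch('MAX_ATTEMPTS', 5).to_i

def handle_transient_failure(message, error)
  # Kafka does not count redeliveries, so track attempts in Redis using the
  # same content digest that keys the inference cache.
  attempts_key = "inference_attempts:#{Digest::SHA256.hexdigest(message.value)}"
  attempts = REDIS.incr(attempts_key)
  REDIS.expire(attempts_key, 86_400)

  if attempts >= MAX_ATTEMPTS
    # Publish the raw payload for offline analysis, then commit so this
    # message no longer blocks the partition.
    KAFKA.deliver_message(message.value, topic: DLQ_TOPIC, key: message.key)
    LOGGER.error "Moved offset #{message.offset} to #{DLQ_TOPIC} after #{attempts} attempts: #{error.message}"
    commit_offset(message)
  else
    # Leave the offset uncommitted; Kafka will re-deliver the message.
    LOGGER.error "Attempt #{attempts}/#{MAX_ATTEMPTS} failed for offset #{message.offset}: #{error.message}. Will retry."
  end
end
Committing after the DLQ publish trades indefinite retries for forward progress on the main topic; the parked payloads can be replayed once the Keras service recovers.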
Part 3: The Supporting Infrastructure
To make this entire system runnable and testable, we define it using Docker Compose. This includes Kong with our custom plugin mounted, Kafka, Zookeeper, Redis, our Ruby consumer, and a mock Keras service.
The mock Keras service is a simple Python Flask app that introduces an artificial delay to simulate real model inference time.
keras_model/app.py
# keras_model/app.py
from flask import Flask, request, jsonify
import time
import random
import logging
app = Flask(__name__)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@app.route('/predict', methods=['POST'])
def predict():
# Simulate model inference delay
delay = random.uniform(0.25, 0.60)
logging.info(f"Received request. Simulating inference for {delay:.2f} seconds.")
time.sleep(delay)
data = request.get_json()
if not data:
return jsonify({"error": "Invalid input"}), 400
# Mock response
user_id = data.get("user_id", "unknown")
item_id = data.get("item_id", "unknown")
prediction_score = random.random()
response = {
"user_id": user_id,
"item_id": item_id,
"prediction": prediction_score,
"model_version": "v1.2.3"
}
logging.info(f"Returning prediction: {prediction_score}")
return jsonify(response)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
keras_model/Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
CMD ["flask", "run", "--host=0.0.0.0"]
keras_model/requirements.txt
Flask==2.1.2
The final piece is the docker-compose.yml
file that orchestrates all the services.
docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.0.1
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
redis:
image: redis:6.2-alpine
container_name: redis
ports:
- "6379:6379"
kong-db:
image: postgres:13
container_name: kong-db
environment:
- POSTGRES_USER=kong
- POSTGRES_DB=kong
- POSTGRES_PASSWORD=kong
healthcheck:
test: ["CMD", "pg_isready", "-U", "kong"]
interval: 5s
timeout: 5s
retries: 5
kong-migrations:
image: kong:2.8
container_name: kong-migrations
command: "kong migrations bootstrap"
depends_on:
kong-db:
condition: service_healthy
environment:
- KONG_DATABASE=postgres
- KONG_PG_HOST=kong-db
- KONG_PG_PASSWORD=kong
restart: on-failure
kong:
image: kong:2.8
container_name: kong
depends_on:
kong-migrations:
condition: service_completed_successfully
volumes:
- ./kong/plugins/async-inference:/usr/local/share/lua/5.1/kong/plugins/async-inference # Mount only our plugin so the bundled plugins in the image are not shadowed
environment:
- KONG_DATABASE=postgres
- KONG_PG_HOST=kong-db
- KONG_PG_PASSWORD=kong
- KONG_PROXY_ACCESS_LOG=/dev/stdout
- KONG_ADMIN_ACCESS_LOG=/dev/stdout
- KONG_PROXY_ERROR_LOG=/dev/stderr
- KONG_ADMIN_ERROR_LOG=/dev/stderr
- KONG_ADMIN_LISTEN=0.0.0.0:8001, 0.0.0.0:8444 ssl
- KONG_PLUGINS=bundled,async-inference # Enable our custom plugin
- KONG_LUA_PACKAGE_PATH=/usr/local/share/lua/5.1/?.lua;; # Ensure plugin path is correct
ports:
- "8000:8000"
- "8443:8443"
- "8001:8001"
- "8444:8444"
keras-model:
build: ./keras_model
container_name: keras-model
ports:
- "5000:5000"
environment:
- FLASK_APP=app.py
ruby-consumer:
build: . # Assuming Dockerfile is in the root
container_name: ruby-consumer
depends_on:
- kafka
- redis
- keras-model
environment:
- KAFKA_BROKERS=kafka:9092
- KAFKA_TOPIC=inference-jobs
- REDIS_URL=redis://redis:6379/0
- KERAS_API_ENDPOINT=http://keras-model:5000/predict
restart: always
networks:
default:
name: async-inference-net
Dockerfile (for the Ruby consumer)
FROM ruby:3.0
WORKDIR /usr/src/app
COPY Gemfile Gemfile.lock ./
RUN bundle install
COPY consumer.rb .
CMD ["ruby", "consumer.rb"]
Part 4: System Integration and Testing
With docker-compose up --build, the entire stack comes to life. The final step is to configure Kong: we create a service, a route, and then enable our async-inference plugin on that route.
# Wait for Kong to be ready...
# 1. Create a dummy service (it will never be reached, but Kong requires it)
curl -i -X POST http://localhost:8001/services/ \
--data name=dummy-inference-service \
--data url=http://httpbin.org/post
# 2. Create a route to access the service
curl -i -X POST http://localhost:8001/services/dummy-inference-service/routes \
--data 'paths[]=/inference' \
--data name=inference-route
# 3. Enable our custom plugin on the route
curl -i -X POST http://localhost:8001/routes/inference-route/plugins \
--data "name=async-inference" \
--data "config.kafka_topic=inference-jobs" \
--data "config.kafka_brokers[0].host=kafka" \
--data "config.kafka_brokers[0].port=9092" \
--data "config.redis_host=redis" \
--data "config.rate_limit_per_minute=100"
Now, we can test the entire flow. Sending a request to Kong should result in a near-instant 202 Accepted response.
$ time curl -i -X POST http://localhost:8000/inference \
-H "Content-Type: application/json" \
-d '{"user_id": "user-123", "item_id": "item-abc"}'
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Content-Length: 76
Connection: keep-alive
Date: Fri, 27 Oct 2023 10:30:00 GMT
Server: kong/2.8.0
X-Kong-Upstream-Latency: 2
X-Kong-Proxy-Latency: 1
{"message":"Request accepted for processing","job_id":"...some_generated_id..."}
real 0m0.045s
user 0m0.003s
sys 0m0.005s
The response time is ~45ms, even though the backend processing takes >250ms. Watching the logs (docker-compose logs -f ruby-consumer keras-model), we can see the Ruby consumer pick up the message from Kafka, call the Keras service after a delay, and log the successful processing. Subsequent identical requests will result in a cache hit in the consumer logs.
This architecture successfully decoupled ingestion from processing, solving the latency problem at the API gateway. The system is now far more resilient to traffic spikes, as Kafka can absorb a massive backlog of requests for the consumer pool to work through.
The current implementation, however, has limitations. The primary one is that the client receives no direct result. This pattern is suitable for fire-and-forget operations, but not for request-response cycles. A production-ready version would need a mechanism to deliver the result, such as a WebSocket connection, a server-sent event stream, or a webhook callback registered by the client. Another approach would be for the 202 response to include a job_id and a status-check URL that the client could poll (sketched below). Furthermore, the error handling could be enhanced with a Dead Letter Queue (DLQ) in Kafka to handle messages that consistently fail, preventing them from blocking the queue while allowing for offline analysis. Finally, full-stack observability with distributed tracing would be essential to monitor the end-to-end latency of jobs as they flow through this distributed system.
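As a closing illustration of that polling option, the sketch below shows a minimal status endpoint. It assumes two things the current code does not do, namely that the plugin embeds its generated job_id in the Kafka message and that the consumer writes each finished result to Redis under inference_result:<job_id>; the sinatra gem would be an extra dependency.
# status_api.rb -- hypothetical polling endpoint, not part of the stack above.
require 'sinatra'
require 'redis'
require 'json'

REDIS = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://redis:6379/0'))

get '/inference/:job_id' do
  content_type :json
  # Assumes the consumer stores finished results under this key.
  result = REDIS.get("inference_result:#{params[:job_id]}")
  if result
    result # the stored inference JSON, returned with HTTP 200
  else
    status 202 # still queued or processing
    { status: 'pending', job_id: params[:job_id] }.to_json
  end
end
Kong could route GET /inference/:job_id straight to this small service, leaving the asynchronous POST path untouched.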