Implementing an Asynchronous Inference Gateway with Kong, Kafka, and Ruby


The primary bottleneck in our real-time personalization service was a synchronous call to a Keras model. The model inference, running on a separate Python service, consistently took between 250ms and 600ms. This latency was directly exposed to the client through our Kong API Gateway, violating our P99 latency SLO of 200ms. Simply increasing the number of model service instances provided diminishing returns and failed to address the core problem: synchronous blocking I/O for a long-running task at the edge. The system couldn’t handle traffic spikes, leading to request timeouts and a degraded user experience.

Our initial concept was to break this synchronous coupling. The gateway should not wait for the inference to complete. Instead, it should accept the request, guarantee its eventual processing, and immediately return a success acknowledgment to the client. This pointed directly to an event-driven architecture, where the gateway acts as a fast ingestion point, publishing inference jobs to a durable message queue. A separate pool of backend workers could then consume these jobs at their own pace.

This architectural shift presented its own set of technology choices and implementation challenges. We were already committed to Kong as our API Gateway. Replacing it was not an option. The natural path forward was to leverage its extensibility through custom plugins. For the message bus, Apache Kafka was the obvious choice due to its high-throughput capabilities and data durability, which are critical for preventing data loss during traffic surges. Our backend services are predominantly written in Ruby, so the consumer fleet would be built using the ruby-kafka library. Redis was brought in to serve two distinct, critical roles: first, as a high-performance rate-limiter at the Kong plugin level to protect the entire downstream system from abuse, and second, as an inference cache and state store for the Ruby consumers to avoid redundant model computations. Keras, via a simple TF-Serving-like HTTP endpoint, would remain as the core ML component, but now isolated from the client-facing request path.

The architecture crystallized into a flow:

  1. A client sends a POST request to Kong.
  2. A custom Lua plugin in Kong intercepts the request in the access phase.
  3. The plugin checks a rate limit against Redis.
  4. If the limit is not exceeded, it serializes the request body and publishes it as a message to a Kafka topic.
  5. Once the message has been handed off to the Kafka producer, the plugin short-circuits the request lifecycle and sends an HTTP 202 Accepted response back to the client.
  6. A pool of Ruby consumers, running as a separate service, listens to the Kafka topic.
  7. Upon receiving a message, a Ruby worker first checks Redis for a cached result for the given input.
  8. If a cache miss occurs, it calls the Keras model service, gets the inference result, and stores it in Redis before completing its work.
  9. The Kafka consumer commits the message offset.

This design decouples the ingestion layer from the processing layer, turning a slow, synchronous endpoint into a fast, asynchronous one.

sequenceDiagram
    participant Client
    participant Kong Gateway
    participant Custom Plugin (Lua)
    participant Redis
    participant Kafka
    participant Ruby Consumer
    participant Keras Service

    Client->>+Kong Gateway: POST /inference
    Kong Gateway->>+Custom Plugin (Lua): access phase
    Custom Plugin (Lua)->>+Redis: INCR rate_limit_key
    Redis-->>-Custom Plugin (Lua): (integer) count
    alt Rate Limit Not Exceeded
        Custom Plugin (Lua)->>+Kafka: Produce(request_payload)
        Kafka-->>-Custom Plugin (Lua): Ack
        Custom Plugin (Lua)->>+Kong Gateway: kong.response.exit(202)
        Kong Gateway-->>-Client: HTTP 202 Accepted
    else Rate Limit Exceeded
        Custom Plugin (Lua)->>+Kong Gateway: kong.response.exit(429)
        Kong Gateway-->>-Client: HTTP 429 Too Many Requests
    end
    deactivate Kong Gateway

    loop Async Processing
        Ruby Consumer->>+Kafka: Consume(request_payload)
        Ruby Consumer->>+Redis: GET inference_cache_key
        alt Cache Miss
            Redis-->>-Ruby Consumer: nil
            Ruby Consumer->>+Keras Service: POST /predict
            Keras Service-->>-Ruby Consumer: { "result": ... }
            Ruby Consumer->>+Redis: SETEX inference_cache_key 3600 result
        else Cache Hit
            Redis-->>-Ruby Consumer: cached_result
        end
        Ruby Consumer->>Kafka: Commit Offset
    end

Part 1: The Custom Kong Plugin

The core of this architecture lies within Kong. A custom plugin, written in Lua, is required to implement this non-standard request handling logic. The plugin needs libraries for talking to Redis and Kafka from inside OpenResty: lua-resty-redis ships with Kong, while lua-resty-kafka must be installed separately (for example via LuaRocks) so that it is available on Kong's Lua package path.

The plugin’s directory structure is standard for Kong:

kong/plugins/async-inference/
├── handler.lua
└── schema.lua

The schema.lua file defines the configuration parameters for our plugin, such as Kafka broker addresses, the target topic, and Redis connection details. This makes the plugin reusable and configurable via Kong’s Admin API.

schema.lua

-- kong/plugins/async-inference/schema.lua

local typedefs = require "kong.db.schema.typedefs"

return {
  name = "async-inference",
  fields = {
    { consumer = typedefs.no_consumer },
    { protocols = typedefs.protocols_http },
    {
      config = {
        type = "record",
        fields = {
          { kafka_brokers = {
              type = "array",
              required = true,
              default = { { host = "kafka", port = 9092 } },
              elements = {
                type = "record",
                fields = {
                  { host = { type = "string", required = true } },
                  { port = { type = "integer", required = true, default = 9092 } },
                }
              }
            }
          },
          { kafka_topic = { type = "string", required = true, default = "inference-jobs" } },
          { redis_host = { type = "string", required = true, default = "redis" } },
          { redis_port = { type = "integer", required = true, default = 6379 } },
          { redis_timeout = { type = "integer", default = 2000 } }, -- ms
          { rate_limit_per_minute = { type = "integer", default = 1000 } },
        },
      },
    },
  },
}

The main logic resides in handler.lua. We hook into the access phase of the Kong request lifecycle. This phase runs before Kong proxies the request to an upstream service, giving us full control.

handler.lua

-- kong/plugins/async-inference/handler.lua

local cjson = require "cjson"
local redis = require "resty.redis"
local producer = require "resty.kafka.producer"

local AsyncInferenceHandler = {
  PRIORITY = 1000,
  VERSION = "0.1.0",
}

-- Helper function to connect to Redis
local function connect_redis(conf)
  local red = redis:new()
  red:set_timeout(conf.redis_timeout)
  local ok, err = red:connect(conf.redis_host, conf.redis_port)
  if not ok then
    kong.log.err("failed to connect to redis: ", err)
    return nil, err
  end
  return red, nil
end

-- Check rate limit against Redis
local function check_rate_limit(conf)
  local red, err = connect_redis(conf)
  if not red then
    -- Fail open if Redis is down. A critical production decision.
    -- Alternatively, could fail closed by returning an error.
    kong.log.err("cannot connect to redis for rate limiting: ", err)
    return true
  end

  local client_ip = kong.client.get_ip()
  local current_minute = math.floor(ngx.now() / 60)
  local key = "rate_limit:async_inference:" .. client_ip .. ":" .. current_minute

  local res, err = red:incr(key)
  if err then
    kong.log.err("failed to incr rate limit key: ", err)
    return true -- Fail open
  end

  if res == 1 then
    -- This is the first request in this minute, set expiry
    red:expire(key, 60)
  end

  -- Return the connection to the keepalive pool rather than closing it
  red:set_keepalive(0, 100)

  if res > conf.rate_limit_per_minute then
    return false
  end

  return true
end

function AsyncInferenceHandler:access(conf)
  -- Step 1: Enforce rate limiting
  local limit_ok = check_rate_limit(conf)
  if not limit_ok then
    return kong.response.exit(429, { message = "Rate limit exceeded" })
  end

  -- Step 2: Read the request body.
  -- Note: if the body exceeds nginx's client_body_buffer_size it is buffered
  -- to disk and get_raw_body() returns nil plus an error, so that setting may
  -- need raising for large payloads.
  local body, err = kong.request.get_raw_body()
  if err then
    kong.log.err("failed to get request body: ", err)
    return kong.response.exit(500, { message = "Internal Server Error: Cannot read body" })
  end

  if not body or body == "" then
    return kong.response.exit(400, { message = "Request body cannot be empty" })
  end
  
  -- We could add validation here (e.g. is it valid JSON?)
  -- For now, we pass it through as a raw string.

  -- Step 3: Produce to Kafka
  -- Note: with producer_type = "async", lua-resty-kafka buffers the message and
  -- flushes it from a background timer, so send() returns once the message is
  -- queued locally rather than after a broker acknowledgment. A "sync" producer
  -- would give a true ack at the cost of extra request latency.
  local p, err = producer:new(conf.kafka_brokers, { producer_type = "async" })
  if not p then
    kong.log.err("failed to create kafka producer: ", err)
    return kong.response.exit(500, { message = "Internal Server Error: Kafka producer failure" })
  end

  -- The key must be a string; ideally something meaningful for partitioning,
  -- like a user ID from a header.
  local message_key = kong.request.get_header("X-User-ID") or tostring(ngx.now())

  local ok, err = p:send(conf.kafka_topic, message_key, body)
  if not ok then
    kong.log.err("failed to send message to kafka: ", err)
    return kong.response.exit(502, { message = "Bad Gateway: Could not queue request" })
  end

  -- Step 4: Short-circuit the request and return 202 Accepted
  -- This is the crucial step that prevents Kong from proxying upstream.
  local job_id = ngx.req.get_headers()["X-Request-ID"] or ngx.md5(body .. ngx.now())
  return kong.response.exit(202, { message = "Request accepted for processing", job_id = job_id })
end

return AsyncInferenceHandler

A key decision in this code is to “fail open” if Redis is unavailable. In a real-world project, this would depend on business requirements. Failing closed (denying the request) protects the system, while failing open prioritizes availability at the risk of overwhelming the backend.

Part 2: The Ruby Kafka Consumer

The consumer is a long-running Ruby process responsible for the actual work. It uses the ruby-kafka gem for robust consumer group management and redis-rb for caching.

Gemfile

source 'https://rubygems.org'

gem 'ruby-kafka', '~> 1.4'
gem 'redis', '~> 4.6'
gem 'httparty', '~> 0.20'
gem 'oj', '~> 3.13' # A faster JSON parser
gem 'logging', '~> 2.3'

The consumer logic needs to be idempotent and resilient. If the process crashes after processing a message but before committing the offset, Kafka will re-deliver the message. Our use of a Redis cache helps make the processing idempotent; re-processing a message will result in a cache hit and avoid calling the Keras model again.

consumer.rb

# consumer.rb
require 'kafka'
require 'redis'
require 'httparty'
require 'oj'
require 'logging'
require 'digest'

# --- Configuration ---
KAFKA_BROKERS = ENV.fetch('KAFKA_BROKERS', 'kafka:9092').split(',')
KAFKA_TOPIC = ENV.fetch('KAFKA_TOPIC', 'inference-jobs')
KAFKA_GROUP_ID = ENV.fetch('KAFKA_GROUP_ID', 'inference-workers')
REDIS_URL = ENV.fetch('REDIS_URL', 'redis://redis:6379/0')
KERAS_API_ENDPOINT = ENV.fetch('KERAS_API_ENDPOINT', 'http://keras-model:5000/predict')
CACHE_TTL_SECONDS = ENV.fetch('CACHE_TTL_SECONDS', 3600).to_i

# --- Setup Logging ---
Logging.color_scheme('bright',
  :levels => {
    :info  => :green,
    :warn  => :yellow,
    :error => :red,
    :fatal => [:white, :on_red]
  },
  :date => :blue,
  :logger => :cyan,
  :message => :white
)

Logging.appenders.stdout(
  'stdout',
  :layout => Logging.layouts.pattern(
    "[%d] %-5l %c: %m\n",
    :color_scheme => 'bright'
  )
)

LOGGER = Logging.logger[self]
LOGGER.add_appenders 'stdout'
LOGGER.level = :info

# --- Service Connections ---
begin
  KAFKA = Kafka.new(KAFKA_BROKERS, client_id: KAFKA_GROUP_ID, logger: LOGGER)
  REDIS = Redis.new(url: REDIS_URL)
  # Test Redis connection
  REDIS.ping
  LOGGER.info "Successfully connected to Kafka and Redis."
rescue Kafka::ConnectionError => e
  LOGGER.fatal "Failed to connect to Kafka: #{e.message}"
  exit 1
rescue Redis::BaseConnectionError => e
  LOGGER.fatal "Failed to connect to Redis: #{e.message}"
  exit 1
end

class InferenceProcessor
  def initialize(kafka_consumer)
    @consumer = kafka_consumer
    @consumer.subscribe(KAFKA_TOPIC)
  end

  def run
    LOGGER.info "Consumer started. Listening for messages on topic '#{KAFKA_TOPIC}'..."
    @consumer.each_message(automatically_mark_as_processed: false) do |message|
      process_message(message)
    end
  end

  private

  def process_message(message)
    LOGGER.info "Received message from partition #{message.partition} at offset #{message.offset}"
    
    # Generate a cache key based on the content of the message body
    # This ensures that identical inputs map to the same cache entry
    cache_key = "inference_cache:#{Digest::SHA256.hexdigest(message.value)}"

    # 1. Check Redis Cache
    cached_result = REDIS.get(cache_key)

    if cached_result
      LOGGER.info "Cache HIT for key #{cache_key}. Skipping model inference."
      # Even on cache hit, we commit the message to remove it from the queue
      commit_offset(message)
      return
    end

    LOGGER.info "Cache MISS for key #{cache_key}. Processing..."

    # 2. Perform Model Inference (if cache miss)
    begin
      request_body = Oj.load(message.value) # Assuming body is JSON
      
      response = HTTParty.post(
        KERAS_API_ENDPOINT,
        body: Oj.dump(request_body),
        headers: { 'Content-Type' => 'application/json' },
        timeout: 10 # Generous timeout
      )

      unless response.success?
        raise "Keras service returned non-success status: #{response.code}"
      end
      
      inference_result = response.body
      LOGGER.info "Successfully received inference from Keras service."
      
      # 3. Store result in Redis Cache
      REDIS.setex(cache_key, CACHE_TTL_SECONDS, inference_result)
      LOGGER.info "Stored result in Redis with TTL #{CACHE_TTL_SECONDS}s."

      # 4. Commit Kafka offset
      commit_offset(message)

    rescue Oj::ParseError => e
      LOGGER.error "Failed to parse message value as JSON. Skipping message. Value: #{message.value}"
      # This is an unrecoverable message, so we commit to avoid a poison pill scenario.
      commit_offset(message)
    rescue StandardError => e
      # For transient errors (e.g., network issues, Keras service down),
      # we log the error but DO NOT commit the offset. Kafka will re-deliver the message.
      LOGGER.error "Failed to process message at offset #{message.offset}: #{e.message}. Will retry."
      # A more robust solution would involve a dead-letter queue after N retries.
      # For now, we rely on Kafka's redelivery.
    end
  end

  def commit_offset(message)
    @consumer.mark_message_as_processed(message)
    @consumer.commit_offsets
    LOGGER.info "Committed offset #{message.offset} for partition #{message.partition}."
  end
end

# --- Main Execution ---
consumer = KAFKA.consumer(group_id: KAFKA_GROUP_ID)

# Kafka::Consumer#stop is safe to call from a signal handler; it lets the
# in-flight message finish and unwinds the each_message loop cleanly.
trap('TERM') { consumer.stop }
trap('INT')  { consumer.stop }

processor = InferenceProcessor.new(consumer)
processor.run
LOGGER.info 'Consumer stopped. Shutting down.'

This consumer code explicitly handles error scenarios. A JSON parsing error is considered unrecoverable, so the offset is committed to prevent the consumer from getting stuck on a “poison pill” message. Any other error (like the Keras service being down) is considered transient, and the offset is not committed, allowing Kafka to re-deliver the message for a later retry attempt.
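
The retry path can be extended with the dead-letter queue mentioned in the rescue block. The method below is a sketch of one possible approach, not part of consumer.rb as written: it assumes a hypothetical inference-jobs-dlq topic and a per-message attempt counter in Redis, and it would live as another private method on InferenceProcessor, reusing the KAFKA, REDIS, and LOGGER constants and the commit_offset helper. The rescue StandardError branch would call it instead of only logging.

# A possible addition to InferenceProcessor (hypothetical, not in consumer.rb above)
MAX_ATTEMPTS = 5

def handle_transient_failure(message, error)
  attempts_key = "inference_attempts:#{message.partition}:#{message.offset}"
  attempts = REDIS.incr(attempts_key)
  REDIS.expire(attempts_key, 86_400) # stale counters expire after a day

  if attempts >= MAX_ATTEMPTS
    # Park the message on the dead-letter topic for offline analysis,
    # then commit so the partition is no longer blocked.
    KAFKA.deliver_message(message.value, key: message.key, topic: 'inference-jobs-dlq')
    commit_offset(message)
    LOGGER.error "Moved offset #{message.offset} to DLQ after #{attempts} attempts: #{error.message}"
  else
    # Leave the offset uncommitted so Kafka re-delivers the message.
    LOGGER.warn "Attempt #{attempts}/#{MAX_ATTEMPTS} failed for offset #{message.offset}: #{error.message}"
  end
end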

Part 3: The Supporting Infrastructure

To make this entire system runnable and testable, we define it using Docker Compose. This includes Kong with our custom plugin mounted, Kafka, Zookeeper, Redis, our Ruby consumer, and a mock Keras service. Note that the stock kong:2.8 image does not bundle lua-resty-kafka; in practice you would build a derived Kong image that installs it (for example via luarocks install lua-resty-kafka). For brevity, the Compose file below only mounts the plugin source.

The mock Keras service is a simple Python Flask app that introduces an artificial delay to simulate real model inference time.

keras_model/app.py

# keras_model/app.py
from flask import Flask, request, jsonify
import time
import random
import logging

app = Flask(__name__)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

@app.route('/predict', methods=['POST'])
def predict():
    # Simulate model inference delay
    delay = random.uniform(0.25, 0.60)
    logging.info(f"Received request. Simulating inference for {delay:.2f} seconds.")
    time.sleep(delay)
    
    data = request.get_json()
    if not data:
        return jsonify({"error": "Invalid input"}), 400
        
    # Mock response
    user_id = data.get("user_id", "unknown")
    item_id = data.get("item_id", "unknown")
    
    prediction_score = random.random()
    
    response = {
        "user_id": user_id,
        "item_id": item_id,
        "prediction": prediction_score,
        "model_version": "v1.2.3"
    }
    
    logging.info(f"Returning prediction: {prediction_score}")
    return jsonify(response)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

keras_model/Dockerfile

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app.py .

# FLASK_APP is also set in docker-compose.yml; setting it here keeps the
# image runnable on its own.
ENV FLASK_APP=app.py

CMD ["flask", "run", "--host=0.0.0.0"]

keras_model/requirements.txt

Flask==2.1.2

The final piece is the docker-compose.yml file that orchestrates all the services.

docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  redis:
    image: redis:6.2-alpine
    container_name: redis
    ports:
      - "6379:6379"

  kong-db:
    image: postgres:13
    container_name: kong-db
    environment:
      - POSTGRES_USER=kong
      - POSTGRES_DB=kong
      - POSTGRES_PASSWORD=kong
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "kong"]
      interval: 5s
      timeout: 5s
      retries: 5

  kong-migrations:
    image: kong:2.8
    container_name: kong-migrations
    command: "kong migrations bootstrap"
    depends_on:
      kong-db:
        condition: service_healthy
    environment:
      - KONG_DATABASE=postgres
      - KONG_PG_HOST=kong-db
      - KONG_PG_PASSWORD=kong
    restart: on-failure

  kong:
    image: kong:2.8
    container_name: kong
    depends_on:
      kong-migrations:
        condition: service_completed_successfully
    volumes:
      - ./kong/plugins:/usr/local/share/lua/5.1/kong/plugins
    environment:
      - KONG_DATABASE=postgres
      - KONG_PG_HOST=kong-db
      - KONG_PG_PASSWORD=kong
      - KONG_PROXY_ACCESS_LOG=/dev/stdout
      - KONG_ADMIN_ACCESS_LOG=/dev/stdout
      - KONG_PROXY_ERROR_LOG=/dev/stderr
      - KONG_ADMIN_ERROR_LOG=/dev/stderr
      - KONG_ADMIN_LISTEN=0.0.0.0:8001, 0.0.0.0:8444 ssl
      - KONG_PLUGINS=bundled,async-inference # Enable our custom plugin
      - KONG_LUA_PACKAGE_PATH=/usr/local/share/lua/5.1/?.lua;; # Ensure plugin path is correct
    ports:
      - "8000:8000"
      - "8443:8443"
      - "8001:8001"
      - "8444:8444"

  keras-model:
    build: ./keras_model
    container_name: keras-model
    ports:
      - "5000:5000"
    environment:
      - FLASK_APP=app.py

  ruby-consumer:
    build: . # Assuming Dockerfile is in the root
    container_name: ruby-consumer
    depends_on:
      - kafka
      - redis
      - keras-model
    environment:
      - KAFKA_BROKERS=kafka:9092
      - KAFKA_TOPIC=inference-jobs
      - REDIS_URL=redis://redis:6379/0
      - KERAS_API_ENDPOINT=http://keras-model:5000/predict
    restart: always

networks:
  default:
    name: async-inference-net

Dockerfile (for the Ruby consumer)

FROM ruby:3.0

WORKDIR /usr/src/app

COPY Gemfile Gemfile.lock ./
RUN bundle install

COPY consumer.rb .

CMD ["ruby", "consumer.rb"]

Part 4: System Integration and Testing

With docker-compose up --build, the entire stack comes to life. The final step is to configure Kong. We create a service, a route, and then enable our async-inference plugin on that route.

# Wait for Kong to be ready...

# 1. Create a dummy service (it will never be reached, but Kong requires it)
curl -i -X POST http://localhost:8001/services/ \
  --data name=dummy-inference-service \
  --data url=http://httpbin.org/post

# 2. Create a route to access the service
curl -i -X POST http://localhost:8001/services/dummy-inference-service/routes \
  --data 'paths[]=/inference' \
  --data name=inference-route

# 3. Enable our custom plugin on the route
curl -i -X POST http://localhost:8001/routes/inference-route/plugins \
  --data "name=async-inference" \
  --data "config.kafka_topic=inference-jobs" \
  --data "config.kafka_brokers[0].host=kafka" \
  --data "config.kafka_brokers[0].port=9092" \
  --data "config.redis_host=redis" \
  --data "config.rate_limit_per_minute=100"

Now, we can test the entire flow. Sending a request to Kong should result in a near-instant 202 Accepted response.

$ time curl -i -X POST http://localhost:8000/inference \
  -H "Content-Type: application/json" \
  -d '{"user_id": "user-123", "item_id": "item-abc"}'

HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Content-Length: 76
Connection: keep-alive
Date: Fri, 27 Oct 2023 10:30:00 GMT
Server: kong/2.8.0
X-Kong-Upstream-Latency: 2
X-Kong-Proxy-Latency: 1

{"message":"Request accepted for processing","job_id":"...some_generated_id..."}

real    0m0.045s
user    0m0.003s
sys     0m0.005s

The response time is ~45ms, even though the backend processing takes >250ms. Watching the logs (docker-compose logs -f ruby-consumer keras-model), we can see the Ruby consumer pick up the message from Kafka, call the Keras service after a delay, and log the successful processing. Subsequent identical requests will result in a cache hit in the consumer logs.
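
To confirm the caching behaviour from the host (Redis is published on localhost:6379 in the Compose file), one option is to recompute the consumer's cache key for the exact body sent above. The script below is a throwaway check, and it only matches if the body string is byte-for-byte identical to what the plugin forwarded:

# check_cache.rb -- throwaway verification script (assumes the exact curl body used above)
require 'redis'
require 'digest'

body  = '{"user_id": "user-123", "item_id": "item-abc"}'
redis = Redis.new(url: 'redis://localhost:6379/0')

# The consumer keys its cache on the SHA256 digest of the raw message value.
cache_key = "inference_cache:#{Digest::SHA256.hexdigest(body)}"
puts redis.get(cache_key) || 'no cached result yet (the consumer may still be processing)'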

This architecture successfully decoupled ingestion from processing, solving the latency problem at the API gateway. The system is now far more resilient to traffic spikes, as Kafka can absorb a massive backlog of requests for the consumer pool to work through.

The current implementation, however, has limitations. The primary one is that the client receives no direct result. This pattern is suitable for fire-and-forget operations, but not for request-response cycles. A production-ready version would need a mechanism to deliver the result, such as a WebSocket connection, a server-sent event stream, or a webhook callback registered by the client. Another approach would be for the 202 response to include a job_id and a status-check URL that the client could poll. Furthermore, the error handling could be enhanced with a Dead Letter Queue (DLQ) in Kafka to handle messages that consistently fail, preventing them from blocking the queue while allowing for offline analysis. Finally, full-stack observability with distributed tracing would be essential to monitor the end-to-end latency of jobs as they flow through this distributed system.
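
For the polling approach, a rough sketch is shown below. It is hypothetical and requires two changes not made above: the plugin would need to forward its generated job_id (for example as the Kafka message key or a field in the payload), and the consumer would additionally write each result to inference_result:<job_id> in Redis. A minimal Sinatra service (an extra gem, not in the Gemfile above) could then expose the status-check URL:

# status_server.rb -- hypothetical status-check endpoint for the polling variant
require 'sinatra'
require 'redis'
require 'oj'

REDIS = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://redis:6379/0'))

# GET /status/:job_id returns the cached result if the job has completed,
# or 202 with a pending status if it has not.
get '/status/:job_id' do
  content_type :json
  result = REDIS.get("inference_result:#{params[:job_id]}")
  if result
    result
  else
    status 202
    Oj.dump({ 'status' => 'pending' })
  end
end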

