Implementing Stateful Request Enrichment in Kong via a High-Concurrency Clojure gRPC Service on Alibaba Cloud


Our standard Kong rate-limiting setup was proving insufficient. The requirement was to enforce dynamic quotas based on a user’s real-time subscription tier, campaign participation, and recent usage history—all data residing in Alibaba Cloud Tablestore. Attempting this logic within a standard Lua plugin was a non-starter. The complexity of the business rules, coupled with the need for robust, stateful connections to an external database, would have created an unmaintainable and poorly performing bottleneck right at the edge of our infrastructure. In a real-world project, the API gateway must be brutally fast and reliable; introducing complex, blocking I/O in its request path is a recipe for cascading failures.

Our initial thought was to use the request-transformer plugin to call an external RESTful service. This was quickly discarded. The overhead of HTTP/1.1 connection setup, teardown, and header parsing for every single request passing through the gateway was unacceptable. The added latency would violate our service level objectives. We then considered an event-driven approach using the http-log plugin to fire off data to a processing service asynchronously. While useful for analytics, this didn’t solve the core problem of synchronous request enrichment. The gateway needed a response before forwarding the request upstream.

This led us to a more robust architecture: a custom Kong plugin, written in Go for performance and type safety, that communicates with a dedicated, high-concurrency sidecar service over gRPC. This backend service, responsible for the stateful logic, would be written in Clojure to leverage its powerful concurrency primitives and the stability of the JVM. The entire stack would be deployed on Alibaba Cloud’s Container Service for Kubernetes (ACK) to ensure scalability and high availability.

The Architectural Blueprint: gRPC Bridge

The design hinges on a clean separation of concerns. Kong’s responsibility is routing, authentication, and basic policy enforcement. The complex, dynamic quota logic is delegated.

graph TD
    subgraph Client
        A[User Request]
    end

    subgraph Alibaba Cloud VPC
        B[ALB Load Balancer] --> C{Kong Gateway Pod};

        subgraph "Kubernetes Pod: Kong"
            C -->|Kong Request Pipeline| D[Custom Go Plugin];
        end

        D -- "gRPC Unary Call (Proto)" --> E{Clojure Service};

        subgraph "Kubernetes Deployment: Clojure Enrichment Service"
            E[Clojure Service Pod 1] --> F[Tablestore SDK];
            E2[Pod 2] --> F;
            E3[Pod 3] --> F;
        end

        F --> G[(Alibaba Cloud Tablestore)];

        D -- "Enriched Headers" --> C;
        C --> H[Upstream Service];
    end

    A --> B

The Go plugin intercepts the request in the access phase. It extracts user identifiers, makes a synchronous gRPC call to the Clojure service, and awaits a response. This response contains the calculated quota and a decision (allow/deny). The plugin then either adds headers for the upstream service to consume or terminates the request with a 429 Too Many Requests. A critical design point is the failure mode: if the Clojure service is unavailable or times out, the plugin must decide whether to “fail-open” (allow the request) or “fail-closed” (deny it). For this use case, we opted for fail-open with logging to avoid blocking legitimate traffic due to an issue in a non-critical component.
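
To make the hand-off to the upstream concrete, here is a minimal sketch of an upstream handler that consumes the injected headers. It is purely illustrative (a hypothetical Go service; the handler name and port are not part of our stack), but it shows the intended model: the upstream never re-checks quotas, it simply trusts what the gateway added.

package main

import (
	"fmt"
	"log"
	"net/http"
)

// quotaAwareHandler is a hypothetical upstream endpoint. Quota enforcement already
// happened at the gateway; the headers below are consumed only for business logic.
func quotaAwareHandler(w http.ResponseWriter, r *http.Request) {
	tier := r.Header.Get("X-User-Tier")            // subscription tier resolved by the enrichment service
	remaining := r.Header.Get("X-Quota-Remaining") // informational counter added by the Go plugin

	w.Header().Set("Content-Type", "application/json")
	fmt.Fprintf(w, `{"tier": %q, "quota_remaining": %q}`, tier, remaining)
}

func main() {
	http.HandleFunc("/", quotaAwareHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}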

Defining the Contract: Protobuf

The first step is defining a rigid contract between the Go plugin and the Clojure service. Protobuf is the natural choice for gRPC.

proto/enrichment/v1/enricher.proto:

syntax = "proto3";

package enrichment.v1;

option go_package = "enricher/v1";
// Generate top-level Java classes so the Clojure service can import them directly.
option java_multiple_files = true;

// The service definition for request enrichment.
service EnricherService {
  // Verifies and enriches a single request.
  rpc Enrich(EnrichRequest) returns (EnrichResponse);
}

// Contains the necessary details from the incoming HTTP request.
message EnrichRequest {
  string request_id = 1;
  string user_id = 2;
  string api_key = 3;
  string route_id = 4;
}

// Contains the decision and data to be added to the request.
message EnrichResponse {
  enum Decision {
    DECISION_UNSPECIFIED = 0;
    DECISION_ALLOW = 1;
    DECISION_DENY = 2;
  }

  Decision decision = 1;
  string reason = 2; // Reason for denial, e.g., "QUOTA_EXCEEDED"
  int64 remaining_quota = 3;

  // Headers to be added to the original request before proxying upstream.
  map<string, string> headers_to_add = 4;
}

This contract is simple but effective. It clearly defines the inputs and outputs, and the generated code in both Go and Java (for Clojure) will ensure type safety across the service boundary.
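
The Clojure code generation is covered in the next section; the Go plugin needs its own stubs. A minimal sketch of a go:generate directive that could produce them, assuming protoc plus the protoc-gen-go and protoc-gen-go-grpc plugins are installed and that the -I path and output options match your repository layout:

// Package v1 holds the Go stubs generated from enricher.proto for the Kong plugin.
// The paths below are illustrative; adjust them to your module layout.
//
//go:generate protoc -I ../../proto --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative enrichment/v1/enricher.proto
package v1

Running go generate ./... then yields enricher.pb.go and enricher_grpc.pb.go, which provide the EnrichRequest message and the EnricherServiceClient used by the plugin later in this post.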

The Logic Core: Clojure gRPC Service

Clojure’s strengths in handling concurrent state and data transformation make it an excellent choice for the enrichment service. We use the io.github.protojure/grpc-server library for the gRPC implementation and Alibaba Cloud’s official Java SDK for Tablestore.

Here are the project dependencies and the code-generation alias, declared in deps.edn:

;; deps.edn
{:paths ["src" "resources"]
 :deps  {org.clojure/clojure {:mvn/version "1.11.1"}
         org.clojure/core.async {:mvn/version "1.6.681"}
         io.github.protojure/grpc-server {:mvn/version "2.5.0"}
         io.github.protojure/protoc-plugin {:mvn/version "2.5.0"}
         com.aliyun.openservices/tablestore {:mvn/version "5.16.1"}
         com.taoensso/timbre {:mvn/version "6.3.1"}}

 :aliases
 {:protoc {:main-opts ["-m" "protojure.protoc.main"
                       "-I" "proto"
                       "--java_out=src"
                       "--protojure_out=src"
                       "proto/enrichment/v1/enricher.proto"]}}}

After running clj -M:protoc, the necessary Java and Clojure stubs are generated. Now, for the service implementation.

src/enricher/core.clj:

(ns enricher.core
  (:require [enrichment.v1.EnricherService.server :as server]
            [protojure.grpc.server.core :as grpc-server]
            [clojure.core.async :as a]
            [taoensso.timbre :as log]
            [enricher.tablestore :as ts])
  (:import (enrichment.v1 EnrichRequest EnrichResponse EnrichResponse$Decision))
  (:gen-class))

;; A simple, in-memory cache to reduce Tablestore hits for hot users.
;; In a real-world project, this would be a more sophisticated cache like an LRU
;; with TTL, or even a distributed cache like Redis.
(def user-cache (atom {}))

(defn- get-user-data [user-id]
  (let [cached-val (get @user-cache user-id)]
    (if (and cached-val (< (- (System/currentTimeMillis) (:timestamp cached-val)) 300000)) ;; 5 min cache
      (:data cached-val)
      (let [fresh-data (ts/fetch-user-quota user-id)]
        (when fresh-data
          (swap! user-cache assoc user-id {:timestamp (System/currentTimeMillis) :data fresh-data}))
        fresh-data))))

(defrecord Enricher []
  server/EnricherService
  (Enrich [this {{:keys [user_id request_id]} :params}]
    (log/info "Enriching request" {:request-id request_id :user-id user_id})
    (let [chan (a/chan)]
      ;; Tablestore calls are blocking I/O, so they run on a dedicated thread via
      ;; a/thread rather than inside an a/go block, which would tie up the
      ;; fixed-size core.async dispatch pool.
      (a/thread
        (try
          (if-let [user-data (get-user-data user_id)]
            (let [{:keys [quota-remaining tier]} user-data]
              (if (> quota-remaining 0)
                ;; --- Core Logic ---
                (do
                  ;; Decrement quota (in a real system, this would be an atomic operation)
                  (ts/decrement-user-quota! user_id)
                  (let [response (EnrichResponse/newBuilder)
                        headers {"X-User-Tier" tier
                                 "X-Quota-Remaining" (str (dec quota-remaining))}]
                    (.setDecision response EnrichResponse$Decision/DECISION_ALLOW)
                    (.setRemainingQuota response (dec quota-remaining))
                    (.putAllHeadersToAdd response headers)
                    (a/>!! chan {:status 0 :instance (.build response)})))
                ;; --- Quota Exceeded ---
                (let [response (EnrichResponse/newBuilder)]
                  (.setDecision response EnrichResponse$Decision/DECISION_DENY)
                  (.setReason response "QUOTA_EXCEEDED")
                  (.setRemainingQuota response 0)
                  (a/>!! chan {:status 0 :instance (.build response)}))))
            ;; --- User Not Found ---
            (let [response (EnrichResponse/newBuilder)]
              (.setDecision response EnrichResponse$Decision/DECISION_DENY)
              (.setReason response "USER_NOT_FOUND")
              (a/>!! chan {:status 0 :instance (.build response)})))
          (catch Exception e
            (log/error e "Error during enrichment process" {:request-id request_id})
            ;; Fail-closed from the service's perspective. The client plugin
            ;; will decide the ultimate fail-open/closed strategy.
            (a/>!! chan {:status 13 :message "Internal enrichment error"}))))
      chan)))

(defn -main [& args]
  (let [port 50051]
    (log/info "Starting Clojure gRPC server on port" port)
    (-> (grpc-server/new-server port)
        (grpc-server/add-service (server/create-EnricherService (->Enricher)))
        (grpc-server/start)
        (grpc-server/await-termination))))

The Tablestore interaction logic is abstracted away for clarity. The key takeaway is the use of core.async: the blocking database calls run on a dedicated thread via a/thread, and the result is put onto a channel, which protojure handles gracefully, so the server's request-handling threads are never tied up. Error handling is explicit: any exception results in a gRPC internal error status (code 13), which the Go plugin must be prepared to handle.
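
That status code matters on the client side: a deadline expiring locally and the service reporting an internal error should not be logged or alerted on identically. Below is a small sketch of how the plugin could classify the error before applying its fail-open policy, using the status and codes packages from google.golang.org/grpc; the classifyEnrichError helper is illustrative and not part of the plugin code shown later.

package main

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classifyEnrichError is a hypothetical helper that maps a gRPC call error to a
// short label the plugin can log before applying its fail-open/fail-closed policy.
func classifyEnrichError(err error) string {
	st, ok := status.FromError(err)
	if !ok {
		return "non-gRPC error" // e.g. a plain error wrapped somewhere outside the gRPC stack
	}
	switch st.Code() {
	case codes.DeadlineExceeded:
		return "enrichment call timed out" // the per-request timeout budget was exhausted
	case codes.Unavailable:
		return "enrichment service unreachable" // connection, DNS, or balancer failure
	case codes.Internal:
		return "enrichment service internal error" // the Clojure service returned status 13
	default:
		return "unexpected gRPC status: " + st.Code().String()
	}
}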

The Gateway Interface: Kong Go Plugin

Now for the client side. We’ll use Kong’s official Go PDK (go-pdk). The plugin’s responsibility is to manage the gRPC connection efficiently and translate gRPC errors into appropriate gateway behavior.

main.go:

package main

import (
	"context"
	"enricher/v1"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/Kong/go-pdk"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

type Config struct {
	EnricherServiceAddress string `json:"enricher_service_address"`
	RequestTimeoutMs       int    `json:"request_timeout_ms"`
	FailOpen               bool   `json:"fail_open"`
}

// A global gRPC connection. A common mistake is to create a new connection per request.
// In a production scenario, you'd want more sophisticated connection pooling and health checking.
var grpcConn *grpc.ClientConn

func New() interface{} {
	return &Config{}
}

// init runs once when the plugin server process starts, before any request is handled.
// This is the ideal place to establish long-lived connections.
func init() {
	// Reading config from environment variables is more flexible for Kubernetes deployments.
	address := os.Getenv("KONG_ENRICHER_SERVICE_ADDRESS")
	if address == "" {
		log.Printf("[enricher-plugin] KONG_ENRICHER_SERVICE_ADDRESS not set, plugin will be disabled")
		return
	}

	// In production, use secure credentials.
	opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}

	var err error
	grpcConn, err = grpc.Dial(address, opts...)
	if err != nil {
		log.Printf("[enricher-plugin] Failed to connect to gRPC service: %v", err)
		grpcConn = nil // Ensure it's nil so Access phase can check.
	}
}

// Access is the core function called for each request.
func (conf Config) Access(kong *pdk.PDK) {
	if grpcConn == nil {
		handleConnectionError(kong, conf, "gRPC connection not established")
		return
	}

	// 1. Extract necessary info from the request.
	apiKey, err := kong.Request.GetHeader("X-Api-Key")
	if err != nil || apiKey == "" {
		// Recent go-pdk releases take a []byte body plus optional headers here;
		// older releases used a string body, so match the PDK version you build against.
		kong.Response.Exit(400, []byte(`{"error": "Header X-Api-Key is required"}`), nil)
		return
	}
	// For demonstration, we assume user_id can be derived from the API key.
	// In a real system, this might come from a JWT or another auth mechanism.
	userID := "user-" + apiKey 
	requestID, _ := kong.Request.GetHeader("X-Request-ID")

	// 2. Prepare and send the gRPC request.
	client := v1.NewEnricherServiceClient(grpcConn)
	timeout := time.Duration(conf.RequestTimeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req := &v1.EnrichRequest{
		RequestId: requestID,
		UserId:    userID,
		ApiKey:    apiKey,
	}

	resp, err := client.Enrich(ctx, req)
	if err != nil {
		handleConnectionError(kong, conf, fmt.Sprintf("gRPC call failed: %v", err))
		return
	}

	// 3. Process the gRPC response.
	if resp.Decision == v1.EnrichResponse_DECISION_DENY {
		body := fmt.Sprintf(`{"error": "Request denied", "reason": "%s"}`, resp.Reason)
		headers := map[string][]string{"Content-Type": {"application/json"}}
		kong.Response.Exit(429, []byte(body), headers)
		return
	}

	// 4. If allowed, enrich the request with new headers.
	for key, val := range resp.HeadersToAdd {
		kong.ServiceRequest.SetHeader(key, val)
	}
	// Also pass along the remaining quota for upstream consumption.
	kong.ServiceRequest.SetHeader("X-Quota-Remaining", strconv.FormatInt(resp.RemainingQuota, 10))
}

// A critical function for production stability.
func handleConnectionError(kong *pdk.PDK, conf Config, logMsg string) {
	kong.Log.Err(logMsg)
	if !conf.FailOpen {
		kong.Response.Exit(503, []byte(`{"error": "Service temporarily unavailable"}`), nil)
	}
	// If FailOpen is true, we simply do nothing and let the request pass through.
}

// Version and Priority are required by the go-pdk embedded plugin server.
const Version = "1.0.0"
const Priority = 1000

// main starts the plugin server process that Kong communicates with.
func main() {
	if err := server.StartServer(New, Version, Priority); err != nil {
		log.Fatalf("[enricher-plugin] failed to start plugin server: %v", err)
	}
}

The Go plugin code is direct and pragmatic. It establishes a single gRPC connection when the plugin server process starts to minimize per-request overhead. The Access function contains the logic for each request: extract data, call the gRPC service with a strict timeout, and act on the response. The handleConnectionError function implements our chosen fail-open/fail-closed strategy, a critical aspect of gateway plugin development. A pitfall here is not setting a timeout; an unresponsive backend service could otherwise stall a Kong worker, leading to a major outage.
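
If the single long-lived connection needs hardening beyond this, gRPC's built-in keepalives and client-side load balancing cover a lot of ground before a hand-rolled pool is justified. The sketch below shows dial options we could use in place of the plain grpc.Dial in init(); the dns:/// target and round_robin policy assume the enrichment Service is exposed headlessly so individual pod IPs are resolvable, and the intervals are illustrative rather than tuned values.

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
)

// dialEnricher is a hypothetical constructor for a hardened client connection.
func dialEnricher(address string) (*grpc.ClientConn, error) {
	return grpc.Dial(
		// "dns:///" forces the DNS resolver; with a headless Service it returns all pod IPs.
		"dns:///"+address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Spread calls across the resolved endpoints instead of pinning to a single pod.
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
		// Probe idle connections so a dead pod is noticed before the next real request.
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                30 * time.Second,
			Timeout:             5 * time.Second,
			PermitWithoutStream: true,
		}),
	)
}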

Deployment on Alibaba Cloud ACK

With the components built, we deploy them to ACK.

  1. Clojure Service Deployment: We containerize the Clojure application and deploy it as a standard Kubernetes Deployment with a Service.

    kubernetes/clojure-service.yaml:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: enrichment-service
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: enrichment-service
      template:
        metadata:
          labels:
            app: enrichment-service
        spec:
          containers:
          - name: enricher
            image: registry.cn-hangzhou.aliyuncs.com/my-apps/clojure-enricher:1.0.0
            ports:
            - containerPort: 50051
            env:
            - name: TS_ENDPOINT
              value: "https://my-instance.cn-hangzhou.tablestore.aliyuncs.com"
            - name: TS_INSTANCE_NAME
              value: "my-instance"
            # In production, use secrets for credentials (e.g., from Alibaba Cloud KMS/Secrets Manager)
            - name: ALIBABA_CLOUD_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: alicloud-creds
                  key: accessKeyId
            - name: ALIBABA_CLOUD_ACCESS_KEY_SECRET
              valueFrom:
                secretKeyRef:
                  name: alicloud-creds
                  key: accessKeySecret
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: enrichment-service
    spec:
      selector:
        app: enrichment-service
      ports:
        - protocol: TCP
          port: 50051
          targetPort: 50051
  2. Kong Plugin Configuration: We configure Kong to use the new plugin. If using the Kong Ingress Controller, this is done via a KongPlugin Custom Resource.

    kubernetes/kong-plugin-config.yaml:

    apiVersion: configuration.konghq.com/v1
    kind: KongPlugin
    metadata:
      name: request-enricher
      # Attach this plugin to specific Ingresses or Services via the konghq.com/plugins
      # annotation; for a truly global plugin, use a KongClusterPlugin resource instead.
      annotations:
        kubernetes.io/ingress.class: kong
    plugin: go-request-enricher # The name specified in the Go plugin's Makefile/build
    config:
      enricher_service_address: "enrichment-service.default.svc.cluster.local:50051"
      request_timeout_ms: 50 # A tight timeout is crucial
      fail_open: true

    Kong itself must be configured to find and load the Go plugin. This typically involves shipping the compiled plugin binary in the Kong image (or mounting it into the pod) and registering it through the KONG_PLUGINS and KONG_PLUGINSERVER_* environment variables (KONG_GO_PLUGINS_DIR on older releases) in the Kong deployment.

This setup decouples the complex, stateful logic from the gateway, allowing each component to be scaled and maintained independently. The Clojure service can be updated without gateway downtime, and the use of gRPC over a Kubernetes service name provides a fast and reliable communication channel.

The primary trade-off in this architecture is the added network hop. While gRPC is efficient, a call from the Kong pod to the enrichment service pod still introduces latency. We measured the added latency at a P99 of 2-5 ms within the same Kubernetes cluster, an acceptable cost for the functionality gained. Future optimization could involve deploying the Clojure service as a true sidecar within the Kong pod, communicating over the localhost interface to eliminate network latency entirely, though this would tighten the deployment coupling.

Another point of contention is the local cache in each Clojure pod. It reduces load on Tablestore but can lead to slightly stale data if a user’s requests are load-balanced across different pods. For our quota system, this minor inconsistency was acceptable. A system requiring strict consistency would necessitate a distributed cache like Redis, which would add another network hop and a new potential point of failure. There is no perfect solution, only a series of conscious trade-offs based on specific system requirements.

