The default rate-limiting mechanisms in our distributed Tyk API Gateway cluster became a source of production inconsistencies. With limits applied on a per-node basis, a client could effectively bypass their global quota by round-robining requests across different gateway instances. This not only violated our service level agreements for tiered plans but also left our backend services vulnerable to traffic spikes from a single, high-volume tenant. The initial fix, using Tyk’s built-in Redis rate limiter, introduced a high-latency dependency in the hot path and lacked the flexibility to implement complex, consumption-based quota logic. We needed a system that was globally consistent, highly available, and capable of processing stateful logic on API traffic in near real-time, without compromising the gateway’s core responsibility: low-latency request processing.
Our revised architecture decouples quota enforcement from quota computation. Tyk remains the enforcement point, but the decision logic is offloaded to a dedicated stream processing engine. API usage events are fired asynchronously from Tyk into a Google Cloud Pub/Sub topic. An Apache Flink cluster consumes this stream, maintains the state for every API key (e.g., tokens in a bucket), and writes enforcement decisions (BLOCK/ALLOW) to a low-latency data store, which in our case is Google Cloud Memorystore (Redis). The Tyk gateway then performs a simple, sub-millisecond check against Redis before processing an incoming request. This design transforms a complex, stateful problem into a highly scalable, event-driven workflow.
graph TD
    subgraph User Traffic
        Client[Client Application] --> LB[GCP Global Load Balancer]
    end
    subgraph GCP Project
        LB --> Tyk_GW1[Tyk Gateway Pod 1]
        LB --> Tyk_GW2[Tyk Gateway Pod 2]
        LB --> Tyk_GW3[Tyk Gateway Pod N]
        subgraph GKE Cluster
            Tyk_GW1 --> |1. Pre-request Check| Memorystore[(Memorystore for Redis)]
            Tyk_GW2 --> |1. Pre-request Check| Memorystore
            Tyk_GW3 --> |1. Pre-request Check| Memorystore
            Tyk_GW1 --> |3. Async Usage Event| PubSub[Pub/Sub Topic: api-usage-events]
            Tyk_GW2 --> |3. Async Usage Event| PubSub
            Tyk_GW3 --> |3. Async Usage Event| PubSub
            PubSub --> |4. Consume Events| FlinkCluster[Apache Flink Cluster]
            FlinkCluster -- 5. Compute State & Update --> Memorystore
            FlinkCluster -- Checkpoints/Savepoints --> GCS[Google Cloud Storage Bucket]
        end
        Tyk_GW1 --> |2. Forward Allowed Request| Backend[Backend Service]
        Tyk_GW2 --> |2. Forward Allowed Request| Backend
        Tyk_GW3 --> |2. Forward Allowed Request| Backend
    end
    style FlinkCluster fill:#f9f,stroke:#333,stroke-width:2px
    style Tyk_GW1 fill:#bbf,stroke:#333,stroke-width:2px
    style Tyk_GW2 fill:#bbf,stroke:#333,stroke-width:2px
    style Tyk_GW3 fill:#bbf,stroke:#333,stroke-width:2px
Infrastructure Provisioning on Google Cloud
In a real-world project, infrastructure must be managed as code. We use gcloud CLI commands here for simplicity, but they map directly to Terraform or Pulumi resources. We need a GKE cluster for our workloads, a Pub/Sub topic for event transport, a Memorystore instance for low-latency state checks, and a GCS bucket for Flink’s state persistence.
# Set project and region variables
export PROJECT_ID="your-gcp-project-id"
export REGION="us-central1"
export ZONE="us-central1-a"
gcloud config set project $PROJECT_ID
gcloud config set compute/region $REGION
# 1. GKE Cluster for Tyk and Flink
# A regional cluster provides higher availability.
gcloud container clusters create "tyk-flink-cluster" \
--region "$REGION" \
--machine-type "e2-standard-4" \
--num-nodes "2" \
--enable-autoscaling --min-nodes "2" --max-nodes "5" \
--release-channel "regular" \
--workload-pool "${PROJECT_ID}.svc.id.goog"
# 2. Pub/Sub Topic for Usage Events
gcloud pubsub topics create "api-usage-events"
# Create a subscription for Flink to consume from.
gcloud pubsub subscriptions create "flink-usage-consumer" \
--topic="api-usage-events" \
--ack-deadline=60
# 3. Memorystore (Redis) Instance for State
# This needs to be in the same region as GKE for low latency.
gcloud redis instances create "quota-engine-state" \
--size=1 \
--tier=BASIC \
--region="$REGION" \
--connect-mode=PRIVATE_SERVICE_ACCESS
# Note: You must have configured Private Service Access for your VPC.
# This is a critical step for security and performance.
# After creation, get the host IP for later configuration.
export REDIS_HOST=$(gcloud redis instances describe quota-engine-state --region $REGION --format 'value(host)')
echo "Memorystore Host: $REDIS_HOST"
# 4. GCS Bucket for Flink Checkpoints
# Using a regional bucket improves performance for Flink state operations.
export BUCKET_NAME="flink-checkpoints-${PROJECT_ID}"
gsutil mb -l $REGION "gs://${BUCKET_NAME}/"
A common mistake here is to place these components in different regions, introducing unacceptable network latency. The check from Tyk to Redis must complete in single-digit milliseconds; placing the GKE cluster and the Memorystore instance in the same region is non-negotiable.
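A quick way to validate co-location is to measure the Redis round trip from a pod inside the GKE cluster. The probe below is illustrative only; it reads the REDIS_HOST value exported earlier and uses the same Jedis client library the Flink job relies on later.
import redis.clients.jedis.Jedis;

// Illustrative latency probe: run it from a pod inside the GKE cluster.
// The host is the Memorystore IP exported as REDIS_HOST earlier.
public class RedisLatencyProbe {
    public static void main(String[] args) {
        String host = System.getenv().getOrDefault("REDIS_HOST", "localhost");
        try (Jedis jedis = new Jedis(host, 6379)) {
            long worstMicros = 0;
            long totalMicros = 0;
            int iterations = 1000;
            for (int i = 0; i < iterations; i++) {
                long start = System.nanoTime();
                jedis.ping();
                long micros = (System.nanoTime() - start) / 1_000;
                totalMicros += micros;
                worstMicros = Math.max(worstMicros, micros);
            }
            System.out.printf("PING avg=%dus worst=%dus%n", totalMicros / iterations, worstMicros);
        }
    }
}
If the average round trip is not comfortably in the low single-digit milliseconds, fix the topology before touching any application code.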
Tyk Custom Middleware for Asynchronous Eventing
The Tyk gateway configuration is split into two parts: a pre-request hook that checks the quota and a post-request hook that logs the usage. We use Python for its rapid development cycle, though Go would offer higher performance for the middleware itself.
The key design choice is to make the event publishing asynchronous. Blocking the request-response cycle to publish an event to Pub/Sub would add significant and variable latency. The post-hook’s only job is to fire and forget.
Here is the complete Python middleware (quota_middleware.py):
import os
import json
import logging

import redis
from google.cloud import pubsub_v1

# Tyk's Python rich-plugin hooks must be registered with the @Hook decorator.
from tyk.decorators import Hook
# --- Configuration ---
# In a production setup, these should come from environment variables or a secret manager.
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
PUB_SUB_TOPIC = os.getenv("PUB_SUB_TOPIC_ID")
# --- Global Clients (initialized once per process) ---
# It's crucial to reuse clients to avoid connection churn.
# Tyk's Python interpreter is long-lived.
redis_client = None
publisher_client = None
topic_path = None
logging.basicConfig(level=logging.INFO)
def initialize_clients():
"""
Lazy initialization of clients. This function is called on the first request.
"""
global redis_client, publisher_client, topic_path
if redis_client is None:
try:
# Use a connection pool for Redis.
pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
redis_client = redis.Redis(connection_pool=pool)
redis_client.ping()
logging.info("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
logging.error(f"Failed to connect to Redis: {e}")
redis_client = None # Mark as unavailable
if publisher_client is None and GCP_PROJECT_ID and PUB_SUB_TOPIC:
try:
# The Pub/Sub client handles batching and background threads automatically.
publisher_client = pubsub_v1.PublisherClient()
topic_path = publisher_client.topic_path(GCP_PROJECT_ID, PUB_SUB_TOPIC)
logging.info(f"Pub/Sub publisher initialized for topic: {topic_path}")
except Exception as e:
logging.error(f"Failed to initialize Pub/Sub client: {e}")
publisher_client = None
# --- Tyk Hooks ---
@Hook
def pre_quota_check(request, session, spec):
"""
This is the pre-request hook. It performs the synchronous check against Redis.
"""
initialize_clients()
api_key = request.get_header('X-Api-Key')
if not api_key:
# No key, let Tyk's authentication handle it.
return request, session
if not redis_client:
logging.warning("Redis client not available. Bypassing quota check.")
# Fail-open strategy: If we can't check the quota, we allow the request.
# This prevents our monitoring/state system from causing a full outage.
return request, session
try:
# The key in Redis is simple: quota:<api_key>
# The value is "BLOCKED" or does not exist.
is_blocked = redis_client.exists(f"quota:{api_key}")
if is_blocked:
logging.warning(f"Blocking request for API key: {api_key}")
            # Short-circuit via Tyk's return_overrides so the request never reaches the upstream.
            request.object.return_overrides.response_code = 429
            request.object.return_overrides.response_error = '{"error": "Rate limit exceeded"}'
            request.object.return_overrides.headers["Content-Type"] = "application/json"
except redis.exceptions.RedisError as e:
logging.error(f"Redis error during pre-check for key {api_key}: {e}")
# Again, fail-open.
pass
return request, session
@Hook
def post_log_usage(request, response, session, metadata, spec):
    """
    This is the response hook. It runs after the upstream response is received and
    asynchronously publishes the usage event.
    """
    # This hook runs after authentication, so we can trust the session metadata.
    if not publisher_client:
        logging.warning("Pub/Sub client not available. Skipping usage log.")
        return response
    # Don't log usage for failed requests (e.g., 4xx, 5xx), unless desired.
    # Here we only log successful requests.
    if response.status_code >= 400:
        return response
    try:
        api_key = session.meta_data.get('token', '')  # 'token' holds the API key in session metadata
        if not api_key:
            return response
# Define a cost for the API call. Can be retrieved from API definition metadata.
# This allows different endpoints to consume different numbers of tokens.
api_cost = int(spec.get('config_data', {}).get('api_cost', 1))
event_payload = {
"apiKey": api_key,
"apiId": request.object.api_id,
"path": request.object.url,
"timestamp": request.object.date_created, # ISO 8601 format
"cost": api_cost
}
data = json.dumps(event_payload).encode("utf-8")
# The publish() method is non-blocking. It returns a future.
future = publisher_client.publish(topic_path, data)
# In a high-throughput scenario, you would not wait on the future.
# You might add a callback for logging failures, but never block the response.
# future.add_done_callback(callback)
except Exception as e:
# This must not fail the request. Log and move on.
logging.error(f"Failed to publish usage event: {e}")
    # Response hooks must hand the (unmodified) response object back to the gateway.
    return response
To deploy this, the Tyk API definition is modified to include the middleware:
{
"name": "My Protected API",
"api_id": "my-api-1",
"use_keyless": false,
"auth": {
"auth_header_name": "X-Api-Key"
},
"custom_middleware": {
"pre": [
{
"name": "pre_quota_check",
"path": "/opt/tyk-gateway/middleware/quota_middleware.py",
"require_session": false
}
],
"post_auth": [
{
"name": "post_log_usage",
"path": "/opt/tyk-gateway/middleware/quota_middleware.py",
"require_session": true
}
],
"driver": "python"
},
"config_data": {
"api_cost": "1"
}
// ... other API settings
}
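Before the Flink job exists, the enforcement contract can be smoke-tested on its own: a quota:<key> entry in Redis should produce a 429 from the gateway, and its absence a normal response. The standalone test below is purely illustrative; the gateway URL, the Redis host, and the test key (which must be a valid Tyk key for the allowed case) are assumptions about a test environment.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import redis.clients.jedis.Jedis;

// Illustrative smoke test: gateway URL, API key, and Redis host are test-environment assumptions.
public class QuotaEnforcementSmokeTest {
    public static void main(String[] args) throws Exception {
        String apiKey = "test-api-key";
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://tyk-gateway.example.internal/my-api/"))
                .header("X-Api-Key", apiKey)
                .GET()
                .build();

        try (Jedis jedis = new Jedis("your-redis-host-ip", 6379)) {
            // 1. With no quota:<key> entry present, the gateway should proxy the request normally.
            jedis.del("quota:" + apiKey);
            HttpResponse<String> allowed = http.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("Without BLOCK entry: " + allowed.statusCode()); // expect 200

            // 2. Simulate a Flink BLOCK decision and expect a 429 from the pre-request hook.
            jedis.set("quota:" + apiKey, "BLOCKED");
            HttpResponse<String> blocked = http.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("With BLOCK entry: " + blocked.statusCode()); // expect 429

            // 3. Clean up so the test key is not left blocked.
            jedis.del("quota:" + apiKey);
        }
    }
}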
The Flink Stateful Processing Job
This is the core of the system. The Flink job consumes from Pub/Sub, maintains a token bucket for each API key using Flink’s KeyedState, and writes to Redis. We’ll use Java and Maven.
The pitfall to avoid is trying to manage state manually. Flink’s state management primitives, backed by RocksDB and checkpointed to GCS, provide exactly-once semantics and fault tolerance out of the box.
1. Project pom.xml dependencies:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub</artifactId>
<version>3.0.0-1.17</version>
</dependency>
<dependency>
    <!-- Jedis client used directly by the custom Redis sink below -->
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
2. The Main Flink Job:
package com.mycompany.quota;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
import org.apache.flink.connector.gcp.pubsub.source.reader.deserializer.PubSubDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.ObjectMapper;
public class QuotaEngineJob {
// --- Configuration Constants ---
private static final long REPLENISH_INTERVAL_MS = 60_000; // 1 minute
private static final int TOKENS_TO_REPLENISH = 100; // Replenish 100 tokens per minute
private static final int MAX_BUCKET_CAPACITY = 6000; // Max burst capacity
private static final int BLOCK_THRESHOLD = 0; // Block when tokens are at or below this value
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// --- Production Grade Configuration ---
// 1. State Backend: RocksDB for large state, checkpointing to GCS
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("gs://<your-flink-checkpoints-bucket>/checkpoints");
// 2. Enable Checkpointing for fault tolerance (every 1 minute)
env.enableCheckpointing(60000);
// --- Source: GCP Pub/Sub ---
PubSubSource<String> source = PubSubSource.<String>builder()
.setProjectName("your-gcp-project-id")
.setSubscriptionName("flink-usage-consumer")
.setDeserializationSchema(PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
.build();
DataStream<String> pubsubStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "PubSub Source");
// --- Core Logic ---
DataStream<ApiUsageEvent> eventStream = pubsubStream
.map(new JsonToApiUsageEventMapper())
.name("Parse JSON");
// Key by API key to process quotas independently
DataStream<QuotaDecision> decisionStream = eventStream
.keyBy(ApiUsageEvent::getApiKey)
.process(new TokenBucketProcessor())
.name("Token Bucket State Processor");
// --- Sink: Redis ---
        // The bundled RedisMapper API only supports one static command per sink,
        // so we use a small custom SinkFunction that SETs on block and DELs on unblock.
        decisionStream.addSink(new QuotaRedisSink("your-redis-host-ip", 6379)) // Host from the gcloud command earlier
                .name("Redis Sink");
env.execute("Real-time API Quota Engine");
}
// --- Data Structures and Functions ---
public static class ApiUsageEvent {
public String apiKey;
public String apiId;
public String path;
public String timestamp;
public int cost;
// Getters & setters omitted for brevity
}
public static class QuotaDecision {
public String apiKey;
public boolean isBlocked;
// Constructor, getters omitted for brevity
}
public static class JsonToApiUsageEventMapper extends RichMapFunction<String, ApiUsageEvent> {
private transient ObjectMapper objectMapper;
@Override
public void open(Configuration parameters) {
objectMapper = new ObjectMapper();
}
@Override
public ApiUsageEvent map(String value) throws Exception {
return objectMapper.readValue(value, ApiUsageEvent.class);
}
}
public static class TokenBucketProcessor extends KeyedProcessFunction<String, ApiUsageEvent, QuotaDecision> {
// State is scoped to the current key (API Key)
private transient ValueState<Integer> tokenCountState;
private transient ValueState<Boolean> isBlockedState;
private transient ValueState<Long> timerState; // To ensure only one timer is active
@Override
public void open(Configuration parameters) {
tokenCountState = getRuntimeContext().getState(new ValueStateDescriptor<>("tokenCount", Integer.class));
isBlockedState = getRuntimeContext().getState(new ValueStateDescriptor<>("isBlocked", Boolean.class));
timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("replenishTimer", Long.class));
}
@Override
public void processElement(ApiUsageEvent event, Context ctx, Collector<QuotaDecision> out) throws Exception {
Integer currentTokens = tokenCountState.value();
if (currentTokens == null) {
// First time we see this key, initialize with full capacity.
currentTokens = MAX_BUCKET_CAPACITY;
}
Boolean isBlocked = isBlockedState.value();
if (isBlocked == null) {
isBlocked = false;
}
// Register a replenishment timer if one is not already set for this key.
if (timerState.value() == null) {
                // Use processing time: element timestamps may be null with noWatermarks().
                long nextTimer = ctx.timerService().currentProcessingTime() + REPLENISH_INTERVAL_MS;
ctx.timerService().registerProcessingTimeTimer(nextTimer);
timerState.update(nextTimer);
}
// Deduct cost
int newTokens = currentTokens - event.cost;
tokenCountState.update(newTokens);
// Check if state needs to change (block/unblock)
if (newTokens <= BLOCK_THRESHOLD && !isBlocked) {
isBlockedState.update(true);
out.collect(new QuotaDecision(event.apiKey, true));
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<QuotaDecision> out) throws Exception {
// Timer fires to replenish tokens.
Integer currentTokens = tokenCountState.value();
if (currentTokens == null) return; // Should not happen if timer was set
int newTokens = Math.min(currentTokens + TOKENS_TO_REPLENISH, MAX_BUCKET_CAPACITY);
tokenCountState.update(newTokens);
Boolean isBlocked = isBlockedState.value();
if (isBlocked == null) isBlocked = false;
// If we were blocked but now have enough tokens, unblock.
if (isBlocked && newTokens > BLOCK_THRESHOLD) {
isBlockedState.update(false);
out.collect(new QuotaDecision(ctx.getCurrentKey(), false));
}
// Set the next timer.
long nextTimer = timestamp + REPLENISH_INTERVAL_MS;
ctx.timerService().registerProcessingTimeTimer(nextTimer);
timerState.update(nextTimer);
}
}
    public static class QuotaRedisSink extends RichSinkFunction<QuotaDecision> {
        // If blocked, we SET a key. If unblocked, we DEL the key.
        // This is more efficient than storing an ALLOW state for every key:
        // the absence of a key means ALLOWED.
        private final String host;
        private final int port;
        private transient JedisPool jedisPool;

        public QuotaRedisSink(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void open(Configuration parameters) {
            jedisPool = new JedisPool(host, port);
        }

        @Override
        public void invoke(QuotaDecision decision, Context context) {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "quota:" + decision.apiKey;
                if (decision.isBlocked) {
                    jedis.set(key, "BLOCKED");
                } else {
                    jedis.del(key);
                }
            }
        }

        @Override
        public void close() {
            if (jedisPool != null) {
                jedisPool.close();
            }
        }
    }
}
One problem we faced during early testing was a “thundering herd” after a Flink job restart. The job would restore its state from a GCS checkpoint, but the Redis instance would be empty. For a few moments, until Flink processed enough events to re-populate Redis with BLOCK decisions, all traffic was allowed through. The solution was to add a startup job phase in Flink (using a BroadcastProcessFunction or a separate RichMapFunction with a one-time flag) that reads the entire keyed state on restore and pre-populates Redis with the correct status for all keys. This adds to startup time but is crucial for correctness.
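One way to implement that pre-population, sketched here with Flink’s State Processor API instead of the in-job approach, is a one-off batch job that reads the isBlocked state out of the latest savepoint and replays BLOCK decisions through the same Redis sink. This is an illustrative sketch: it assumes the flink-state-processor-api dependency has been added, that TokenBucketProcessor carries a stable .uid("token-bucket") (not shown above), and that the savepoint path is supplied.
package com.mycompany.quota;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Illustrative one-off batch job: read the "isBlocked" keyed state from a savepoint
// and replay BLOCK decisions into Redis before the streaming job is resumed.
public class PrePopulateRedisFromSavepoint {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Savepoint/checkpoint the streaming job will be restored from (path is a placeholder).
        SavepointReader savepoint = SavepointReader.read(
                env,
                "gs://<your-flink-checkpoints-bucket>/savepoints/savepoint-xxxx",
                new EmbeddedRocksDBStateBackend());

        // Requires the TokenBucketProcessor operator to carry a stable .uid("token-bucket").
        DataStream<QuotaEngineJob.QuotaDecision> blockedKeys =
                savepoint.readKeyedState("token-bucket", new BlockedKeyReader());

        // Reuse the same sink as the streaming job so the write path is identical.
        blockedKeys.addSink(new QuotaEngineJob.QuotaRedisSink("your-redis-host-ip", 6379));

        env.execute("Pre-populate Redis from savepoint");
    }

    public static class BlockedKeyReader
            extends KeyedStateReaderFunction<String, QuotaEngineJob.QuotaDecision> {

        private transient ValueState<Boolean> isBlockedState;

        @Override
        public void open(Configuration parameters) {
            // Descriptor must match the one used in TokenBucketProcessor.
            isBlockedState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("isBlocked", Boolean.class));
        }

        @Override
        public void readKey(String apiKey, Context ctx,
                            Collector<QuotaEngineJob.QuotaDecision> out) throws Exception {
            if (Boolean.TRUE.equals(isBlockedState.value())) {
                out.collect(new QuotaEngineJob.QuotaDecision(apiKey, true));
            }
        }
    }
}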
The design has an inherent latency window between an event occurring, it being processed by Flink, and the decision being written to Redis. This window is typically sub-second but means a few requests might slip through after a quota is technically exhausted. For rate-limiting, this is an acceptable trade-off for a massively scalable system. An alternative involving a synchronous call from Tyk to a Flink Queryable State endpoint was considered, but this would re-introduce a blocking dependency and tightly couple the gateway to the Flink cluster, which we explicitly designed to avoid. The current architecture prioritizes gateway independence and throughput.
A clear limitation of this implementation is that quota configurations (bucket size, replenish rate) are hardcoded. A production-ready evolution would involve Flink consuming a second stream of configuration data, perhaps from a Debezium CDC feed on a configuration database or another Pub/Sub topic. Using a BroadcastState pattern, Flink could then apply these rule changes dynamically to API keys without requiring a job restart, making the system truly dynamic.
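A minimal sketch of that broadcast wiring, under the assumption of a hypothetical QuotaConfig record and a configStream source (neither exists in the job above), could look like the following; the token bucket body is elided to keep the skeleton readable.
// Hypothetical sketch (not part of the job above): dynamic quota rules via broadcast state.
// Additional imports needed: org.apache.flink.api.common.state.MapStateDescriptor,
// org.apache.flink.streaming.api.datastream.BroadcastStream,
// org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.

// A small config record, e.g. parsed from a "quota-config-updates" Pub/Sub topic or a CDC feed.
public static class QuotaConfig {
    public String apiKey;
    public int maxBucketCapacity;
    public int tokensToReplenish;
}

// Inside main(), alongside the existing eventStream:
MapStateDescriptor<String, QuotaConfig> configDescriptor =
        new MapStateDescriptor<>("quotaConfigs", String.class, QuotaConfig.class);

BroadcastStream<QuotaConfig> configBroadcast = configStream.broadcast(configDescriptor);

DataStream<QuotaDecision> decisionStream = eventStream
        .keyBy(ApiUsageEvent::getApiKey)
        .connect(configBroadcast)
        .process(new KeyedBroadcastProcessFunction<String, ApiUsageEvent, QuotaConfig, QuotaDecision>() {

            @Override
            public void processElement(ApiUsageEvent event, ReadOnlyContext ctx,
                                       Collector<QuotaDecision> out) throws Exception {
                // Read-only lookup of the latest rule for this API key; fall back to the defaults.
                QuotaConfig config = ctx.getBroadcastState(configDescriptor).get(event.apiKey);
                int capacity = (config != null) ? config.maxBucketCapacity : MAX_BUCKET_CAPACITY;
                // ... same token bucket logic as TokenBucketProcessor, driven by `capacity` ...
            }

            @Override
            public void processBroadcastElement(QuotaConfig config, Context ctx,
                                                Collector<QuotaDecision> out) throws Exception {
                // Every parallel instance receives the update and stores it in broadcast state.
                ctx.getBroadcastState(configDescriptor).put(config.apiKey, config);
            }
        })
        .name("Token Bucket With Dynamic Config");
With something along these lines, per-tenant limits become an operational setting applied on the fly rather than a constant baked into the job.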