Implementing a Globally Consistent API Quota Engine with Tyk, Apache Flink, and Google Cloud


The default rate-limiting mechanisms in our distributed Tyk API Gateway cluster became a source of production inconsistencies. With limits applied on a per-node basis, a client could effectively bypass their global quota by round-robining requests across different gateway instances. This not only violated our service level agreements for tiered plans but also left our backend services vulnerable to traffic spikes from a single, high-volume tenant. The initial fix, using Tyk’s built-in Redis rate limiter, introduced a high-latency dependency in the hot path and lacked the flexibility to implement complex, consumption-based quota logic. We needed a system that was globally consistent, highly available, and capable of processing stateful logic on API traffic in near real-time, without compromising the gateway’s core responsibility: low-latency request processing.

Our revised architecture decouples quota enforcement from quota computation. Tyk remains the enforcement point, but the decision logic is offloaded to a dedicated stream processing engine. API usage events are fired asynchronously from Tyk into a Google Cloud Pub/Sub topic. An Apache Flink cluster consumes this stream, maintains the state for every API key (e.g., tokens in a bucket), and writes enforcement decisions (BLOCK/ALLOW) to a low-latency data store, which in our case is Google Cloud Memorystore (Redis). The Tyk gateway then performs a simple, sub-millisecond check against Redis before processing an incoming request. This design transforms a complex, stateful problem into a highly scalable, event-driven workflow.

graph TD
    subgraph User Traffic
        Client[Client Application] --> LB[GCP Global Load Balancer]
    end

    subgraph GCP Project
        subgraph GKE Cluster
            Tyk_GW1[Tyk Gateway Pod 1]
            Tyk_GW2[Tyk Gateway Pod 2]
            Tyk_GW3[Tyk Gateway Pod N]
            FlinkCluster[Apache Flink Cluster]
        end

        LB --> Tyk_GW1
        LB --> Tyk_GW2
        LB --> Tyk_GW3

        Tyk_GW1 --> |1. Pre-request Check| Memorystore[(Memorystore for Redis)]
        Tyk_GW2 --> |1. Pre-request Check| Memorystore
        Tyk_GW3 --> |1. Pre-request Check| Memorystore

        Tyk_GW1 --> |3. Async Usage Event| PubSub[Pub/Sub Topic: api-usage-events]
        Tyk_GW2 --> |3. Async Usage Event| PubSub
        Tyk_GW3 --> |3. Async Usage Event| PubSub

        PubSub --> |4. Consume Events| FlinkCluster
        FlinkCluster -- 5. Compute State & Update --> Memorystore
        FlinkCluster -- Checkpoints/Savepoints --> GCS[Google Cloud Storage Bucket]

        Tyk_GW1 --> |2. Forward Allowed Request| Backend[Backend Service]
        Tyk_GW2 --> |2. Forward Allowed Request| Backend
        Tyk_GW3 --> |2. Forward Allowed Request| Backend
    end

    style FlinkCluster fill:#f9f,stroke:#333,stroke-width:2px
    style Tyk_GW1 fill:#bbf,stroke:#333,stroke-width:2px
    style Tyk_GW2 fill:#bbf,stroke:#333,stroke-width:2px
    style Tyk_GW3 fill:#bbf,stroke:#333,stroke-width:2px
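
The contract between the Flink job and the gateways is deliberately minimal: if the key quota:<api_key> exists in Redis, the request is blocked; if it is absent, the request is allowed. As a purely illustrative sketch of that check in Java with Jedis (the real gateway-side implementation is the Python middleware later in this post, and the host is a placeholder):

import redis.clients.jedis.Jedis;

// Illustrative only: the single-round-trip decision the gateway makes before proxying.
public class QuotaCheckExample {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("your-redis-host-ip", 6379)) {
            String apiKey = "example-api-key";
            // Key present => Flink has decided BLOCK; key absent => allow the request.
            boolean blocked = jedis.exists("quota:" + apiKey);
            System.out.println(blocked ? "429 Rate limit exceeded" : "forward to backend");
        }
    }
}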

Infrastructure Provisioning on Google Cloud

In a real-world project, infrastructure must be managed as code. We show plain gcloud CLI commands here for brevity, but each maps directly to a Terraform or Pulumi resource. We require a GKE cluster for our workloads, a Pub/Sub topic for event transport, a Memorystore instance for low-latency state checks, and a GCS bucket for Flink’s state persistence.

# Set project and region variables
export PROJECT_ID="your-gcp-project-id"
export REGION="us-central1"
export ZONE="us-central1-a"
gcloud config set project $PROJECT_ID
gcloud config set compute/region $REGION

# 1. GKE Cluster for Tyk and Flink
# A regional cluster provides higher availability. Note that --num-nodes is per zone,
# so this creates two nodes in each of the cluster's zones (three by default).
gcloud container clusters create "tyk-flink-cluster" \
    --region "$REGION" \
    --machine-type "e2-standard-4" \
    --num-nodes "2" \
    --enable-autoscaling --min-nodes "2" --max-nodes "5" \
    --release-channel "regular" \
    --workload-pool "${PROJECT_ID}.svc.id.goog"

# 2. Pub/Sub Topic for Usage Events
gcloud pubsub topics create "api-usage-events"

# Create a subscription for Flink to consume from.
gcloud pubsub subscriptions create "flink-usage-consumer" \
    --topic="api-usage-events" \
    --ack-deadline=60

# 3. Memorystore (Redis) Instance for State
# This needs to be in the same region as GKE for low latency.
gcloud redis instances create "quota-engine-state" \
    --size=1 \
    --tier=BASIC \
    --region="$REGION" \
    --connect-mode=PRIVATE_SERVICE_ACCESS

# Note: You must have configured Private Service Access for your VPC.
# This is a critical step for security and performance.
# After creation, get the host IP for later configuration.
export REDIS_HOST=$(gcloud redis instances describe quota-engine-state --region $REGION --format 'value(host)')
echo "Memorystore Host: $REDIS_HOST"

# 4. GCS Bucket for Flink Checkpoints
# Using a regional bucket improves performance for Flink state operations.
export BUCKET_NAME="flink-checkpoints-${PROJECT_ID}"
gsutil mb -l $REGION "gs://${BUCKET_NAME}/"

A common mistake here is to place these components in different regions, introducing unacceptable network latency. The check from Tyk to Redis must be single-digit milliseconds at most; placing the GKE cluster and Memorystore instance in the same region is non-negotiable.

Tyk Custom Middleware for Asynchronous Eventing

The Tyk gateway configuration is split into two parts: a pre-request hook that checks the quota and a post-request hook that logs the usage. We use Python for its rapid development cycle, though Go would offer higher performance for the middleware itself.

The key design choice is to make the event publishing asynchronous. Blocking the request-response cycle to publish an event to Pub/Sub would add significant and variable latency. The post-hook’s only job is to fire and forget.

Here is the complete Python middleware (quota_middleware.py):

import os
import json
import redis
import logging
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# --- Configuration ---
# In a production setup, these should come from environment variables or a secret manager.
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
PUB_SUB_TOPIC = os.getenv("PUB_SUB_TOPIC_ID")

# --- Global Clients (initialized once per process) ---
# It's crucial to reuse clients to avoid connection churn.
# Tyk's Python interpreter is long-lived.
redis_client = None
publisher_client = None
topic_path = None

logging.basicConfig(level=logging.INFO)

def initialize_clients():
    """
    Lazy initialization of clients. This function is called on the first request.
    """
    global redis_client, publisher_client, topic_path
    if redis_client is None:
        try:
            # Use a connection pool for Redis.
            pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
            redis_client = redis.Redis(connection_pool=pool)
            redis_client.ping()
            logging.info("Successfully connected to Redis.")
        except redis.exceptions.ConnectionError as e:
            logging.error(f"Failed to connect to Redis: {e}")
            redis_client = None # Mark as unavailable

    if publisher_client is None and GCP_PROJECT_ID and PUB_SUB_TOPIC:
        try:
            # The Pub/Sub client handles batching and background threads automatically.
            publisher_client = pubsub_v1.PublisherClient()
            topic_path = publisher_client.topic_path(GCP_PROJECT_ID, PUB_SUB_TOPIC)
            logging.info(f"Pub/Sub publisher initialized for topic: {topic_path}")
        except Exception as e:
            logging.error(f"Failed to initialize Pub/Sub client: {e}")
            publisher_client = None

# --- Tyk Hooks ---

def pre_quota_check(request, session, spec):
    """
    This is the pre-request hook. It performs the synchronous check against Redis.
    """
    initialize_clients()

    api_key = request.get_header('X-Api-Key')
    if not api_key:
        # No key, let Tyk's authentication handle it.
        return request, session

    if not redis_client:
        logging.warning("Redis client not available. Bypassing quota check.")
        # Fail-open strategy: If we can't check the quota, we allow the request.
        # This prevents our monitoring/state system from causing a full outage.
        return request, session

    try:
        # The key in Redis is simple: quota:<api_key>
        # The value is "BLOCKED" or does not exist.
        is_blocked = redis_client.exists(f"quota:{api_key}")

        if is_blocked:
            logging.warning(f"Blocking request for API key: {api_key}")
            request.override_response.code = 429
            request.override_response.body = '{"error": "Rate limit exceeded"}'
            request.override_response.headers["Content-Type"] = "application/json"
    except redis.exceptions.RedisError as e:
        logging.error(f"Redis error during pre-check for key {api_key}: {e}")
        # Again, fail-open.
        pass

    return request, session


def post_log_usage(request, response, session, spec):
    """
    This is the post-request (post-auth) hook. It asynchronously publishes the usage event.
    """
    # This hook runs after authentication, so we can trust the session metadata.
    # Ensure clients exist even if the pre-hook did not run for this request.
    initialize_clients()

    if not publisher_client:
        logging.warning("Pub/Sub client not available. Skipping usage log.")
        return

    # Don't log usage for failed requests (e.g., 4xx, 5xx), unless desired.
    # Here we only log successful requests.
    if response.code >= 400:
        return

    try:
        api_key = session.meta_data.get('token', '') # 'token' holds the API key in session metadata
        if not api_key:
            return

        # Define a cost for the API call. Can be retrieved from API definition metadata.
        # This allows different endpoints to consume different numbers of tokens.
        api_cost = int(spec.get('config_data', {}).get('api_cost', 1))

        event_payload = {
            "apiKey": api_key,
            "apiId": request.object.api_id,
            "path": request.object.url,
            "timestamp": request.object.date_created, # ISO 8601 format
            "cost": api_cost
        }

        data = json.dumps(event_payload).encode("utf-8")
        
        # The publish() method is non-blocking. It returns a future.
        future = publisher_client.publish(topic_path, data)
        # In a high-throughput scenario, you would not wait on the future.
        # You might add a callback for logging failures, but never block the response.
        # future.add_done_callback(callback)

    except Exception as e:
        # This must not fail the request. Log and move on.
        logging.error(f"Failed to publish usage event: {e}")

    # The post hook does not modify the response, so we don't return anything.
    return

To deploy this, the Tyk API definition is modified to include the middleware:

{
  "name": "My Protected API",
  "api_id": "my-api-1",
  "use_keyless": false,
  "auth": {
    "auth_header_name": "X-Api-Key"
  },
  "custom_middleware": {
    "pre": [
      {
        "name": "pre_quota_check",
        "path": "/opt/tyk-gateway/middleware/quota_middleware.py",
        "require_session": false
      }
    ],
    "post_auth": [
      {
        "name": "post_log_usage",
        "path": "/opt/tyk-gateway/middleware/quota_middleware.py",
        "require_session": true
      }
    ],
    "driver": "python"
  },
  "config_data": {
    "api_cost": "1" 
  }
  // ... other API settings
}

The Flink Quota Computation Job

This is the core of the system. The Flink job consumes from Pub/Sub, maintains a token bucket for each API key using Flink’s KeyedState, and writes enforcement decisions to Redis. We’ll use Java and Maven.

The pitfall to avoid is trying to manage state manually. Flink’s state management primitives, backed by RocksDB and checkpointed to GCS, provide exactly-once semantics and fault tolerance out of the box.

1. Project pom.xml Dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-gcp-pubsub</artifactId>
        <version>3.0.0-1.17</version>
    </dependency>
    <!-- Plain Jedis client for the custom Redis enforcement sink -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>4.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>1.17.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.15.2</version>
    </dependency>
</dependencies>

2. The Main Flink Job:

package com.mycompany.quota;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.gcp.pubsub.source.PubSubSource;
import org.apache.flink.connector.gcp.pubsub.source.reader.deserializer.PubSubDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import org.apache.flink.util.Collector;
import com.fasterxml.jackson.databind.ObjectMapper;

public class QuotaEngineJob {

    // --- Configuration Constants ---
    private static final long REPLENISH_INTERVAL_MS = 60_000; // 1 minute
    private static final int TOKENS_TO_REPLENISH = 100; // Replenish 100 tokens per minute
    private static final int MAX_BUCKET_CAPACITY = 6000; // Max burst capacity
    private static final int BLOCK_THRESHOLD = 0; // Block when tokens are at or below this value

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // --- Production Grade Configuration ---
        // 1. State Backend: RocksDB for large state, checkpointing to GCS
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("gs://<your-flink-checkpoints-bucket>/checkpoints");

        // 2. Enable Checkpointing for fault tolerance (every 1 minute)
        env.enableCheckpointing(60000);

        // --- Source: GCP Pub/Sub ---
        PubSubSource<String> source = PubSubSource.<String>builder()
            .setProjectName("your-gcp-project-id")
            .setSubscriptionName("flink-usage-consumer")
            .setDeserializationSchema(PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
            .build();

        DataStream<String> pubsubStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "PubSub Source");

        // --- Core Logic ---
        DataStream<ApiUsageEvent> eventStream = pubsubStream
            .map(new JsonToApiUsageEventMapper())
            .name("Parse JSON");

        // Key by API key to process quotas independently
        DataStream<QuotaDecision> decisionStream = eventStream
            .keyBy(ApiUsageEvent::getApiKey)
            .process(new TokenBucketProcessor())
            .name("Token Bucket State Processor");

        // --- Sink: Redis ---
        // A small custom sink (defined below) issues SET on a BLOCK decision and DEL on an unblock.
        decisionStream.addSink(new RedisEnforcementSink("your-redis-host-ip", 6379)) // Host from the gcloud command earlier
            .name("Redis Sink");

        env.execute("Real-time API Quota Engine");
    }

    // --- Data Structures and Functions ---

    public static class ApiUsageEvent {
        public String apiKey;
        public String apiId;
        public String path;
        public String timestamp;
        public int cost;
        // Getters & setters omitted for brevity
    }

    public static class QuotaDecision {
        public String apiKey;
        public boolean isBlocked;
        // Constructor, getters omitted for brevity
    }

    public static class JsonToApiUsageEventMapper extends RichMapFunction<String, ApiUsageEvent> {
        private transient ObjectMapper objectMapper;

        @Override
        public void open(Configuration parameters) {
            objectMapper = new ObjectMapper();
        }

        @Override
        public ApiUsageEvent map(String value) throws Exception {
            return objectMapper.readValue(value, ApiUsageEvent.class);
        }
    }

    public static class TokenBucketProcessor extends KeyedProcessFunction<String, ApiUsageEvent, QuotaDecision> {
        // State is scoped to the current key (API Key)
        private transient ValueState<Integer> tokenCountState;
        private transient ValueState<Boolean> isBlockedState;
        private transient ValueState<Long> timerState; // To ensure only one timer is active

        @Override
        public void open(Configuration parameters) {
            tokenCountState = getRuntimeContext().getState(new ValueStateDescriptor<>("tokenCount", Integer.class));
            isBlockedState = getRuntimeContext().getState(new ValueStateDescriptor<>("isBlocked", Boolean.class));
            timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("replenishTimer", Long.class));
        }

        @Override
        public void processElement(ApiUsageEvent event, Context ctx, Collector<QuotaDecision> out) throws Exception {
            Integer currentTokens = tokenCountState.value();
            if (currentTokens == null) {
                // First time we see this key, initialize with full capacity.
                currentTokens = MAX_BUCKET_CAPACITY;
            }
            
            Boolean isBlocked = isBlockedState.value();
            if (isBlocked == null) {
                isBlocked = false;
            }
            
            // Register a replenishment timer if one is not already set for this key.
            if (timerState.value() == null) {
                // Use processing time: the Pub/Sub source attaches no event-time timestamps here,
                // so ctx.timestamp() may be null and we register processing-time timers anyway.
                long nextTimer = ctx.timerService().currentProcessingTime() + REPLENISH_INTERVAL_MS;
                ctx.timerService().registerProcessingTimeTimer(nextTimer);
                timerState.update(nextTimer);
            }

            // Deduct cost
            int newTokens = currentTokens - event.cost;
            tokenCountState.update(newTokens);
            
            // Check if state needs to change (block/unblock)
            if (newTokens <= BLOCK_THRESHOLD && !isBlocked) {
                isBlockedState.update(true);
                out.collect(new QuotaDecision(event.apiKey, true));
            }
        }
        
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<QuotaDecision> out) throws Exception {
            // Timer fires to replenish tokens.
            Integer currentTokens = tokenCountState.value();
            if (currentTokens == null) return; // Should not happen if timer was set
            
            int newTokens = Math.min(currentTokens + TOKENS_TO_REPLENISH, MAX_BUCKET_CAPACITY);
            tokenCountState.update(newTokens);
            
            Boolean isBlocked = isBlockedState.value();
            if (isBlocked == null) isBlocked = false;

            // If we were blocked but now have enough tokens, unblock.
            if (isBlocked && newTokens > BLOCK_THRESHOLD) {
                isBlockedState.update(false);
                out.collect(new QuotaDecision(ctx.getCurrentKey(), false));
            }
            
            // Set the next timer.
            long nextTimer = timestamp + REPLENISH_INTERVAL_MS;
            ctx.timerService().registerProcessingTimeTimer(nextTimer);
            timerState.update(nextTimer);
        }
    }

    public static class RedisEnforcementSink extends RichSinkFunction<QuotaDecision> {
        // If blocked, we SET the key; if unblocked, we DEL it.
        // The absence of a key means ALLOWED, so no explicit ALLOW state is stored per key.
        private final String host;
        private final int port;
        private transient Jedis jedis;

        public RedisEnforcementSink(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void open(Configuration parameters) {
            // One connection per sink subtask, reused for the lifetime of the task.
            jedis = new Jedis(host, port);
        }

        @Override
        public void invoke(QuotaDecision decision, Context context) {
            String key = "quota:" + decision.apiKey;
            if (decision.isBlocked) {
                jedis.set(key, "BLOCKED");
            } else {
                jedis.del(key);
            }
        }

        @Override
        public void close() {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

One problem we faced during early testing was a “thundering herd” after a Flink job restart. The job would restore its state from a GCS checkpoint, but the Redis instance would be empty. For a few moments, until Flink processed enough events to re-populate Redis with BLOCK decisions, all traffic was allowed through. The solution was to add a startup job phase in Flink (using a BroadcastProcessFunction or a separate RichMapFunction with a one-time flag) that reads the entire keyed state on restore and pre-populates Redis with the correct status for all keys. This adds to startup time but is crucial for correctness.
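
The exact bootstrap mechanics depend on how the job is deployed, but the idea can be sketched with Flink’s State Processor API: a one-off job reads the isBlocked state for every key out of the most recent savepoint and replays a BLOCK decision into the same Redis sink before the streaming job resumes. The sketch below is illustrative rather than our production code; it assumes the token-bucket operator was given .uid("token-bucket"), that flink-state-processor-api (matching the Flink version) has been added to the pom, and that the savepoint path placeholder is filled in.

package com.mycompany.quota;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class RedisBootstrapJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the streaming job's latest savepoint (path is a placeholder).
        SavepointReader savepoint = SavepointReader.read(
            env,
            "gs://<your-flink-checkpoints-bucket>/savepoints/<latest-savepoint>",
            new EmbeddedRocksDBStateBackend());

        // Extract a BLOCK decision for every key whose token bucket was exhausted.
        DataStream<QuotaEngineJob.QuotaDecision> blockedKeys =
            savepoint.readKeyedState("token-bucket", new BlockedKeyReader());

        // Re-use the same sink the streaming job uses, so quota:<key> is SET before traffic resumes.
        blockedKeys.addSink(new QuotaEngineJob.RedisEnforcementSink("your-redis-host-ip", 6379))
            .name("Redis Bootstrap Sink");

        env.execute("Quota Redis Bootstrap");
    }

    // Reads the same state descriptor the TokenBucketProcessor registers.
    public static class BlockedKeyReader
            extends KeyedStateReaderFunction<String, QuotaEngineJob.QuotaDecision> {

        private transient ValueState<Boolean> isBlockedState;

        @Override
        public void open(Configuration parameters) {
            isBlockedState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("isBlocked", Boolean.class));
        }

        @Override
        public void readKey(String key, Context ctx,
                            Collector<QuotaEngineJob.QuotaDecision> out) throws Exception {
            if (Boolean.TRUE.equals(isBlockedState.value())) {
                out.collect(new QuotaEngineJob.QuotaDecision(key, true));
            }
        }
    }
}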

The design has an inherent latency window between an event occurring, it being processed by Flink, and the decision being written to Redis. This window is typically sub-second but means a few requests might slip through after a quota is technically exhausted. For rate-limiting, this is an acceptable trade-off for a massively scalable system. An alternative involving a synchronous call from Tyk to a Flink Queryable State endpoint was considered, but this would re-introduce a blocking dependency and tightly couple the gateway to the Flink cluster, which we explicitly designed to avoid. The current architecture prioritizes gateway independence and throughput.

A clear limitation of this implementation is that quota configurations (bucket size, replenish rate) are hardcoded. A production-ready evolution would involve Flink consuming a second stream of configuration data, perhaps from a Debezium CDC feed on a configuration database or another Pub/Sub topic. Using a BroadcastState pattern, Flink could then apply these rule changes dynamically to API keys without requiring a job restart, making the system truly dynamic.
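
To make the shape of that evolution concrete, here is a hedged sketch of the BroadcastState wiring. The QuotaConfig type, the attach() helper, and the simplified processor are illustrative assumptions rather than existing code, and the replenishment timers are omitted to keep the example short.

package com.mycompany.quota;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicQuotaJob {

    // A per-key quota rule, e.g. sourced from a "quota-configs" Pub/Sub topic or a CDC feed.
    public static class QuotaConfig {
        public String apiKey;
        public int maxCapacity;
        public int tokensPerInterval;
    }

    // Broadcast state descriptor: every parallel instance holds the full config map.
    public static final MapStateDescriptor<String, QuotaConfig> CONFIG_DESCRIPTOR =
        new MapStateDescriptor<>("quotaConfigs", Types.STRING, Types.POJO(QuotaConfig.class));

    // Connects the existing usage stream to a broadcast stream of config updates.
    public static DataStream<QuotaEngineJob.QuotaDecision> attach(
            DataStream<QuotaEngineJob.ApiUsageEvent> events,
            DataStream<QuotaConfig> configUpdates) {
        BroadcastStream<QuotaConfig> broadcastConfigs = configUpdates.broadcast(CONFIG_DESCRIPTOR);
        return events
            .keyBy(QuotaEngineJob.ApiUsageEvent::getApiKey)
            .connect(broadcastConfigs)
            .process(new DynamicTokenBucketProcessor())
            .name("Dynamic Token Bucket Processor");
    }

    // Simplified: only the capacity lookup is shown; timers and replenishment work as before.
    public static class DynamicTokenBucketProcessor extends KeyedBroadcastProcessFunction<
            String, QuotaEngineJob.ApiUsageEvent, QuotaConfig, QuotaEngineJob.QuotaDecision> {

        private transient ValueState<Integer> tokenCountState;

        @Override
        public void open(Configuration parameters) {
            tokenCountState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("tokenCount", Integer.class));
        }

        @Override
        public void processElement(QuotaEngineJob.ApiUsageEvent event, ReadOnlyContext ctx,
                                   Collector<QuotaEngineJob.QuotaDecision> out) throws Exception {
            QuotaConfig cfg = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get(event.apiKey);
            int capacity = (cfg != null) ? cfg.maxCapacity : 6000; // fall back to the old hardcoded default

            Integer tokens = tokenCountState.value();
            if (tokens == null) {
                tokens = capacity;
            }
            tokens -= event.cost;
            tokenCountState.update(tokens);

            if (tokens <= 0) {
                out.collect(new QuotaEngineJob.QuotaDecision(event.apiKey, true));
            }
        }

        @Override
        public void processBroadcastElement(QuotaConfig config, Context ctx,
                                            Collector<QuotaEngineJob.QuotaDecision> out) throws Exception {
            // Rule changes take effect on the next event for that key, without a job restart.
            ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(config.apiKey, config);
        }
    }
}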

