Building a Real-Time ISR Invalidation Pipeline for a Polyrepo Frontend with Cassandra CDC


The operational overhead of our content platform had become untenable. We were running a large-scale, multi-tenant system where each tenant’s public-facing site was a standalone application within a sprawling polyrepo structure. For performance, we leaned heavily on Incremental Static Regeneration (ISR), but this created a data consistency problem. The source of truth was a globally distributed Cassandra cluster, and product inventory, pricing, and availability data could change multiple times per second. The initial solution—a low revalidate interval in Next.js—was flooding our origin servers with requests and still resulted in stale data for unacceptable periods. Direct API calls from the services writing to Cassandra to trigger revalidation created a spaghetti-like coupling that was brittle and failed to capture out-of-band database updates. The core technical pain point was clear: how to propagate data changes from a distributed NoSQL database to trigger targeted page regenerations across dozens of independent front-end applications, in near real-time, without building a coupled mess or overwhelming our CI/CD infrastructure.

Our first iteration involved a direct webhook system. The backend service responsible for updating a product’s price would, after a successful UPDATE query to Cassandra, make an HTTP POST request to a specific Next.js application’s revalidation endpoint. This worked, but only for that one service. The system broke down when a data analytics batch job updated thousands of products directly in the database, completely bypassing the application service layer. Or when a support engineer made a manual correction via cqlsh. We needed a mechanism that treated the database itself as the source of truth for change events. This led us to Change Data Capture (CDC).

The final architecture hinges on decoupling the data source from the front-end build triggers. We placed a CDC pipeline at the heart of the system. Debezium would monitor Cassandra’s commit logs, publish change events to Kafka, and a new, centralized “ISR Dispatcher” service would consume these events. This dispatcher’s sole responsibility is to map a data change (e.g., product_id: 'abc-123') to the correct front-end application in our polyrepo and the corresponding URL path, then invoke the specific ISR webhook. This architecture is event-driven, scalable, and fully decouples the concerns of data persistence from front-end rendering.

Phase 1: Plumbing the Data Layer with CDC

A common mistake is underestimating the infrastructure required to make CDC reliable. We needed Cassandra, Zookeeper, Kafka, and the Debezium Connector for Cassandra running cohesively. For a production-grade local development and testing environment, we defined this stack using Docker Compose.

# docker-compose.yml
# A complete, runnable stack for Cassandra CDC with Debezium and Kafka.
# To run: docker-compose up -d

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  cassandra:
    image: cassandra:4.0
    container_name: cassandra
    ports:
      - "9042:9042"
    volumes:
      - ./cassandra.yaml:/etc/cassandra/cassandra.yaml
    environment:
      - CASSANDRA_CLUSTER_NAME=isr_cluster
      - CASSANDRA_DC=dc1
      - CASSANDRA_RACK=rack1
      - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch

  debezium-connect:
    image: debezium/connect:2.1
    container_name: debezium-connect
    depends_on:
      - kafka
      - cassandra
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter

The critical piece for Cassandra itself is enabling CDC. This is a simple flag in its configuration. In a real-world project, this requires a rolling restart of the cluster.

# cassandra.yaml
# Only the relevant CDC configuration is shown.

# ... other cassandra settings
cdc_enabled: true
# ...

Once the infrastructure is up, we define our keyspace and a sample table for products. The cdc = true property at the table level is non-negotiable.

-- setup.cql - Run this against the Cassandra container after it's healthy.
-- docker exec -i cassandra cqlsh < setup.cql

CREATE KEYSPACE product_catalog WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE product_catalog;

CREATE TABLE products (
    product_id uuid PRIMARY KEY,
    tenant_id varchar,
    name text,
    description text,
    price decimal,
    stock_count int,
    last_updated timestamp
) WITH cdc = true;

With the database ready, the final step in this phase is configuring the Debezium connector via its REST API. This tells Debezium to watch our products table and stream changes to a specific Kafka topic.

# Script to register the Cassandra connector with Debezium
# Save as register-connector.sh and execute.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "cassandra-product-connector",
  "config": {
    "connector.class": "io.debezium.connector.cassandra.CassandraConnector",
    "tasks.max": "1",
    "cassandra.hosts": "cassandra",
    "cassandra.port": "9042",
    "cassandra.user": "cassandra",
    "cassandra.password": "cassandra",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "topic.prefix": "isr_pipeline",
    "table.include.list": "product_catalog.products"
  }
}'

After running this, any INSERT, UPDATE, or DELETE on the products table will produce a detailed JSON message on the isr_pipeline.product_catalog.products Kafka topic. The plumbing is complete.

Phase 2: The ISR Dispatcher Service

This service is the brain of the operation. It’s a stateless Go application designed for high throughput and low latency. Its job is to consume from Kafka, parse the Debezium event, apply routing logic to find the correct frontend, and fire a webhook.

Here is the core structure of the dispatcher. The real-world implementation includes robust error handling, structured logging with logrus, and graceful shutdowns to prevent message loss.

// main.go - ISR Dispatcher Service

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
	"github.com/sirupsen/logrus"
)

// Represents the routing configuration. In a real system, this would come
// from a config file, a service discovery system, or a database.
var tenantRouting = map[string]struct {
	WebhookURL    string
	SecretToken   string
}{
	"tenant-a": {
		WebhookURL:  "http://localhost:3001/api/revalidate",
		SecretToken: "SECRET_TOKEN_A",
	},
	"tenant-b": {
		WebhookURL:  "http://localhost:3002/api/revalidate",
		SecretToken: "SECRET_TOKEN_B",
	},
}

// Simplified Debezium event structure for a Cassandra change.
// We only care about the `after` state to get the tenant_id and product_id.
type DebeziumPayload struct {
	After struct {
		ProductID string `json:"product_id"`
		TenantID  string `json:"tenant_id"`
	} `json:"after"`
}

type DebeziumMessage struct {
	Payload DebeziumPayload `json:"payload"`
}

// RevalidationRequestBody is the payload sent to the Next.js API endpoint.
type RevalidationRequestBody struct {
	Secret string `json:"secret"`
	Path   string `json:"path"`
}

func main() {
	logger := logrus.New()
	logger.SetFormatter(&logrus.JSONFormatter{})
	logger.SetOutput(os.Stdout)
	logger.SetLevel(logrus.InfoLevel)

	kafkaBrokers := []string{"localhost:9092"}
	kafkaTopic := "isr_pipeline.product_catalog.products"
	consumerGroup := "isr-dispatcher-group"

	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Version = sarama.V2_8_1_0 // Use a version compatible with your Kafka cluster

	consumer := Consumer{
		logger: logger,
		ready:  make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(kafkaBrokers, consumerGroup, config)
	if err != nil {
		logger.Fatalf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, []string{kafkaTopic}, &consumer); err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					return
				}
				logger.Errorf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Wait for the consumer to be set up
	logger.Info("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		logger.Info("terminating: context cancelled")
	case <-sigterm:
		logger.Info("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		logger.Fatalf("Error closing client: %v", err)
	}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	logger *logrus.Logger
	ready  chan bool
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		logEntry := consumer.logger.WithFields(logrus.Fields{
			"topic":     message.Topic,
			"partition": message.Partition,
			"offset":    message.Offset,
		})
		logEntry.Infof("Message claimed: value length %d", len(message.Value))

		var msg DebeziumMessage
		if err := json.Unmarshal(message.Value, &msg); err != nil {
			logEntry.Errorf("Failed to unmarshal message: %v", err)
			session.MarkMessage(message, "") // Acknowledge message even on parse failure
			continue
		}
		
		// The pitfall here is handling delete events. A DELETE will produce a message
		// where `after` is null. For simplicity, we only handle create/update here.
		// A production system must handle deletes, perhaps by triggering a revalidation
		// that results in a 404.
		if msg.Payload.After.TenantID == "" || msg.Payload.After.ProductID == "" {
			logEntry.Warn("Skipping message with empty tenant_id or product_id (likely a delete event)")
			session.MarkMessage(message, "")
			continue
		}

		go func(m DebeziumMessage) {
			processMessage(logEntry, m)
		}(msg)

		session.MarkMessage(message, "")
	}
	return nil
}

func processMessage(logger *logrus.Entry, msg DebeziumMessage) {
	tenantID := msg.Payload.After.TenantID
	productID := msg.Payload.After.ProductID

	route, ok := tenantRouting[tenantID]
	if !ok {
		logger.Warnf("No route configured for tenant_id: %s", tenantID)
		return
	}

	// The path construction logic is business-specific.
	pathToRevalidate := "/products/" + productID

	requestBody := RevalidationRequestBody{
		Secret: route.SecretToken,
		Path:   pathToRevalidate,
	}

	jsonData, err := json.Marshal(requestBody)
	if err != nil {
		logger.Errorf("Failed to marshal revalidation request: %v", err)
		return
	}

	req, err := http.NewRequest("POST", route.WebhookURL, bytes.NewBuffer(jsonData))
	if err != nil {
		logger.Errorf("Failed to create HTTP request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		logger.Errorf("Failed to send revalidation request for path %s: %v", pathToRevalidate, err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		logger.Errorf("Revalidation request for path %s failed with status %d", pathToRevalidate, resp.StatusCode)
		return
	}

	logger.Infof("Successfully triggered revalidation for tenant %s, path %s", tenantID, pathToRevalidate)
}

This dispatcher embodies the “polyrepo-aware” logic in the tenantRouting map. In our production environment, this is not a hardcoded map but a query to a service metadata database that stores which tenant’s application is deployed where and what its revalidation credentials are.

Phase 3: Securing the Frontend Revalidation Endpoint

The final piece is the Next.js application itself. Each application in the polyrepo needs a standardized but secure API route to handle incoming revalidation requests from our dispatcher. Exposing an unprotected revalidation endpoint is a major security risk.

Here’s the implementation for a typical product page and its corresponding API route.

// pages/products/[productId].js
// Example of a page component using getStaticProps and getStaticPaths for ISR.

import { getProductById, getAllProductIds } from '../../lib/data-access'; // Dummy data access functions

export async function getStaticProps({ params }) {
  // In a real app, this would fetch from a service that reads from Cassandra.
  const productData = await getProductById(params.productId);

  if (!productData) {
    return {
      notFound: true,
    };
  }

  return {
    props: {
      product: productData,
    },
    // The page will be re-generated on-demand via the webhook,
    // or at most once every 3600 seconds (1 hour) as a fallback.
    revalidate: 3600, 
  };
}

export async function getStaticPaths() {
  // Pre-build a small subset of popular product pages at build time.
  // The rest will be generated on-demand on the first visit.
  const paths = await getAllProductIds(); // Fetches a limited set of initial IDs
  return {
    paths,
    fallback: 'blocking', // 'blocking' ensures the user sees the generated page on first visit.
  };
}

export default function Product({ product }) {
  // Simple component to render product details.
  return (
    <div>
      <h1>{product.name}</h1>
      <p>{product.description}</p>
      <p>Price: ${product.price}</p>
      <p>In Stock: {product.stock_count}</p>
    </div>
  );
}

The critical counterpart is the API route. A common mistake is to perform a simple secret check. Production-grade code must validate the request method, body structure, and the token before calling res.revalidate.

// pages/api/revalidate.js

export default async function handler(req, res) {
  // 1. Security: Only allow POST requests
  if (req.method !== 'POST') {
    return res.status(405).json({ message: 'Method Not Allowed' });
  }

  // 2. Security: Check for the secret token
  // The secret should come from an environment variable specific to this application.
  if (req.body.secret !== process.env.ISR_REVALIDATION_TOKEN) {
    console.warn('Invalid revalidation token received.'); // Log for monitoring
    return res.status(401).json({ message: 'Invalid token' });
  }

  // 3. Payload validation
  if (!req.body.path || typeof req.body.path !== 'string') {
    return res.status(400).json({ message: 'Path is required and must be a string' });
  }

  try {
    // This is the core Next.js function to trigger a regeneration
    await res.revalidate(req.body.path);
    console.log(`Revalidation triggered for path: ${req.body.path}`);
    return res.json({ revalidated: true });
  } catch (err) {
    // A common pitfall: If revalidation fails (e.g., the data source is down during
    // getStaticProps), Next.js will throw an error. The stale page will continue to be served.
    console.error(`Error revalidating path ${req.body.path}:`, err);
    return res.status(500).send('Error revalidating');
  }
}

With these three components in place—the CDC pipeline, the dispatcher, and the secure frontend endpoint—the entire system works. An update to a product in Cassandra now propagates through Kafka to the dispatcher, which securely tells the correct Next.js application to rebuild a single page, all within a few seconds.

sequenceDiagram
    participant C as Cassandra
    participant D as Debezium Connector
    participant K as Kafka
    participant S as ISR Dispatcher
    participant F as Frontend (Next.js App)

    C->>D: CommitLog Change Event (product_id: 'abc-123')
    D->>K: Publishes JSON message to topic
    S->>K: Consumes message from topic
    S-->>S: Parses event, finds product_id 'abc-123'
    S-->>S: Looks up tenant route for 'abc-123'
    S->>F: POST /api/revalidate (path: '/products/abc-123', secret: '...')
    F-->>F: Verifies secret token
    F-->>F: Calls res.revalidate('/products/abc-123')
    F-->>C: getStaticProps fetches fresh data
    F-->>F: Re-renders HTML page
    F->>S: 200 OK { revalidated: true }

One major problem we encountered during stress testing was event storms. A batch script updating 50,000 product prices could generate 50,000 Kafka messages in a minute, leading our dispatcher to bombard our Vercel/Netlify build fleet with revalidation requests. This caused throttling and build queue congestion. The solution was to implement batching and debouncing within the ISR Dispatcher. Instead of calling the webhook immediately, it would collect paths in-memory for a short window (e.g., 500ms) and then send a single request for a batch of paths if the target frontend supported it, or send them sequentially with rate limiting. This significantly smoothed out the load on our build infrastructure.

This architecture, while complex, provides immense value in its scalability and loose coupling. However, it is not without its limitations. The current routing map in the dispatcher service is still relatively static; a more robust solution would involve a service registry so that new frontend applications could register themselves without requiring a dispatcher redeployment. Furthermore, the feedback loop is incomplete. If a Next.js application fails to revalidate a page (e.g., due to a temporary database connection issue in getStaticProps), the dispatcher is unaware of the failure. A potential future iteration could involve a callback mechanism or a dead-letter queue in Kafka to handle and retry these failed revalidation attempts, ensuring eventual consistency even in the face of transient frontend build failures.


  TOC