Our asynchronous ingestion pipeline appeared robust on the surface. A fleet of RESTful API services, deployed on Nomad, accepted high-volume traffic and published jobs to a Kafka cluster. A separate group of consumer services, also managed by Nomad, processed these jobs. The decoupling provided by Kafka was supposed to insulate the front-end API from back-end processing delays. This illusion of resilience shattered during a partial downstream outage.
The consumers, unable to reach a critical database, began failing to process messages. They didn’t crash; they simply entered a tight loop of consuming a message, attempting to process it, failing, and not committing the offset. Kafka client libraries are tenacious and kept retrying. From Nomad’s perspective, the services were “healthy” because the processes were still running. Meanwhile, consumer lag on the affected Kafka topic climbed rapidly, Kafka’s retention policies became a concern, and broker disk I/O saturated. Producer latency, as seen by our API services, jumped from milliseconds to several seconds. API threads blocked on kafka.send(), connection pools were exhausted, and what began as a downstream processing issue cascaded into a full-blown front-end outage: the API started returning 504 Gateway Timeout.
Static rate limiting was insufficient. The problem wasn’t the raw request volume; it was the back-end’s inability to handle a normal load. We needed a dynamic control mechanism that would make the producers aware of the consumers’ health. The solution was a circuit breaker, but not a simple, in-memory one. We needed to build a distributed circuit breaker where the state of the consumer fleet could trip a breaker in the producer fleet, effectively creating a system-wide, self-regulating backpressure mechanism.
The chosen architecture leverages the HashiCorp ecosystem, which integrates naturally with Nomad. We use Consul’s Key-Value (KV) store as the shared state machine for our distributed circuit breaker. The consumers are responsible for writing their health metrics to Consul, and the producers read this aggregate state to decide whether to open or close the circuit.
sequenceDiagram
    participant Client
    participant P as API Service (Producer)
    participant CB as CircuitBreaker
    participant KV as ConsulKV
    participant Kafka
    participant C as ConsumerService
    participant DB as DownstreamDB

    loop Normal Operation
        Client->>P: POST /api/v1/task
        P->>CB: isAllowed()?
        CB->>KV: Read Breaker State (CLOSED)
        KV-->>CB: State: CLOSED
        CB-->>P: true
        P->>Kafka: Produce Message
        Kafka-->>P: Ack
        P-->>Client: 202 Accepted
        C->>Kafka: Consume Message
        C->>DB: Process Task
        DB-->>C: Success
        C->>Kafka: Commit Offset
        C->>KV: PUT health_metrics/consumer_1 (rate:100/s, fails:0)
    end

    Note over DB: Downstream Database becomes unresponsive

    loop Failure Condition
        C->>Kafka: Consume Message
        C->>DB: Process Task
        DB-->>C: Connection Timeout
        C->>KV: PUT health_metrics/consumer_1 (rate:0/s, fails:10)
    end

    Note over P: Breaker state evaluation logic runs periodically
    P->>CB: evaluateState()
    CB->>KV: GET health_metrics/*
    KV-->>CB: {consumer_1: {fails:10}, consumer_2: {fails:12}}
    Note over CB: Failure threshold (e.g., 20) exceeded
    CB->>KV: PUT breaker/state OPEN
    KV-->>CB: OK

    loop Circuit is OPEN
        Client->>P: POST /api/v1/task
        P->>CB: isAllowed()?
        CB->>KV: Read Breaker State (OPEN)
        KV-->>CB: State: OPEN
        CB-->>P: false
        P-->>Client: 503 Service Unavailable
    end
This approach creates a feedback loop that spans multiple services, managed by Nomad, using Consul as the nervous system.
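To make that shared nervous system concrete, the rest of this article uses two key spaces in Consul: per-instance health metrics under service-metrics/kafka-consumer/&lt;instance-id&gt;, and a single breaker/state key holding the breaker’s JSON state. The small utility below is only a development-time sketch for inspecting that contract; it assumes a Consul agent reachable through the client’s standard defaults.

// cmd/kvdump/main.go - throwaway helper to inspect the shared breaker state in Consul.
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/consul/api"
)

func main() {
	// DefaultConfig honours CONSUL_HTTP_ADDR and falls back to 127.0.0.1:8500.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalf("consul client: %v", err)
	}
	kv := client.KV()

	// Per-instance health metrics written by the consumers.
	pairs, _, err := kv.List("service-metrics/kafka-consumer/", nil)
	if err != nil {
		log.Fatalf("list consumer metrics: %v", err)
	}
	for _, p := range pairs {
		fmt.Printf("%s => %s\n", p.Key, p.Value)
	}

	// The single breaker state key read by every producer instance.
	state, _, err := kv.Get("breaker/state", nil)
	if err != nil {
		log.Fatalf("get breaker state: %v", err)
	}
	if state != nil {
		fmt.Printf("breaker/state => %s\n", state.Value)
	}
}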
The Health-Reporting Consumer Service
The first step is to modify the consumer to be an active participant in its own health monitoring. It can’t just process messages; it must report its status. We’ll use Go for its performance and robust concurrency model.
The core dependencies are Shopify/sarama for Kafka and the official hashicorp/consul/api client.
Here is the configuration structure for the consumer:
// internal/config/consumer.go
package config
import (
"time"
"os"
"gopkg.in/yaml.v3"
)
type Config struct {
ServiceName string `yaml:"service_name"`
Kafka KafkaConfig `yaml:"kafka"`
Consul ConsulConfig `yaml:"consul"`
Metrics MetricsConfig `yaml:"metrics"`
}
type KafkaConfig struct {
Brokers []string `yaml:"brokers"`
Topic string `yaml:"topic"`
GroupID string `yaml:"group_id"`
}
type ConsulConfig struct {
Address string `yaml:"address"`
}
type MetricsConfig struct {
ReportInterval time.Duration `yaml:"report_interval"`
ConsulKeyPrefix string `yaml:"consul_key_prefix"`
}
func LoadConsumerConfig(path string) (*Config, error) {
var cfg Config
f, err := os.ReadFile(path)
if err != nil {
return nil, err
}
if err := yaml.Unmarshal(f, &cfg); err != nil {
return nil, err
}
return &cfg, nil
}
The consumer’s main logic involves setting up a Sarama consumer group handler and a separate goroutine for periodically reporting metrics to Consul.
// cmd/consumer/main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
"github.com/google/uuid"
"github.com/hashicorp/consul/api"
"your_project/internal/config"
)
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
metrics *ConsumerMetrics
consulKV *api.KV
metricsKey string
}
// ConsumerMetrics holds thread-safe counters for processing stats.
type ConsumerMetrics struct {
mu sync.Mutex
processed int64
failed int64
lastReport time.Time
}
func (m *ConsumerMetrics) incrementSuccess() {
m.mu.Lock()
defer m.mu.Unlock()
m.processed++
}
func (m *ConsumerMetrics) incrementFailure() {
m.mu.Lock()
defer m.mu.Unlock()
m.failed++
}
// getAndReset returns current metrics and resets them for the next interval.
func (m *ConsumerMetrics) getAndReset() (int64, int64, float64) {
m.mu.Lock()
defer m.mu.Unlock()
now := time.Now()
duration := now.Sub(m.lastReport).Seconds()
if duration < 1 {
duration = 1 // Avoid division by zero
}
proc := m.processed
fail := m.failed
rate := float64(proc) / duration
m.processed = 0
m.failed = 0
m.lastReport = now
return proc, fail, rate
}
func main() {
// ... (Load config from a file)
cfg, err := config.LoadConsumerConfig("consumer.yaml")
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
consulClient, err := api.NewClient(&api.Config{Address: cfg.Consul.Address})
if err != nil {
log.Fatalf("Failed to create Consul client: %v", err)
}
kv := consulClient.KV()
// Unique ID for this consumer instance
instanceID := fmt.Sprintf("%s-%s", cfg.ServiceName, uuid.New().String())
metricsKey := fmt.Sprintf("%s/%s", cfg.Metrics.ConsulKeyPrefix, instanceID)
metrics := &ConsumerMetrics{lastReport: time.Now()}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start the metrics reporter goroutine
go reportMetrics(ctx, kv, metricsKey, metrics, cfg.Metrics.ReportInterval)
saramaConfig := sarama.NewConfig()
// Consumer groups require a broker protocol version of at least 0.10.2.
saramaConfig.Version = sarama.V2_1_0_0
consumerGroup, err := sarama.NewConsumerGroup(cfg.Kafka.Brokers, cfg.Kafka.GroupID, saramaConfig)
if err != nil {
log.Fatalf("Error creating consumer group client: %v", err)
}
consumerHandler := &Consumer{
ready: make(chan bool),
metrics: metrics,
consulKV: kv,
metricsKey: metricsKey,
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := consumerGroup.Consume(ctx, []string{cfg.Kafka.Topic}, consumerHandler); err != nil {
log.Printf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
consumerHandler.ready = make(chan bool)
}
}()
<-consumerHandler.ready // Wait until the consumer has joined the group
log.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
cancel()
wg.Wait()
if err = consumerGroup.Close(); err != nil {
log.Printf("Error closing client: %v", err)
}
// Clean up our key from Consul on graceful shutdown
log.Printf("Deregistering from Consul KV: %s", metricsKey)
_, _ = kv.Delete(metricsKey, nil)
}
// reportMetrics periodically pushes the current metrics to Consul KV.
func reportMetrics(ctx context.Context, kv *api.KV, key string, metrics *ConsumerMetrics, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_, failed, rate := metrics.getAndReset()
payload := map[string]interface{}{
"timestamp": time.Now().UTC().Format(time.RFC3339),
"rate_ok": rate,
"count_fail": failed,
}
jsonData, err := json.Marshal(payload)
if err != nil {
log.Printf("Error marshalling metrics: %v", err)
continue
}
// A pitfall here is network failures to Consul. A robust implementation
// would include retries with backoff. For this example, we keep it simple.
p := &api.KVPair{Key: key, Value: jsonData}
if _, err := kv.Put(p, nil); err != nil {
log.Printf("Error writing metrics to Consul: %v", err)
}
}
}
}
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
close(consumer.ready)
return nil
}
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
// Simulate processing and potential failure
err := processMessage(message.Value)
if err != nil {
log.Printf("Failed to process message: %v", err)
consumer.metrics.incrementFailure()
// We do NOT mark the message as consumed, so it will be re-delivered.
// This is the behavior that causes the original problem.
// A more advanced strategy might involve a Dead Letter Queue.
} else {
consumer.metrics.incrementSuccess()
session.MarkMessage(message, "")
}
}
return nil
}
// processMessage simulates a task that can fail.
func processMessage(value []byte) error {
// In a real-world project, this function would interact with databases,
// other APIs, etc. We simulate a controllable failure source.
// Check an environment variable to decide if processing should fail.
if os.Getenv("INJECT_FAILURE") == "true" {
time.Sleep(100 * time.Millisecond) // Simulate work
return fmt.Errorf("downstream service unavailable")
}
time.Sleep(20 * time.Millisecond) // Simulate successful work
return nil
}
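The comment in ConsumeClaim notes that a Dead Letter Queue is a more advanced alternative to endless redelivery. A minimal sketch of that idea is shown below; the tasks-dlq topic name and the idea of injecting a sarama.SyncProducer into the Consumer handler are illustrative assumptions, not part of the service above. After a successful hand-off to the DLQ the offset would be committed, so a poison message no longer blocks the partition.

// Sketch: route messages that repeatedly fail into a dead-letter topic.
// The SyncProducer must be created with Producer.Return.Successes = true.
func sendToDLQ(producer sarama.SyncProducer, msg *sarama.ConsumerMessage) error {
	_, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "tasks-dlq", // hypothetical dead-letter topic
		Value: sarama.ByteEncoder(msg.Value),
		Headers: []sarama.RecordHeader{
			{Key: []byte("origin-topic"), Value: []byte(msg.Topic)},
		},
	})
	return err
}

// In ConsumeClaim's error path, instead of silently skipping the commit:
//   if dlqErr := sendToDLQ(consumer.dlqProducer, message); dlqErr == nil {
//       session.MarkMessage(message, "") // commit so the message is not redelivered forever
//   }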
Deploying the Consumer with Nomad
The Nomad job file defines how to run, monitor, and scale our consumer fleet. We’ll use the exec driver to run the compiled Go binary.
// nomad/consumer.nomad
job "kafka-consumer" {
datacenters = ["dc1"]
type = "service"
group "consumers" {
count = 3 // Run 3 instances of the consumer for parallelism
restart {
attempts = 3
interval = "5m"
delay = "15s"
mode = "fail"
}
task "app" {
driver = "exec"
config {
  command = "local/bin/consumer"
  args = ["-config", "local/consumer.yaml"]
}
// The artifact stanza downloads the pre-built binary onto the Nomad client
// node before the task starts. With mode = "file", the destination must be
// the target file path itself, not a directory.
artifact {
  source = "http://my-artifactory/consumer"
  destination = "local/bin/consumer"
  mode = "file"
  options {
    checksum = "sha256:..."
  }
}
template {
data = <<EOF
service_name: "kafka-consumer"
kafka:
brokers: ["10.0.0.1:9092", "10.0.0.2:9092"]
topic: "tasks-topic"
group_id: "tasks-processor-group"
consul:
address: "127.0.0.1:8500"
metrics:
report_interval: "5s"
consul_key_prefix: "service-metrics/kafka-consumer"
EOF
destination = "local/consumer.yaml"
change_mode = "signal"
change_signal = "SIGHUP"
}
// Failure injection flag. Changing it here requires re-submitting the job;
// to flip it at runtime without a redeploy, it could instead be rendered
// from Consul KV via a template stanza with env = true.
env {
INJECT_FAILURE = "false"
}
resources {
cpu = 200 # MHz
memory = 128 # MB
}
}
}
}
Running nomad job run nomad/consumer.nomad schedules three instances of this consumer. Each registers a unique key in Consul under service-metrics/kafka-consumer/ and reports its status every 5 seconds. To simulate a partial or full outage, we set INJECT_FAILURE to "true" for some or all of the consumers (by re-submitting the job, or by sourcing the variable from Consul KV as noted above).
The Circuit-Breaking API Service
Now for the producer side. The API service needs to implement the circuit breaker logic that reads from Consul.
The configuration is similar to the consumer’s:
// internal/config/api.go
package config
// ... similar structure for loading API config ...
type APIConfig struct {
Server ServerConfig `yaml:"server"`
Kafka KafkaProducerConfig `yaml:"kafka"`
CircuitBreaker CBreakerConfig `yaml:"circuit_breaker"`
}
// ...
type CBreakerConfig struct {
ConsulAddress string `yaml:"consul_address"`
ConsulKeyPrefix string `yaml:"consul_key_prefix"`
EvaluationInterval time.Duration `yaml:"evaluation_interval"`
FailureThreshold int64 `yaml:"failure_threshold"` // Total failures across all consumers
OpenDuration time.Duration `yaml:"open_duration"`
}
The core of the API is the distributed circuit breaker implementation. It has three states: Closed, Open, and HalfOpen.
// internal/breaker/distributed_breaker.go
package breaker
import (
"context"
"encoding/json"
"log"
"sync"
"time"
"github.com/hashicorp/consul/api"
"your_project/internal/config"
)
type State string
const (
StateClosed State = "CLOSED"
StateOpen State = "OPEN"
StateHalfOpen State = "HALF_OPEN"
)
// BreakerState represents the shared state in Consul
type BreakerState struct {
CurrentState State `json:"current_state"`
LastChanged time.Time `json:"last_changed"`
}
type DistributedBreaker struct {
config config.CBreakerConfig
consulKV *api.KV
mu sync.RWMutex
localState BreakerState
}
func NewDistributedBreaker(cfg config.CBreakerConfig) (*DistributedBreaker, error) {
consulClient, err := api.NewClient(&api.Config{Address: cfg.ConsulAddress})
if err != nil {
return nil, err
}
breaker := &DistributedBreaker{
config: cfg,
consulKV: consulClient.KV(),
localState: BreakerState{CurrentState: StateClosed}, // Default to closed
}
return breaker, nil
}
func (db *DistributedBreaker) Start(ctx context.Context) {
// Start a background goroutine to periodically evaluate consumer health
// and update the breaker state.
go db.evaluatorLoop(ctx)
// Another goroutine to sync local state with Consul state
go db.stateSyncer(ctx)
}
func (db *DistributedBreaker) IsAllowed() bool {
db.mu.RLock()
defer db.mu.RUnlock()
// The core logic: reject requests if the breaker is open.
// Half-Open state is treated as closed for this simple check,
// but the evaluator logic will handle its transition.
return db.localState.CurrentState != StateOpen
}
func (db *DistributedBreaker) stateSyncer(ctx context.Context) {
ticker := time.NewTicker(db.config.EvaluationInterval / 2) // Sync faster than we evaluate
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
kvPair, _, err := db.consulKV.Get("breaker/state", nil)
if err != nil {
log.Printf("[BREAKER] Error syncing state from Consul: %v", err)
continue
}
if kvPair == nil || len(kvPair.Value) == 0 {
// No state exists, assume closed. We'll create it on next evaluation.
continue
}
var remoteState BreakerState
if err := json.Unmarshal(kvPair.Value, &remoteState); err != nil {
log.Printf("[BREAKER] Error unmarshalling remote state: %v", err)
continue
}
db.mu.Lock()
if db.localState.CurrentState != remoteState.CurrentState {
log.Printf("[BREAKER] State changed from %s to %s via Consul", db.localState.CurrentState, remoteState.CurrentState)
db.localState = remoteState
}
db.mu.Unlock()
}
}
}
func (db *DistributedBreaker) evaluatorLoop(ctx context.Context) {
ticker := time.NewTicker(db.config.EvaluationInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
db.mu.Lock()
currentState := db.localState.CurrentState
lastChanged := db.localState.LastChanged
db.mu.Unlock()
// State machine logic
switch currentState {
case StateClosed:
db.evaluateHealthAndTrip()
case StateOpen:
if time.Since(lastChanged) > db.config.OpenDuration {
log.Println("[BREAKER] Open duration expired. Transitioning to HALF_OPEN.")
db.setState(StateHalfOpen)
}
case StateHalfOpen:
// In a real system, you'd allow one "canary" request through and see if it succeeds.
// For simplicity here, we will just transition back to CLOSED after a short period,
// assuming the system has had time to recover. A failed check would transition
// it back to OPEN immediately.
log.Println("[BREAKER] In HALF_OPEN. Assuming recovery and transitioning to CLOSED.")
db.setState(StateClosed)
}
}
}
}
func (db *DistributedBreaker) evaluateHealthAndTrip() {
// A common mistake is to only check for failures: a system with zero
// failures but also zero throughput is broken too. For brevity, this
// example only sums failure counts; a production version should also
// sanity-check the reported rate_ok values.
pairs, _, err := db.consulKV.List(db.config.ConsulKeyPrefix, nil)
if err != nil {
log.Printf("[BREAKER] Failed to list consumer metrics from Consul: %v", err)
return
}
if len(pairs) == 0 {
log.Printf("[BREAKER] No consumer metrics found in Consul. Assuming healthy.")
return
}
var totalFailures int64
for _, pair := range pairs {
var metrics map[string]interface{}
if err := json.Unmarshal(pair.Value, &metrics); err == nil {
if failCount, ok := metrics["count_fail"].(float64); ok {
totalFailures += int64(failCount)
}
}
}
log.Printf("[BREAKER] Health evaluation: Total failures in last interval: %d", totalFailures)
if totalFailures >= db.config.FailureThreshold {
log.Printf("[BREAKER] Failure threshold %d exceeded (%d). Tripping breaker to OPEN.", db.config.FailureThreshold, totalFailures)
db.setState(StateOpen)
}
}
// setState updates the state in Consul. This is the only place we write.
// A more robust implementation would use Consul's Check-And-Set (CAS) feature
// to avoid race conditions between multiple API instances.
func (db *DistributedBreaker) setState(newState State) {
state := BreakerState{
CurrentState: newState,
LastChanged: time.Now(),
}
jsonData, _ := json.Marshal(state)
p := &api.KVPair{Key: "breaker/state", Value: jsonData}
if _, err := db.consulKV.Put(p, nil); err != nil {
log.Printf("[BREAKER] CRITICAL: Failed to update breaker state in Consul: %v", err)
// If we can't write to Consul, the system is in an unknown state.
// A safe default is to fail open (i.e., keep the local breaker open).
db.mu.Lock()
db.localState.CurrentState = StateOpen
db.mu.Unlock()
} else {
db.mu.Lock()
db.localState = state
db.mu.Unlock()
}
}
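The comment above setState mentions Consul’s Check-And-Set support. Below is a sketch of what a CAS-guarded write could look like, using the ModifyIndex from a prior read so that two API instances racing to flip the state cannot silently overwrite each other. It is illustrative rather than a drop-in replacement for setState (and would require adding fmt to the file’s imports).

// setStateCAS updates breaker/state only if it has not changed since we read it.
func (db *DistributedBreaker) setStateCAS(newState State) error {
	current, _, err := db.consulKV.Get("breaker/state", nil)
	if err != nil {
		return err
	}
	var index uint64
	if current != nil {
		index = current.ModifyIndex // 0 means "create only if the key does not exist"
	}
	state := BreakerState{CurrentState: newState, LastChanged: time.Now()}
	jsonData, err := json.Marshal(state)
	if err != nil {
		return err
	}
	ok, _, err := db.consulKV.CAS(&api.KVPair{
		Key:         "breaker/state",
		Value:       jsonData,
		ModifyIndex: index,
	}, nil)
	if err != nil {
		return err
	}
	if !ok {
		// Another instance changed the state first; let stateSyncer pick up the
		// winner and let the next evaluation tick decide what to do.
		return fmt.Errorf("breaker state modified concurrently, CAS rejected")
	}
	db.mu.Lock()
	db.localState = state
	db.mu.Unlock()
	return nil
}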
This breaker is then integrated into the API’s HTTP handler as a middleware.
// cmd/api/main.go
package main
import (
"context"
"log"
"net/http"
"github.com/gin-gonic/gin"
// ... other imports for kafka producer, config, breaker
)
func main() {
// ... load config ...
// Initialize the distributed circuit breaker
dBreaker, err := breaker.NewDistributedBreaker(cfg.CircuitBreaker)
if err != nil {
log.Fatalf("Failed to create circuit breaker: %v", err)
}
// Start its background evaluation loops
dBreaker.Start(context.Background())
// Initialize Kafka producer
// ... producer setup ...
router := gin.Default()
// Apply the circuit breaker middleware
router.Use(CircuitBreakerMiddleware(dBreaker))
router.POST("/api/v1/task", func(c *gin.Context) {
// If we reach here, the breaker allowed the request
// ... read request body, produce to Kafka ...
c.JSON(http.StatusAccepted, gin.H{"status": "task accepted"})
})
router.Run(cfg.Server.Address)
}
func CircuitBreakerMiddleware(b *breaker.DistributedBreaker) gin.HandlerFunc {
return func(c *gin.Context) {
if !b.IsAllowed() {
log.Println("Request rejected by open circuit breaker.")
// Let the client know when to retry
c.Header("Retry-After", "30")
c.AbortWithStatusJSON(http.StatusServiceUnavailable, gin.H{
"error": "Service is currently unavailable due to high load or failure. Please try again later.",
})
return
}
c.Next()
}
}
The Nomad job for the API service is straightforward, similar to the consumer’s but defining an HTTP service and health check.
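On the application side, that health check needs something to probe. A minimal sketch of a liveness endpoint registered alongside the other routes in cmd/api/main.go is shown below; the /healthz path is an illustrative choice, not something defined earlier in this article.

// Liveness endpoint for the Nomad/Consul HTTP check. It deliberately returns
// 200 even while the breaker is open: an open breaker is intentional load
// shedding, not a reason for the scheduler to restart the allocation.
router.GET("/healthz", func(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{"status": "ok"})
})

Wiring the health check to the breaker state would cause Nomad to churn perfectly healthy API instances during a downstream outage, which is exactly the kind of coupling this design tries to avoid.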
The result of this architecture is a system that heals itself. When the consumers start failing, they report their poor health. The API services independently observe this trend in Consul and collectively decide to open the circuit, protecting both the Kafka cluster and their own resources. They begin rejecting traffic with a 503 Service Unavailable, signaling to clients that the system is temporarily overloaded. Once the downstream issue is resolved and consumers resume processing messages successfully, their reported failure counts drop to zero. The API services see this recovery, transition the breaker to HALF_OPEN, and then back to CLOSED, automatically resuming normal operation without manual intervention.
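The evaluator shown earlier simply assumes recovery once the breaker reaches HALF_OPEN. A slightly less optimistic sketch, built on the same per-consumer metrics this article already stores in Consul, would close the circuit only when the most recent reporting interval shows failures back under the threshold; the StateHalfOpen case in evaluatorLoop would call this instead of transitioning unconditionally.

// evaluateHalfOpen decides whether a HALF_OPEN breaker should close or re-open,
// based on the latest failure counts reported by the consumers.
func (db *DistributedBreaker) evaluateHalfOpen() {
	pairs, _, err := db.consulKV.List(db.config.ConsulKeyPrefix, nil)
	if err != nil {
		log.Printf("[BREAKER] HALF_OPEN check could not read Consul: %v", err)
		return // stay HALF_OPEN and retry on the next tick
	}
	var totalFailures int64
	for _, pair := range pairs {
		var metrics map[string]interface{}
		if err := json.Unmarshal(pair.Value, &metrics); err == nil {
			if failCount, ok := metrics["count_fail"].(float64); ok {
				totalFailures += int64(failCount)
			}
		}
	}
	if totalFailures >= db.config.FailureThreshold {
		log.Printf("[BREAKER] HALF_OPEN check failed (%d recent failures). Re-opening.", totalFailures)
		db.setState(StateOpen)
		return
	}
	log.Println("[BREAKER] HALF_OPEN check passed. Closing breaker.")
	db.setState(StateClosed)
}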
The boundaries of this solution are defined by its dependencies. The reliability of Consul is now paramount. A Consul outage would prevent the breaker state from being updated, potentially locking it in an open or closed state. Therefore, a production deployment requires a highly available Consul cluster. Furthermore, the failure thresholds and evaluation intervals are static. A more sophisticated implementation could employ adaptive algorithms that adjust these parameters based on historical throughput and latency, making the system even more resilient to varying load patterns. Finally, the current implementation uses a single, global breaker. For systems with multiple Kafka topics and consumer groups, a more granular approach with per-topic or per-service breakers would provide finer-grained fault isolation.