The project brief was straightforward: ingest a high volume of analytics events from a new mobile application feature. The forecasted traffic pattern, however, was anything but simple. We expected massive, spiky bursts of events during peak user engagement, followed by long lulls. A standard synchronous API endpoint directly connected to our database was immediately ruled out; it would crumble under the first wave of traffic.
Decoupling was the obvious answer. The architecture settled on a fire-and-forget endpoint for the mobile client, which would push events into an AWS SQS queue. A fleet of backend consumers, running in Docker containers, would then process these events asynchronously. This is a textbook pattern, but the real challenge lies in the consumer’s implementation. Our initial load tests with a naive consumer design were a disaster. Containers would either get OOM-killed under load or, even worse, fail to process messages before their SQS visibility timeout expired, leading to duplicate processing and a storm of poison-pill messages in the Dead-Letter Queue (DLQ).
The core pain point was a lack of backpressure. The consumer was greedily pulling messages from SQS as fast as possible, overwhelming its own processing capacity. The solution required building a consumer that could regulate itself—an adaptive system that could apply internal backpressure to match its SQS consumption rate with its actual processing throughput.
Our initial concept was a simple Go application. Go’s concurrency model, with goroutines and channels, seemed perfect for building a high-throughput, parallelized worker pool. The early iterations, however, demonstrated some common pitfalls in concurrent design.
Here’s the V1 code, a single-threaded loop. It worked, but was unacceptably slow, processing messages one by one.
// V1: Naive single-threaded consumer. Functional but slow.
func (c *Consumer) processSingleThreaded() {
    for {
        select {
        case <-c.ctx.Done():
            c.logger.Info("Shutting down single-threaded processor")
            return
        default:
            output, err := c.sqsClient.ReceiveMessage(c.ctx, &sqs.ReceiveMessageInput{
                QueueUrl: aws.String(c.queueURL),
                MaxNumberOfMessages: 1, // Process one at a time
                WaitTimeSeconds: 20, // Long polling
            })
            if err != nil {
                c.logger.Error("Failed to receive message", "error", err)
                time.Sleep(1 * time.Second) // Avoid spamming logs on persistent errors
                continue
            }
            for _, message := range output.Messages {
                if err := c.processMessage(c.ctx, message); err != nil {
                    c.logger.Error("Failed to process message", "messageId", *message.MessageId, "error", err)
                    // Error handling is omitted for brevity; in reality, we wouldn't delete.
                }
                _, err := c.sqsClient.DeleteMessage(c.ctx, &sqs.DeleteMessageInput{
                    QueueUrl: aws.String(c.queueURL),
                    ReceiptHandle: message.ReceiptHandle,
                })
                if err != nil {
                    c.logger.Error("Failed to delete message", "messageId", *message.MessageId, "error", err)
                }
            }
        }
    }
}
This was never going to meet production requirements. The next logical step was to introduce concurrency via a worker pool. We created a dispatcher goroutine to fetch messages in batches from SQS and a pool of worker goroutines to process them in parallel. Communication between the dispatcher and workers was handled by a Go channel.
This is the V2 architecture:
graph TD
    subgraph SQS
        direction LR
        Q[Queue]
    end
    subgraph Docker Container
        direction LR
        D[Dispatcher] -- Batched Messages via Channel --> WP[Worker Pool]
        WP -- Processes in Parallel --> P((Process))
    end
    SQS -- ReceiveMessage --> D
    P -- DeleteMessage --> SQS
The code for V2 looked something like this.
// V2: Concurrent worker pool. Better, but fatally flawed under load.
// Represents a job for a worker
type Job struct {
    Message types.Message
}
// Worker function
func (c *Consumer) worker(id int, jobs <-chan Job) {
    c.logger.Info("Starting worker", "id", id)
    for {
        select {
        case <-c.ctx.Done():
            c.logger.Info("Worker shutting down", "id", id)
            return
        case job := <-jobs:
            c.logger.Debug("Worker received job", "id", id, "messageId", *job.Message.MessageId)
            if err := c.processMessage(c.ctx, job.Message); err != nil {
                c.logger.Error("Worker failed to process message", "id", id, "messageId", *job.Message.MessageId, "error", err)
                // Production code would need logic to not delete the message.
                // For example, by not proceeding to the delete step.
                continue // Skip deletion on processing error
            }
            // On successful processing, delete from SQS
            _, err := c.sqsClient.DeleteMessage(c.ctx, &sqs.DeleteMessageInput{
                QueueUrl: aws.String(c.queueURL),
                ReceiptHandle: job.Message.ReceiptHandle,
            })
            if err != nil {
                c.logger.Error("Worker failed to delete message", "id", id, "messageId", *job.Message.MessageId, "error", err)
            }
        }
    }
}
// Dispatcher function
func (c *Consumer) dispatch(jobs chan<- Job) {
    for {
        select {
        case <-c.ctx.Done():
            c.logger.Info("Dispatcher shutting down")
            return
        default:
            output, err := c.sqsClient.ReceiveMessage(c.ctx, &sqs.ReceiveMessageInput{
                QueueUrl: aws.String(c.queueURL),
                MaxNumberOfMessages: 10, // Max batch size
                WaitTimeSeconds: 20,
            })
            if err != nil {
                c.logger.Error("Dispatcher failed to receive messages", "error", err)
                time.Sleep(1 * time.Second)
                continue
            }
            if len(output.Messages) > 0 {
                c.logger.Info("Dispatcher received messages", "count", len(output.Messages))
                for _, msg := range output.Messages {
                    jobs <- Job{Message: msg} // This is the blocking point of failure
                }
            }
        }
    }
}
// Main startup logic
func (c *Consumer) StartV2(numWorkers int) {
    jobs := make(chan Job) // Unbuffered channel
    for i := 1; i <= numWorkers; i++ {
        go c.worker(i, jobs)
    }
    go c.dispatch(jobs)
    <-c.ctx.Done()
    close(jobs)
    c.logger.Info("Consumer V2 shutting down.")
}
The load test against this V2 implementation revealed the critical design flaw. When a burst of traffic hit, the dispatcher would fetch 10 messages and attempt to push them onto the jobs channel. If the workers were all busy with long-running tasks, the dispatcher would block on jobs <- Job{Message: msg}. While blocked, the visibility timeout of all 10 messages fetched from SQS was ticking down. If the workers couldn’t clear the backlog fast enough, the timeouts would expire, and SQS would make the messages visible again. Another consumer instance (or even the same one in a later fetch) would pick them up, leading to duplicate processing.
Worse, if we changed jobs to a large buffered channel to prevent the dispatcher from blocking, we just shifted the problem. The dispatcher would fill the buffer, holding potentially thousands of messages in the container’s memory. A sustained traffic spike would exhaust the container’s memory, causing it to crash. We had created a system that was both unreliable and unstable.
The solution was to introduce a feedback loop. The dispatcher should not be allowed to fetch new messages from SQS unless there is capacity in the worker pool to process them. This is the essence of backpressure. By using a buffered channel with a size that reflects the system’s capacity, we can make the dispatcher’s fetch decision contingent on the channel’s state.
This led to V3, the backpressure-aware consumer. The core change is in the dispatch loop: we no longer blindly fetch; we fetch only when we know we have room.
// V3: Backpressure-Aware Consumer Implementation
// The full, runnable final version of our consumer.
package main
import (
    "context"
    "log/slog"
    "os"
    "os/signal"
    "strconv"
    "sync"
    "syscall"
    "time"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// Config holds the configuration for the consumer.
type Config struct {
    QueueURL string
    AWSRegion string
    NumWorkers int
    MaxJobs int // Size of the buffered channel, our backpressure mechanism
    PollWaitTime int32
}
// LoadConfig loads configuration from environment variables.
func LoadConfig() (*Config, error) {
    numWorkers, err := strconv.Atoi(getEnv("NUM_WORKERS", "10"))
    if err != nil {
        return nil, err
    }
    maxJobs, err := strconv.Atoi(getEnv("MAX_JOBS", "20"))
    if err != nil {
        return nil, err
    }
    pollWaitTime, err := strconv.Atoi(getEnv("POLL_WAIT_TIME_SECONDS", "20"))
    if err != nil {
        return nil, err
    }
    return &Config{
        QueueURL: getEnv("SQS_QUEUE_URL", ""),
        AWSRegion: getEnv("AWS_REGION", "us-east-1"),
        NumWorkers: numWorkers,
        MaxJobs: maxJobs,
        PollWaitTime: int32(pollWaitTime),
    }, nil
}
func getEnv(key, fallback string) string {
    if value, ok := os.LookupEnv(key); ok {
        return value
    }
    return fallback
}
// SQSClient defines the interface we need from the AWS SQS client.
// This is crucial for unit testing.
type SQSClient interface {
    ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
    DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}
// Consumer is the SQS message processor.
type Consumer struct {
    config *Config
    sqsClient SQSClient
    logger *slog.Logger
    jobs chan types.Message
    wg sync.WaitGroup
}
// NewConsumer creates a new consumer instance.
func NewConsumer(cfg *Config, client SQSClient, logger *slog.Logger) *Consumer {
    return &Consumer{
        config: cfg,
        sqsClient: client,
        logger: logger,
        // The buffered channel is the key. Its size determines how many messages
        // can be "in-flight" within the container before we stop polling SQS.
        jobs: make(chan types.Message, cfg.MaxJobs),
    }
}
// Start kicks off the consumer's dispatcher and workers.
func (c *Consumer) Start(ctx context.Context) {
    c.logger.Info("Starting consumer", "workers", c.config.NumWorkers, "job_buffer", c.config.MaxJobs)
    // Start workers
    for i := 1; i <= c.config.NumWorkers; i++ {
        c.wg.Add(1)
        go c.worker(ctx, i)
    }
    // Start dispatcher
    c.wg.Add(1)
    go c.dispatch(ctx)
    c.logger.Info("Consumer started")
}
// Wait blocks until all consumer goroutines have shut down.
func (c *Consumer) Wait() {
    c.wg.Wait()
    c.logger.Info("Consumer has shut down gracefully")
}
// worker processes messages from the jobs channel.
func (c *Consumer) worker(ctx context.Context, id int) {
    defer c.wg.Done()
    c.logger.Info("Worker started", "id", id)
    for {
        select {
        case <-ctx.Done():
            c.logger.Info("Worker shutting down", "id", id)
            return
        case msg, ok := <-c.jobs:
            if !ok {
                // The dispatcher closed the channel on shutdown; exit cleanly
                // instead of dereferencing a zero-value message.
                c.logger.Info("Jobs channel closed, worker shutting down", "id", id)
                return
            }
            log := c.logger.With("worker_id", id, "message_id", *msg.MessageId)
            log.Info("Worker processing message")
            // Simulate processing work.
            // In a real application, this would be calling a database, another service, etc.
            time.Sleep(100 * time.Millisecond)
            if err := c.deleteMessage(ctx, msg.ReceiptHandle); err != nil {
                log.Error("Failed to delete message", "error", err)
            } else {
                log.Info("Successfully processed and deleted message")
            }
        }
    }
}
// dispatch fetches messages from SQS and pushes them to the jobs channel.
// This is the heart of the adaptive logic.
func (c *Consumer) dispatch(ctx context.Context) {
    defer c.wg.Done()
    defer close(c.jobs)
    c.logger.Info("Dispatcher started")
    // Ticker to prevent a tight loop when the job channel is full.
    // This prevents high CPU usage when the consumer is at capacity.
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            c.logger.Info("Dispatcher shutting down")
            return
        case <-ticker.C:
            // The core of the backpressure mechanism:
            // Calculate how many messages we can fetch based on the free space in the jobs channel.
            // The channel's capacity acts as a semaphore.
            capacity := c.config.MaxJobs - len(c.jobs)
            if capacity <= 0 {
                c.logger.Debug("Job channel is full, skipping SQS poll")
                continue // Skip polling if we have no room for new jobs
            }
            // SQS max batch size is 10. We fetch up to our available capacity or 10.
            fetchBatchSize := int32(min(capacity, 10))
            c.logger.Debug("Polling SQS", "capacity", capacity, "fetch_batch_size", fetchBatchSize)
            output, err := c.sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
                QueueUrl: aws.String(c.config.QueueURL),
                MaxNumberOfMessages: fetchBatchSize,
                WaitTimeSeconds: c.config.PollWaitTime,
            })
            if err != nil {
                c.logger.Error("Failed to receive messages from SQS", "error", err)
                continue
            }
            if len(output.Messages) > 0 {
                c.logger.Info("Received messages from SQS", "count", len(output.Messages))
                for _, msg := range output.Messages {
                    // We must check the context again here in case a shutdown was initiated
                    // while we were waiting on SQS.
                    select {
                    case c.jobs <- msg:
                        // Message successfully queued for a worker
                    case <-ctx.Done():
                        c.logger.Info("Shutdown signal received, not queueing more jobs")
                        // In a real scenario, we might want to return these messages to SQS
                        // by not deleting them and letting the visibility timeout expire.
                        return
                    }
                }
            }
        }
    }
}
func (c *Consumer) deleteMessage(ctx context.Context, receiptHandle *string) error {
    _, err := c.sqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
        QueueUrl: aws.String(c.config.QueueURL),
        ReceiptHandle: receiptHandle,
    })
    return err
}
func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}
func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    cfg, err := LoadConfig()
    if err != nil {
        logger.Error("Failed to load configuration", "error", err)
        os.Exit(1)
    }
    if cfg.QueueURL == "" {
        logger.Error("SQS_QUEUE_URL environment variable must be set")
        os.Exit(1)
    }
    awsCfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(cfg.AWSRegion))
    if err != nil {
        logger.Error("Failed to load AWS configuration", "error", err)
        os.Exit(1)
    }
    sqsClient := sqs.NewFromConfig(awsCfg)
    // Set up a context that listens for the interrupt signal.
    ctx, cancel := context.WithCancel(context.Background())
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    consumer := NewConsumer(cfg, sqsClient, logger)
    consumer.Start(ctx)
    // Wait for a termination signal
    <-sigChan
    logger.Info("Shutdown signal received, initiating graceful shutdown...")
    cancel() // Trigger context cancellation
    // Wait for all goroutines to finish
    consumer.Wait()
}
The system now self-regulates. The length of the jobs channel, len(c.jobs), gives a real-time measure of internal pressure, and the dispatcher adapts its SQS fetch size to it. If workers are fast, the channel drains quickly, capacity increases, and the dispatcher fetches larger batches. If workers slow down, the channel fills up, capacity drops, and the dispatcher fetches smaller batches or stops fetching altogether until the workers catch up. This prevents the container from being flooded with messages it cannot handle, eliminating both the visibility-timeout expirations and the OOM crashes we saw earlier.
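The SQSClient interface in the code above is also what makes this design easy to verify without touching AWS. As a rough sketch (assumptions: it would live in a consumer_test.go in the same package, and the mock, message contents, and timings are illustrative only, not our actual test suite), a unit test with a hand-rolled fake client could look like this.
// consumer_test.go: a hypothetical unit test using a fake SQSClient.
package main
import (
    "context"
    "log/slog"
    "os"
    "strconv"
    "sync/atomic"
    "testing"
    "time"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
    "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// mockSQSClient hands out one batch of fake messages, then returns empty
// results, and counts how many deletions it receives.
type mockSQSClient struct {
    delivered int32
    deleted int32
}
func (m *mockSQSClient) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
    // Deliver a single batch of three messages on the first call only.
    if atomic.CompareAndSwapInt32(&m.delivered, 0, 1) {
        msgs := make([]types.Message, 3)
        for i := range msgs {
            msgs[i] = types.Message{
                MessageId: aws.String("msg-" + strconv.Itoa(i)),
                ReceiptHandle: aws.String("rh-" + strconv.Itoa(i)),
            }
        }
        return &sqs.ReceiveMessageOutput{Messages: msgs}, nil
    }
    return &sqs.ReceiveMessageOutput{}, nil
}
func (m *mockSQSClient) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) {
    atomic.AddInt32(&m.deleted, 1)
    return &sqs.DeleteMessageOutput{}, nil
}
func TestConsumerProcessesAndDeletes(t *testing.T) {
    mock := &mockSQSClient{}
    cfg := &Config{QueueURL: "mock-queue", NumWorkers: 2, MaxJobs: 4, PollWaitTime: 0}
    logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
    ctx, cancel := context.WithCancel(context.Background())
    consumer := NewConsumer(cfg, mock, logger)
    consumer.Start(ctx)
    // Give the dispatcher one tick and the workers time to drain the batch,
    // then shut everything down. A sleep keeps the sketch short; a real test
    // would synchronize on the mock instead.
    time.Sleep(1 * time.Second)
    cancel()
    consumer.Wait()
    if got := atomic.LoadInt32(&mock.deleted); got != 3 {
        t.Fatalf("expected 3 deleted messages, got %d", got)
    }
}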
Finally, the consumer needed to be containerized for deployment. A multi-stage Dockerfile is essential for production-grade images. It keeps the final image small and secure by not including the Go toolchain and build artifacts.
# ---- Build Stage ----
FROM golang:1.21-alpine AS builder
WORKDIR /app
# Copy Go modules and download dependencies
COPY go.mod go.sum ./
RUN go mod download
# Copy the source code
COPY . .
# Build the application
# CGO_ENABLED=0 is important for creating a static binary
# -ldflags="-s -w" strips debugging information to reduce binary size
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /consumer ./
# ---- Final Stage ----
FROM alpine:latest
# It's good practice to run as a non-root user
RUN addgroup -S appgroup && adduser -S appuser -G appgroup
USER appuser
WORKDIR /home/appuser
# Copy only the compiled binary from the builder stage
COPY --from=builder /consumer /consumer
# Set environment variables (these will be overridden by the runtime environment, e.g., ECS Task Definition)
ENV SQS_QUEUE_URL=""
ENV AWS_REGION="us-east-1"
ENV NUM_WORKERS="10"
ENV MAX_JOBS="20"
ENV POLL_WAIT_TIME_SECONDS="20"
# Command to run the application
ENTRYPOINT ["/consumer"]
This architecture is robust, but it’s not a silver bullet. It solves the problem of a single consumer instance being overwhelmed. True horizontal scalability requires running multiple instances of this container on a platform like AWS ECS or Fargate, with autoscaling policies tied to SQS metrics like ApproximateNumberOfMessagesVisible. Our self-throttling container ensures each instance is a well-behaved, stable unit, while the orchestrator handles scaling the number of units based on overall queue depth.
The current adaptive logic is also purely based on the job channel’s buffer size. A future iteration could incorporate more sophisticated metrics. For instance, a consumer could monitor its own CPU and memory utilization and dynamically adjust its NumWorkers or throttle its polling rate if it approaches resource limits. This would create an even more resilient system, capable of adapting not just to workload pressure but also to its own runtime environment constraints.
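As a rough illustration of that direction, here is a minimal, hypothetical sketch rather than part of the consumer above: a helper the dispatcher could call at the top of each ticker iteration, skipping the poll while memory is tight. It assumes a runtime import is added to the package and that softLimitBytes is derived from the container’s memory limit, say around 75% of it; a production version would also respect GOMEMLIMIT and cgroup limits, and factor in CPU.
// Hypothetical helper: report whether the Go heap has grown past a soft limit.
// dispatch could call this before computing capacity and continue while it
// returns true, so a memory-pressured container simply stops pulling new work.
func (c *Consumer) memoryPressureHigh(softLimitBytes uint64) bool {
    var m runtime.MemStats
    runtime.ReadMemStats(&m) // briefly stops the world; acceptable at a 200ms poll interval
    if m.HeapAlloc > softLimitBytes {
        c.logger.Warn("Heap above soft limit, throttling SQS polling",
            "heap_alloc_bytes", m.HeapAlloc, "soft_limit_bytes", softLimitBytes)
        return true
    }
    return false
}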