The CAP theorem isn’t a choice of two out of three; for any practical distributed system, network partitions are a given. The real engineering work lies in navigating the trade-offs between Consistency and Availability during these partitions. Google Cloud Pub/Sub, a globally distributed messaging system, is designed with a strong bias towards Availability (AP). It guarantees at-least-once message delivery, ensuring that subscribers can almost always receive messages as long as they can reach a Google front-end. This is excellent for system decoupling and resilience, but it offloads the consistency problem to the consumer. A common mistake is to treat “at-least-once” as sufficient, leading to subtle bugs in production when duplicate messages trigger non-idempotent operations, causing data corruption or erroneous financial transactions.
The challenge is to build a consumer that enforces “effectively-once” processing on top of an AP system. This requires shifting the balance from high availability towards stronger consistency at the application layer. We will implement this by building a Go Kit-based microservice that consumes from Pub/Sub, using an external state store (like Redis) to manage idempotency. This architecture makes the CAP trade-off explicit: our service’s ability to process new messages becomes dependent on the availability of our consistency-enforcing layer.
graph TD
    subgraph "Google Cloud"
        Publisher --"Message (ID: M1)"--> PubSubTopic
        PubSubTopic --"At-least-once delivery"--> PubSubSubscription
    end
    subgraph "Consumer Service (Go Kit)"
        PubSubSubscription --"pull(M1)"--> SubscriberTransport
        SubscriberTransport --"decode()"--> IdempotencyMiddleware
        IdempotencyMiddleware --"1. check(M1)"--> Redis
        Redis --"2. M1 not found"--> IdempotencyMiddleware
        IdempotencyMiddleware --"3. invoke()"--> BusinessEndpoint
        BusinessEndpoint --"4. process()"--> BusinessDatabase
        BusinessDatabase --"5. success"--> BusinessEndpoint
        BusinessEndpoint --"6. success"--> IdempotencyMiddleware
        IdempotencyMiddleware --"7. set(M1)"--> Redis
        Redis --"8. OK"--> IdempotencyMiddleware
        IdempotencyMiddleware --"9. success"--> SubscriberTransport
        SubscriberTransport --"10. ack(M1)"--> PubSubSubscription
    end
    subgraph "Duplicate Delivery Scenario"
        PubSubSubscription --"pull(M1) again"--> SubscriberTransport_Dup
        SubscriberTransport_Dup --> IdempotencyMiddleware_Dup
        IdempotencyMiddleware_Dup --"1. check(M1)"--> Redis
        Redis --"2. M1 found"--> IdempotencyMiddleware_Dup
        IdempotencyMiddleware_Dup --"3. skip logic, return success"--> SubscriberTransport_Dup
        SubscriberTransport_Dup --"4. ack(M1)"--> PubSubSubscription
    end
    style Redis fill:#f9f,stroke:#333,stroke-width:2px
    style BusinessDatabase fill:#ccf,stroke:#333,stroke-width:2px
The fundamental problem this architecture solves is preventing side effects from duplicate message processing. Our design introduces a deliberate coupling to a stateful component (Redis) to act as a source of truth for message processing history. If Redis is unavailable, our consumer must stop processing messages to avoid violating our consistency guarantee. This is the trade-off in action: we sacrifice the availability of our message processing pipeline for the consistency of our business state.
Let’s begin by defining the core service components in Go. We’ll model a simple payments service that processes payment events.
pkg/payments/service.go
package payments
import (
"context"
"errors"
"time"
)
// ErrInvalidPayment is returned for invalid payment data.
var ErrInvalidPayment = errors.New("invalid payment data")
// Payment represents a payment transaction event.
type Payment struct {
ID string `json:"id"`
Amount float64 `json:"amount"`
Currency string `json:"currency"`
SourceAccount string `json:"source_account"`
TargetAccount string `json:"target_account"`
Timestamp time.Time `json:"timestamp"`
}
// Service defines the interface for our payment processing business logic.
// In a real-world project, this would interact with a database.
type Service interface {
ProcessPayment(ctx context.Context, p Payment) error
}
// paymentsService is a concrete implementation of the Service interface.
type paymentsService struct {
// In a real application, this would hold a database connection pool.
// For this example, we just log the action.
}
// NewService creates a new payments service.
func NewService() Service {
return &paymentsService{}
}
// ProcessPayment implements the core business logic.
// This operation MUST be idempotent from a business logic perspective,
// or protected by an idempotency layer like we are building.
func (s *paymentsService) ProcessPayment(ctx context.Context, p Payment) error {
// Basic validation.
if p.ID == "" || p.Amount <= 0 || p.SourceAccount == "" || p.TargetAccount == "" {
return ErrInvalidPayment
}
// Simulate processing work, like database writes.
// In a real system, you might wrap multiple database calls in a transaction here.
// For example:
// tx, err := db.BeginTx(ctx, nil)
// defer tx.Rollback()
// 1. Debit source account
// 2. Credit target account
// 3. Record transaction log
// tx.Commit()
time.Sleep(50 * time.Millisecond) // Simulate DB latency.
// A real-world project would log with structured context.
// log.Info("Processed payment", "payment_id", p.ID, "amount", p.Amount)
return nil
}
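To make the transaction comments above concrete, here is a rough sketch of what a transactional variant might look like. It assumes a hypothetical *sql.DB handle and accounts/payment_log tables that are not part of this example, and it would additionally need the database/sql and fmt imports.

// Sketch only: a transactional version of the processing step. The db handle,
// table names, and SQL dialect ($1 placeholders) are illustrative assumptions.
func processPaymentTx(ctx context.Context, db *sql.DB, p Payment) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback() // No-op once Commit has succeeded.

    // 1. Debit source account.
    if _, err := tx.ExecContext(ctx,
        `UPDATE accounts SET balance = balance - $1 WHERE id = $2`,
        p.Amount, p.SourceAccount); err != nil {
        return fmt.Errorf("debit source: %w", err)
    }
    // 2. Credit target account.
    if _, err := tx.ExecContext(ctx,
        `UPDATE accounts SET balance = balance + $1 WHERE id = $2`,
        p.Amount, p.TargetAccount); err != nil {
        return fmt.Errorf("credit target: %w", err)
    }
    // 3. Record transaction log. A unique constraint on payment_log.id would add a
    // database-level guard against duplicates on top of the Redis check built below.
    if _, err := tx.ExecContext(ctx,
        `INSERT INTO payment_log (id, amount, currency, ts) VALUES ($1, $2, $3, $4)`,
        p.ID, p.Amount, p.Currency, p.Timestamp); err != nil {
        return fmt.Errorf("record log: %w", err)
    }
    return tx.Commit()
}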
With the core business logic defined, we construct the Go Kit components. The endpoint wraps the service method, providing a generic signature that middlewares can operate on.
pkg/payments/endpoints.go
package payments
import (
    "context"
    "errors"

    "github.com/go-kit/kit/endpoint"
)
// Endpoints holds all Go Kit endpoints for the payments service.
type Endpoints struct {
ProcessPayment endpoint.Endpoint
}
// MakeEndpoints creates the endpoints for the service.
func MakeEndpoints(s Service) Endpoints {
return Endpoints{
ProcessPayment: makeProcessPaymentEndpoint(s),
}
}
func makeProcessPaymentEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req, ok := request.(Payment)
if !ok {
return nil, errors.New("type assertion failed: request is not a Payment")
}
err = s.ProcessPayment(ctx, req)
return nil, err // For async messaging, response is often nil.
}
}
The key piece of our solution is the idempotency middleware, which intercepts requests before they reach the business logic. It uses Redis’s atomic SETNX (SET if Not eXists) command, a natural fit for this use case: SETNX returns 1 if the key was set (i.e., this is the first time we have seen it) and 0 if the key already existed. This atomicity is critical to avoid race conditions between concurrent consumers processing the same message.
pkg/payments/middleware.go
package payments
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/go-redis/redis/v8"
)
// IdempotencyMiddleware creates an endpoint.Middleware that ensures a request is processed only once.
// It uses Redis to store a record of processed message identifiers.
func IdempotencyMiddleware(rdb *redis.Client, keyExpiry time.Duration) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
// In a Pub/Sub transport, we would extract the message ID.
// For a generic middleware, one option is to derive a key by hashing the request payload.
// That ties idempotency to the message content, but it is brittle: it assumes the
// serialized payload is deterministic (e.g., stable JSON key order across producers).
// A better approach is to rely on a unique ID from the message attributes if available.
// For this example, let's assume the request is a Payment struct and use its ID.
payment, ok := request.(Payment)
if !ok {
// A common mistake is to fail open. We should fail closed to preserve consistency.
return nil, fmt.Errorf("idempotency middleware requires request of type Payment")
}
if payment.ID == "" {
// If no ID is provided, we cannot guarantee idempotency. Reject the request.
return nil, fmt.Errorf("idempotency key (Payment.ID) is missing")
}
idempotencyKey := fmt.Sprintf("idempotency:payment:%s", payment.ID)
// SETNX is an atomic operation.
// It sets the key if it does not exist and returns true.
// If the key already exists, it does nothing and returns false.
wasSet, err := rdb.SetNX(ctx, idempotencyKey, "processed", keyExpiry).Result()
if err != nil {
// If Redis is down, we must fail the request.
// This is the explicit CAP trade-off: sacrificing availability for consistency.
// NACKing the message in the transport will cause it to be redelivered later.
return nil, fmt.Errorf("failed to check idempotency key in Redis: %w", err)
}
if !wasSet {
// This key has been seen before. The message is a duplicate.
// We return success without processing to ensure the message is ACK'd and removed from the queue.
// log.Warn("Duplicate message detected", "key", idempotencyKey)
return nil, nil
}
// This is the first time we've seen this key. Process the request.
response, err = next(ctx, request)
if err != nil {
// The business logic failed. To allow for retries, we must remove the idempotency key.
// If we don't, subsequent retries of the same message will be flagged as duplicates and ignored.
// The pitfall here is that if the Redis command fails, the key might be "stuck",
// preventing legitimate retries. A more robust solution might involve a "pending" state.
if delErr := rdb.Del(ctx, idempotencyKey).Err(); delErr != nil {
// This is a critical failure state. We failed to process and failed to clean up.
// The system will require manual intervention for this message ID.
// log.Error("CRITICAL: Failed to process and failed to delete idempotency key", "key", idempotencyKey, "original_error", err, "cleanup_error", delErr)
}
return response, err
}
// If the business logic succeeds, the idempotency key remains in Redis until it expires.
return response, nil
}
}
}
// A more naive key generation, left here for discussion.
// Prone to issues if JSON key order changes.
func generateKeyFromPayload(request interface{}) (string, error) {
data, err := json.Marshal(request)
if err != nil {
return "", err
}
hash := sha256.Sum256(data)
return fmt.Sprintf("idempotency:%x", hash), nil
}
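The error path above hints at a more robust "pending" state. One possible sketch of that two-phase record, using the same go-redis client (the key states, TTLs, and function shape are illustrative, not part of the middleware above):

// Sketch: two-phase idempotency record. The key is first claimed as "pending" with a
// short TTL; only after the business logic succeeds is it promoted to "processed"
// with the long TTL. A duplicate that observes "pending" is returned as an error so
// the transport NACKs it and Pub/Sub redelivers later, by which time the first
// attempt has either been promoted or its short TTL has expired.
func claimThenProcess(ctx context.Context, rdb *redis.Client, key string,
    pendingTTL, processedTTL time.Duration, next func(context.Context) error) error {

    claimed, err := rdb.SetNX(ctx, key, "pending", pendingTTL).Result()
    if err != nil {
        return fmt.Errorf("idempotency store unavailable: %w", err) // Fail closed.
    }
    if !claimed {
        state, err := rdb.Get(ctx, key).Result()
        if err != nil {
            return fmt.Errorf("idempotency store unavailable: %w", err)
        }
        if state == "processed" {
            return nil // True duplicate: skip the logic and let the caller ACK.
        }
        return fmt.Errorf("key %s is in flight elsewhere; retry later", key) // Caller NACKs.
    }
    if err := next(ctx); err != nil {
        return err // Leave the "pending" key to expire on its own; no Del required.
    }
    return rdb.Set(ctx, key, "processed", processedTTL).Err()
}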
Now, we define the transport layer that connects the Go Kit service to Google Cloud Pub/Sub. This transport is responsible for receiving messages, decoding them, passing them through the middleware chain to the endpoint, and finally ACKing or NACKing the message based on the outcome.
pkg/payments/transport_pubsub.go
package payments
import (
    "context"
    "encoding/json"
    "errors"
    "fmt"

    "cloud.google.com/go/pubsub"
    "github.com/go-kit/kit/endpoint"
    "github.com/go-kit/log"
)
// PubSubHandler is a struct that holds the endpoint and decoding logic.
type PubSubHandler struct {
e endpoint.Endpoint
decode func(context.Context, *pubsub.Message) (interface{}, error)
logger log.Logger
}
// NewPubSubHandler creates a handler for incoming Pub/Sub messages.
func NewPubSubHandler(e endpoint.Endpoint, logger log.Logger) *PubSubHandler {
return &PubSubHandler{
e: e,
decode: decodePubSubPaymentMessage,
logger: logger,
}
}
// Serve subscribes to a Pub/Sub topic and handles messages.
// This function blocks until the context is canceled.
func (h *PubSubHandler) Serve(ctx context.Context, sub *pubsub.Subscription) {
h.logger.Log("msg", "starting pubsub subscriber", "subscription", sub.ID())
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// The entire processing logic for a single message.
request, err := h.decode(ctx, msg)
if err != nil {
h.logger.Log("level", "error", "msg", "failed to decode message, nacking", "err", err, "message_id", msg.ID)
msg.Nack()
return
}
// The endpoint call will trigger the middleware chain (idempotency check)
// and then the business logic.
_, err = h.e(ctx, request)
if err != nil {
// Any error from the endpoint (business logic failure, idempotency store failure)
// should result in a NACK. Pub/Sub will redeliver the message after the ack deadline.
h.logger.Log("level", "error", "msg", "endpoint processing failed, nacking", "err", err, "message_id", msg.ID)
msg.Nack()
return
}
// If we reach here, it means the message was either processed successfully for the first time
// or was identified as a duplicate and skipped. In both cases, we ACK.
h.logger.Log("level", "debug", "msg", "message processed successfully, acking", "message_id", msg.ID)
msg.Ack()
})
if err != nil {
h.logger.Log("level", "error", "msg", "pubsub receive exited with error", "err", err)
}
}
func decodePubSubPaymentMessage(ctx context.Context, msg *pubsub.Message) (interface{}, error) {
var p Payment
if err := json.Unmarshal(msg.Data, &p); err != nil {
return nil, fmt.Errorf("failed to unmarshal payment data: %w", err)
}
// It's a best practice to extract a unique identifier from the message attributes
// rather than relying on the payload content. The Pub/Sub message ID is stable across
// redeliveries, but publisher retries create new messages (with new IDs) for the same
// logical event, so it is not a reliable deduplication key. The publisher should add a business-level ID.
if id, ok := msg.Attributes["eventId"]; ok {
p.ID = id // Overwrite or use the ID from attributes for idempotency.
}
if p.ID == "" {
// Fallback if no eventId attribute is present. The middleware will reject this.
return nil, errors.New("message is missing 'eventId' attribute or 'id' field in payload for idempotency")
}
return p, nil
}
Finally, we tie everything together in cmd/main.go. This includes setting up configuration, initializing clients for Pub/Sub and Redis, creating the service components, and starting the subscriber. In a production environment, configuration should be managed via environment variables or a configuration service, not hardcoded.
cmd/main.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"cloud.google.com/go/pubsub"
"github.com/go-kit/log"
"github.com/go-redis/redis/v8"
// Import the local payments package
"payments-consumer/pkg/payments"
)
// In a real application, these would come from config files or env vars.
const (
gcpProjectID = "your-gcp-project-id"
pubsubSubID = "payments-subscription"
redisAddr = "localhost:6379"
idempotencyExpiry = 24 * time.Hour
)
func main() {
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
// Set up context that is canceled on interrupt signals.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errs := make(chan error, 1)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errs <- fmt.Errorf("%s", <-c)
}()
// 1. Set up Redis Client
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
})
if err := rdb.Ping(ctx).Err(); err != nil {
logger.Log("level", "error", "msg", "failed to connect to Redis", "err", err)
os.Exit(1)
}
logger.Log("msg", "connected to redis")
// 2. Set up Pub/Sub Client
pubsubClient, err := pubsub.NewClient(ctx, gcpProjectID)
if err != nil {
logger.Log("level", "error", "msg", "failed to create pubsub client", "err", err)
os.Exit(1)
}
defer pubsubClient.Close()
sub := pubsubClient.Subscription(pubsubSubID)
logger.Log("msg", "connected to pubsub")
// 3. Create service, endpoints, and transport
var svc payments.Service
{
svc = payments.NewService()
// Add logging middleware here in a real app
}
endpoints := payments.MakeEndpoints(svc)
{
// Chain the idempotency middleware. This is the crucial step.
endpoints.ProcessPayment = payments.IdempotencyMiddleware(rdb, idempotencyExpiry)(endpoints.ProcessPayment)
// Other middlewares like logging, tracing, metrics would be chained here.
}
handler := payments.NewPubSubHandler(endpoints.ProcessPayment, logger)
// 4. Start the server
go func() {
handler.Serve(ctx, sub)
}()
logger.Log("msg", "service started")
logger.Log("terminated", <-errs)
}
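As noted above, the hardcoded constants are only for the example. A small helper along these lines (the environment variable names are illustrative) keeps the same defaults while allowing overrides:

// Sketch: environment-driven configuration with fallbacks to the constants above.
func getenv(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

// Possible usage inside main(), before the clients are created:
//   projectID := getenv("GCP_PROJECT_ID", gcpProjectID)
//   subID := getenv("PUBSUB_SUBSCRIPTION_ID", pubsubSubID)
//   redisAddress := getenv("REDIS_ADDR", redisAddr)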
A common misconception is that the CAP theorem provides a simple choice. In reality, modern systems are composed of many components, each with its own CAP characteristics. Our application is a perfect example. While Pub/Sub itself is an AP system, by layering our idempotency check on top of Redis (which, in a single-instance setup, is a CP system), we’ve created a composite service whose overall characteristic is now CP for the message processing path. The availability of the entire system is now the intersection of the availabilities of its components. If Redis fails, our service correctly stops processing to prevent inconsistent state, demonstrating a conscious architectural decision to favor consistency.
The choice of idempotency key and its lifecycle is a critical implementation detail. Using a business-level identifier provided by the publisher (e.g., eventId) is far more robust than hashing the message payload, which can be brittle if data structures evolve. Furthermore, setting an expiry on the keys in Redis is a pragmatic choice to manage memory growth. The expiry duration must be longer than the maximum possible message redelivery time from Pub/Sub, plus a safety margin. A common pitfall is setting this too short, which could allow a very delayed message to be processed twice.
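For completeness, the publisher side might attach that identifier as a message attribute roughly as follows. The topic name and helper function are illustrative, and the snippet assumes the payments package plus context, encoding/json, and cloud.google.com/go/pubsub imports.

// Sketch: publish a payment event with an "eventId" attribute that the consumer's
// decoder and idempotency middleware use as the deduplication key. Publisher-side
// retries must reuse the same ID for the same logical event.
func publishPayment(ctx context.Context, client *pubsub.Client, p payments.Payment) error {
    data, err := json.Marshal(p)
    if err != nil {
        return err
    }
    topic := client.Topic("payments-topic") // Illustrative topic name.
    res := topic.Publish(ctx, &pubsub.Message{
        Data:       data,
        Attributes: map[string]string{"eventId": p.ID},
    })
    _, err = res.Get(ctx) // Block until the server acknowledges the publish.
    return err
}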
The current implementation’s reliance on Redis introduces a single point of failure for consistency. If Redis fails, the entire message processing pipeline halts. This might be an acceptable trade-off for systems handling critical transactions. However, for less critical workloads, one could introduce a circuit breaker around the Redis check, allowing the service to fall back to at-least-once processing (higher availability) during a Redis outage, while logging alerts for manual reconciliation later. The applicability of this pattern is bounded by the business’s tolerance for data inconsistency versus its requirement for uptime. For systems requiring the highest level of consistency, the idempotency store itself could be a distributed, transaction-capable database like Google Cloud Spanner, which presents its own set of cost and latency trade-offs.
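A rough sketch of that fallback, hand-rolled rather than taken from any particular circuit-breaker library (the threshold, cooldown, and function name are illustrative; inside the payments package it would also need the sync/atomic and github.com/go-kit/log imports):

// Sketch: a fail-open variant of the idempotency check. After `threshold`
// consecutive Redis failures the check is skipped for `cooldown`, deliberately
// trading the effectively-once guarantee for availability during the outage.
func FailOpenIdempotency(rdb *redis.Client, keyExpiry time.Duration,
    threshold int64, cooldown time.Duration, logger log.Logger) endpoint.Middleware {

    var failures int64  // Consecutive Redis failures, accessed atomically.
    var openUntil int64 // Unix nanoseconds until which the breaker stays open.

    return func(next endpoint.Endpoint) endpoint.Endpoint {
        return func(ctx context.Context, request interface{}) (interface{}, error) {
            payment, ok := request.(Payment)
            if !ok || payment.ID == "" {
                return nil, fmt.Errorf("idempotency middleware requires a Payment with an ID")
            }
            // Breaker open: bypass the check, accept possible duplicates, and log for reconciliation.
            if time.Now().UnixNano() < atomic.LoadInt64(&openUntil) {
                logger.Log("level", "warn", "msg", "idempotency check bypassed (breaker open)", "payment_id", payment.ID)
                return next(ctx, request)
            }
            key := fmt.Sprintf("idempotency:payment:%s", payment.ID)
            wasSet, err := rdb.SetNX(ctx, key, "processed", keyExpiry).Result()
            if err != nil {
                if atomic.AddInt64(&failures, 1) >= threshold {
                    atomic.StoreInt64(&openUntil, time.Now().Add(cooldown).UnixNano())
                    atomic.StoreInt64(&failures, 0)
                    logger.Log("level", "error", "msg", "opening breaker, falling back to at-least-once", "err", err)
                }
                // This particular message still fails closed and will be NACKed and redelivered.
                return nil, fmt.Errorf("failed to check idempotency key in Redis: %w", err)
            }
            atomic.StoreInt64(&failures, 0)
            if !wasSet {
                return nil, nil // Duplicate: skip processing and let the transport ACK.
            }
            resp, err := next(ctx, request)
            if err != nil {
                rdb.Del(ctx, key) // Best-effort cleanup so retries are not flagged as duplicates.
            }
            return resp, err
        }
    }
}

Any message processed while the breaker is open should be logged with its ID so duplicates can be reconciled manually once Redis recovers.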