Implementing a Decoupled WebSocket Relay Architecture in Go Using AWS SQS for Mobile Client Resilience


The first iteration of our notification service was a classic in-memory Go hub-and-spoke model. An HTTP endpoint received messages, looked up the recipient’s WebSocket connection in a global map, and pushed the data directly. This worked for a few hundred concurrent connections in staging. In production, under the strain of thousands of mobile clients with unreliable network connectivity, the architecture crumbled. Two critical failure modes emerged: broadcast storms and message loss. A single event triggering notifications to a large user group would lock up the hub, causing massive latency spikes. More critically, if a mobile client disconnected for even a moment—switching from Wi-Fi to cellular, for instance—any message sent during that brief window was lost forever. The tight coupling between message ingestion and delivery was the root of the instability.

To solve this, we had to fundamentally decouple the two processes. The new architecture would treat message ingestion and message delivery as separate, asynchronous domains, buffered by a durable, highly available message queue. AWS SQS was chosen for this role. Its simplicity, managed nature, and cost-effectiveness made it a pragmatic choice over more complex systems like Kafka for our specific use case, which primarily involved targeted, individual user notifications rather than streaming analytics. The ingestion service’s only job would be to validate an incoming message and drop it into an SQS queue. A separate fleet of “Relay” servers would be responsible for consuming from this queue and attempting to deliver the messages to the appropriate clients over persistent WebSocket connections.
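
For context, the producer side is deliberately thin: validate, serialize, enqueue, return. The following is a minimal sketch using aws-sdk-go-v2; the queue URL and message fields shown here are illustrative, not the exact ingestion API code.

// Sketch: enqueueing a notification from the ingestion service (illustrative).
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
	ctx := context.Background()

	awsCfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := sqs.NewFromConfig(awsCfg)

	// The body mirrors the NotificationMessage structure the relay consumes.
	body, _ := json.Marshal(map[string]any{
		"userId":  "u-123",
		"payload": map[string]string{"type": "order_shipped"}, // illustrative payload
	})

	// Once validated, the message is handed to SQS and the request returns immediately.
	_, err = client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:    aws.String("https://sqs.us-east-1.amazonaws.com/123456789012/notifications"), // assumed
		MessageBody: aws.String(string(body)),
	})
	if err != nil {
		log.Printf("[ERROR] failed to enqueue notification: %v", err)
	}
}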

Here is the high-level flow we designed to replace the fragile, direct-dispatch model.

sequenceDiagram
    participant Producer as API Endpoint (Producer)
    participant SQS as AWS SQS Queue
    participant Relay as Go Relay Server
    participant Client as Mobile Client

    Producer->>+SQS: Enqueue Message {userId: "u-123", payload: "..."}
    SQS-->>-Producer: Ack (Message Persisted)

    Relay->>+SQS: Long-poll for messages
    SQS-->>-Relay: Deliver Message {userId: "u-123", ...}

    Relay->>Relay: Lookup connection for "u-123"
    alt User is Connected
        Relay->>+Client: Send payload via WebSocket
        Client-->>-Relay: (TCP ACK)
        Relay->>+SQS: Delete Message from Queue
        SQS-->>-Relay: Ack (Deletion Confirmed)
    else User is Disconnected
        Relay->>SQS: (No action) Message remains in queue
        Note right of Relay: Message becomes visible again after Visibility Timeout
    end

This design introduces a buffer that protects the WebSocket layer from ingestion spikes and ensures messages aren’t lost if a recipient is temporarily offline. The implementation, however, is where the real challenges lie, particularly in managing connection state, handling graceful shutdowns, and ensuring messages are processed correctly without loss or unnecessary duplication.
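
How forgiving this buffer is depends on the queue configuration: the visibility timeout controls how quickly an undelivered message becomes available for retry, and a redrive policy (covered further below) caps the number of attempts before a message is shunted to a dead-letter queue. Here is a minimal provisioning sketch with the Go SDK, where the queue name, DLQ ARN, and numeric values are assumptions for illustration:

// Sketch: creating the notification queue with an explicit visibility timeout and
// a redrive policy pointing at a pre-existing dead-letter queue (assumed ARN).
package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

func main() {
	ctx := context.Background()
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := sqs.NewFromConfig(awsCfg)

	// After maxReceiveCount failed receives, SQS moves the message to the DLQ.
	redrive, _ := json.Marshal(map[string]string{
		"deadLetterTargetArn": "arn:aws:sqs:us-east-1:123456789012:notifications-dlq", // assumed
		"maxReceiveCount":     "5",                                                    // assumed
	})

	out, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName: aws.String("notifications"), // assumed name
		Attributes: map[string]string{
			// Seconds a received-but-undeleted message stays invisible before retry.
			"VisibilityTimeout": "30",
			"RedrivePolicy":     string(redrive),
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("[INFO] queue ready: %s", aws.ToString(out.QueueUrl))
}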

Building the Core Components in Go

The project is structured to separate concerns: configuration, WebSocket connection management (the “hub”), and SQS interaction (the “consumer”).

/relay-service
├── cmd
│   └── server
│       └── main.go
├── internal
│   ├── client
│   │   └── client.go
│   ├── hub
│   │   └── hub.go
│   ├── consumer
│   │   └── consumer.go
│   └── handler
│       └── http.go
└── pkg
    └── config
        └── config.go

The configuration is managed via a struct loaded from environment variables, a common practice for cloud-native applications.

// pkg/config/config.go
package config

import (
	"log"
	"time"

	"github.com/kelseyhightower/envconfig"
)

// Config holds the application configuration.
type Config struct {
	AWSRegion         string        `envconfig:"AWS_REGION" required:"true"`
	SQSQueueURL       string        `envconfig:"SQS_QUEUE_URL" required:"true"`
	SQSWaitTimeSecs   int32         `envconfig:"SQS_WAIT_TIME_SECS" default:"20"`
	SQSMaxMessages    int32         `envconfig:"SQS_MAX_MESSAGES" default:"10"`
	ServerPort        string        `envconfig:"SERVER_PORT" default:"8080"`
	ReadBufferSize    int           `envconfig:"READ_BUFFER_SIZE" default:"1024"`
	WriteBufferSize   int           `envconfig:"WRITE_BUFFER_SIZE" default:"1024"`
	GracefulShutdown  time.Duration `envconfig:"GRACEFUL_SHUTDOWN_TIMEOUT" default:"15s"`
}

// Load loads configuration from environment variables.
func Load() (*Config, error) {
	var cfg Config
	err := envconfig.Process("", &cfg)
	if err != nil {
		return nil, err
	}
	log.Printf("[INFO] Configuration loaded: %+v", cfg)
	return &cfg, nil
}

The heart of the WebSocket management is the Hub. In a production system, this component is more than just a map of connections. It needs to be thread-safe and handle the lifecycle of client registrations and deregistrations cleanly.

// internal/hub/hub.go
package hub

import (
	"log/slog"
	"sync"

	"relay-service/internal/client"
)

// Hub maintains the set of active clients and broadcasts messages.
// In this architecture, it primarily acts as a connection registry.
type Hub struct {
	// A map of userID to the active client connection.
	// This assumes one connection per user. For multi-device support,
	// this would need to be `map[string][]*client.Client`.
	clients map[string]*client.Client

	// Mutex to protect concurrent access to the clients map.
	mu sync.RWMutex

	// Channel for incoming client registrations.
	register chan *client.Client

	// Channel for client deregistrations.
	unregister chan *client.Client
}

// NewHub creates and returns a new Hub instance.
func NewHub() *Hub {
	return &Hub{
		clients:    make(map[string]*client.Client),
		register:   make(chan *client.Client),
		unregister: make(chan *client.Client),
	}
}

// Run starts the hub's event loop. It must be run in a separate goroutine.
func (h *Hub) Run() {
	for {
		select {
		case c := <-h.register:
			h.mu.Lock()
			// If a client with the same user ID already exists,
			// we disconnect the old one. This enforces a single-session policy.
			if oldClient, ok := h.clients[c.UserID]; ok {
				slog.Warn("Existing connection found for user, closing old one", "userID", c.UserID)
				close(oldClient.Send)
				delete(h.clients, c.UserID)
			}
			h.clients[c.UserID] = c
			h.mu.Unlock()
			slog.Info("Client registered", "userID", c.UserID)

		case c := <-h.unregister:
			h.mu.Lock()
			if existingClient, ok := h.clients[c.UserID]; ok {
				// Ensure we are unregistering the exact same client instance.
				if existingClient == c {
					delete(h.clients, c.UserID)
					close(c.Send)
					slog.Info("Client unregistered", "userID", c.UserID)
				}
			}
			h.mu.Unlock()
		}
	}
}

// Register adds a client to the hub's registration channel.
func (h *Hub) Register(c *client.Client) {
	h.register <- c
}

// Unregister adds a client to the hub's unregistration channel.
func (h *Hub) Unregister(c *client.Client) {
	h.unregister <- c
}

// FindClient safely retrieves a client by their user ID.
func (h *Hub) FindClient(userID string) (*client.Client, bool) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	c, ok := h.clients[userID]
	return c, ok
}

The Client represents a single WebSocket connection. It has a dedicated send channel that acts as a buffer, preventing slow clients from blocking the delivery logic.

// internal/client/client.go
package client

import (
	"log/slog"
	"time"

	"github.com/gorilla/websocket"
)

const (
	writeWait      = 10 * time.Second
	pongWait       = 60 * time.Second
	pingPeriod     = (pongWait * 9) / 10
	maxMessageSize = 512
)

// Client is a middleman between the WebSocket connection and the hub.
type Client struct {
	UserID string
	Hub    HubInterface
	Conn   *websocket.Conn
	Send   chan []byte
}

// HubInterface defines the methods the client needs from the hub.
// Using an interface here allows for easier testing by mocking the hub.
type HubInterface interface {
	Unregister(c *Client)
}

// NewClient creates a new client instance.
func NewClient(userID string, hub HubInterface, conn *websocket.Conn) *Client {
	return &Client{
		UserID: userID,
		Hub:    hub,
		Conn:   conn,
		Send:   make(chan []byte, 256),
	}
}

// ReadPump pumps messages from the WebSocket connection to the hub.
// For this relay, we mainly care about control messages. Application messages
// are not expected from the client.
func (c *Client) ReadPump() {
	defer func() {
		c.Hub.Unregister(c)
		c.Conn.Close()
	}()
	c.Conn.SetReadLimit(maxMessageSize)
	c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})
	for {
		_, _, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				slog.Error("Unexpected WebSocket close error", "userID", c.UserID, "error", err)
			}
			break
		}
	}
}

// WritePump pumps messages from the hub to the WebSocket connection.
func (c *Client) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.Conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.Conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)

			if err := w.Close(); err != nil {
				return
			}
		case <-ticker.C:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

The SQS Consumer: Bridging the Queue to Live Connections

The most critical component is the SQS consumer. It’s responsible for fetching messages and dispatching them to the correct, currently connected client. A common mistake here is to fetch a message and then immediately delete it. In a distributed system, this is a recipe for message loss. The correct pattern is to process the message first (i.e., deliver it to the WebSocket), and only upon successful delivery, delete it from the queue.

// internal/consumer/consumer.go
package consumer

import (
	"context"
	"encoding/json"
	"log/slog"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"

	"relay-service/internal/client"
	"relay-service/pkg/config"
)

// HubFinder defines the interface for finding clients.
type HubFinder interface {
	FindClient(userID string) (*client.Client, bool)
}

// SQSAPI defines the interface for SQS operations we need.
// This is crucial for mocking SQS in unit tests.
type SQSAPI interface {
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
}

// Consumer polls SQS and dispatches messages to the hub.
type Consumer struct {
	Svc    SQSAPI
	Hub    HubFinder
	Cfg    *config.Config
}

// NotificationMessage defines the expected structure of a message from SQS.
type NotificationMessage struct {
	UserID  string          `json:"userId"`
	Payload json.RawMessage `json:"payload"`
}

// NewConsumer creates a new SQS consumer.
func NewConsumer(svc SQSAPI, hub HubFinder, cfg *config.Config) *Consumer {
	return &Consumer{Svc: svc, Hub: hub, Cfg: cfg}
}

// Start begins the long-polling loop to consume messages from SQS.
// It takes a context to allow for graceful shutdown.
func (c *Consumer) Start(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	slog.Info("Starting SQS consumer...")

	for {
		select {
		case <-ctx.Done():
			slog.Info("SQS consumer shutting down...")
			return
		default:
			c.poll(ctx)
		}
	}
}

func (c *Consumer) poll(ctx context.Context) {
	receiveInput := &sqs.ReceiveMessageInput{
		QueueUrl:              &c.Cfg.SQSQueueURL,
		MaxNumberOfMessages:   c.Cfg.SQSMaxMessages,
		WaitTimeSeconds:       c.Cfg.SQSWaitTimeSecs,
		// We need message attributes if they contain metadata, e.g., message type.
		// MessageAttributeNames: []string{"All"},
	}

	resp, err := c.Svc.ReceiveMessage(ctx, receiveInput)
	if err != nil {
		// If the context was cancelled (e.g. shutdown in progress), return quietly;
		// the Start loop will observe ctx.Done() and exit.
		if ctx.Err() != nil {
			return
		}
		slog.Error("Failed to receive messages from SQS", "error", err)
		// In a production system, you'd implement a backoff strategy here.
		time.Sleep(5 * time.Second)
		return
	}

	if len(resp.Messages) > 0 {
		slog.Info("Received messages from SQS", "count", len(resp.Messages))
		var processWg sync.WaitGroup
		for _, msg := range resp.Messages {
			processWg.Add(1)
			go c.processMessage(ctx, msg, &processWg)
		}
		processWg.Wait()
	}
}

func (c *Consumer) processMessage(ctx context.Context, msg types.Message, wg *sync.WaitGroup) {
	defer wg.Done()

	var notif NotificationMessage
	if err := json.Unmarshal([]byte(*msg.Body), &notif); err != nil {
		slog.Error("Failed to unmarshal SQS message body", "body", *msg.Body, "error", err)
		// A malformed message should not be retried indefinitely.
		// We delete it to prevent clogging the queue. In a real project,
		// this would be moved to a Dead-Letter Queue (DLQ) for inspection.
		c.deleteMessage(ctx, msg.ReceiptHandle)
		return
	}

	if notif.UserID == "" {
		slog.Error("SQS message missing userId", "body", *msg.Body)
		c.deleteMessage(ctx, msg.ReceiptHandle)
		return
	}

	targetClient, found := c.Hub.FindClient(notif.UserID)
	if !found {
		// This is the key resilience feature. If the user is not connected,
		// we DO NOT delete the message. SQS will make it visible again after
		// the VisibilityTimeout expires.
		slog.Info("Client not found, message will be retried", "userID", notif.UserID)
		return
	}

	// The client is connected, attempt to send the message.
	// We use a select with a timeout to avoid blocking indefinitely if the
	// client's send channel is full.
	select {
	case targetClient.Send <- notif.Payload:
		slog.Info("Successfully sent message to client", "userID", notif.UserID)
		// Only delete the message AFTER successful delivery to the client's channel.
		c.deleteMessage(ctx, msg.ReceiptHandle)
	case <-time.After(1 * time.Second):
		slog.Warn("Timed out sending message to client channel", "userID", notif.UserID)
		// Do not delete the message; let it be retried.
	case <-ctx.Done():
		slog.Info("Context cancelled during message send", "userID", notif.UserID)
		// Do not delete the message.
	}
}

func (c *Consumer) deleteMessage(ctx context.Context, receiptHandle *string) {
	deleteInput := &sqs.DeleteMessageInput{
		QueueUrl:      &c.Cfg.SQSQueueURL,
		ReceiptHandle: receiptHandle,
	}
	_, err := c.Svc.DeleteMessage(ctx, deleteInput)
	if err != nil {
		slog.Error("Failed to delete message from SQS", "receiptHandle", *receiptHandle, "error", err)
	} else {
		slog.Debug("Successfully deleted message from SQS", "receiptHandle", *receiptHandle)
	}
}

This consumer implementation directly addresses the message loss problem. If a client is disconnected, FindClient returns false, and the processMessage function simply returns. The message is not deleted from SQS. It remains invisible for the duration of the VisibilityTimeout and then becomes available for another consumer to pick up. This gives the mobile client a window of time to reconnect. If it remains offline for too long, the message will exceed the queue’s maxReceiveCount and be automatically moved to a configured Dead-Letter Queue (DLQ) by AWS, preventing it from being retried forever. This DLQ can then be used for offline processing, such as converting the missed real-time notifications into batch push notifications.
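
Because both SQS and the hub sit behind small interfaces, this "no client, no delete" contract is easy to pin down in a unit test. The sketch below is illustrative rather than taken from the project; fakeSQS and emptyHub are hypothetical test doubles for SQSAPI and HubFinder.

// internal/consumer/consumer_test.go (sketch)
package consumer

import (
	"context"
	"sync"
	"testing"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"

	"relay-service/internal/client"
	"relay-service/pkg/config"
)

// fakeSQS satisfies SQSAPI and records whether DeleteMessage was called.
type fakeSQS struct{ deleted bool }

func (f *fakeSQS) ReceiveMessage(ctx context.Context, in *sqs.ReceiveMessageInput, _ ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
	return &sqs.ReceiveMessageOutput{}, nil
}

func (f *fakeSQS) DeleteMessage(ctx context.Context, in *sqs.DeleteMessageInput, _ ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) {
	f.deleted = true
	return &sqs.DeleteMessageOutput{}, nil
}

// emptyHub satisfies HubFinder and reports every user as disconnected.
type emptyHub struct{}

func (emptyHub) FindClient(string) (*client.Client, bool) { return nil, false }

func TestMessageForDisconnectedUserIsNotDeleted(t *testing.T) {
	svc := &fakeSQS{}
	c := NewConsumer(svc, emptyHub{}, &config.Config{SQSQueueURL: "https://example.com/queue"})

	msg := types.Message{
		Body:          aws.String(`{"userId":"u-123","payload":{"k":"v"}}`),
		ReceiptHandle: aws.String("rh-1"),
	}

	var wg sync.WaitGroup
	wg.Add(1)
	c.processMessage(context.Background(), msg, &wg)
	wg.Wait()

	if svc.deleted {
		t.Fatal("message for a disconnected user must not be deleted from SQS")
	}
}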

Tying It All Together: The Main Server

The main function orchestrates all these components, setting up the HTTP server for WebSocket upgrades, starting the hub, and launching the SQS consumers. Graceful shutdown is crucial; we need to give active consumers time to finish processing their current batch of messages before the application exits.

// cmd/server/main.go
package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"

	"relay-service/internal/consumer"
	"relay-service/internal/handler"
	"relay-service/internal/hub"
	"relay-service/pkg/config"
)

func main() {
	// Setup structured logging
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	cfg, err := config.Load()
	if err != nil {
		slog.Error("Failed to load configuration", "error", err)
		os.Exit(1)
	}

	awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(), awsconfig.WithRegion(cfg.AWSRegion))
	if err != nil {
		slog.Error("Failed to load AWS configuration", "error", err)
		os.Exit(1)
	}

	sqsClient := sqs.NewFromConfig(awsCfg)
	hub := hub.NewHub()
	go hub.Run()

	// Start SQS consumers
	numConsumers := 4 // This should be configurable and tuned based on load
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())

	for i := 0; i < numConsumers; i++ {
		c := consumer.NewConsumer(sqsClient, hub, cfg)
		wg.Add(1)
		go c.Start(ctx, &wg)
	}

	// Setup HTTP server for WebSocket connections
	httpHandler := handler.NewHTTPHandler(hub, cfg)
	server := &http.Server{
		Addr:    ":" + cfg.ServerPort,
		Handler: httpHandler.Mux(),
	}

	go func() {
		slog.Info("Starting HTTP server", "port", cfg.ServerPort)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			slog.Error("HTTP server failed", "error", err)
			os.Exit(1)
		}
	}()

	// Wait for termination signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	slog.Info("Shutting down server...")

	// Cancel the context for SQS consumers
	cancel()
	// Wait for all consumers to finish gracefully
	wg.Wait()
	slog.Info("All SQS consumers have stopped.")

	// Shutdown HTTP server
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), cfg.GracefulShutdown)
	defer shutdownCancel()
	if err := server.Shutdown(shutdownCtx); err != nil {
		slog.Error("Server forced to shutdown", "error", err)
	}

	slog.Info("Server exiting.")
}

The corresponding HTTP handler for upgrading connections:

// internal/handler/http.go
package handler

import (
	"log/slog"
	"net/http"

	"github.com/gorilla/websocket"

	"relay-service/internal/client"
	"relay-service/internal/hub"
	"relay-service/pkg/config"
)

// HTTPHandler handles WebSocket upgrade requests.
type HTTPHandler struct {
	hub      *hub.Hub
	upgrader websocket.Upgrader
}

// NewHTTPHandler creates a new handler.
func NewHTTPHandler(h *hub.Hub, cfg *config.Config) *HTTPHandler {
	return &HTTPHandler{
		hub: h,
		upgrader: websocket.Upgrader{
			ReadBufferSize:  cfg.ReadBufferSize,
			WriteBufferSize: cfg.WriteBufferSize,
			CheckOrigin: func(r *http.Request) bool {
				// In a real application, implement proper origin validation.
				return true
			},
		},
	}
}

// Mux returns the configured ServeMux.
func (h *HTTPHandler) Mux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/ws", h.serveWs)
	return mux
}

func (h *HTTPHandler) serveWs(w http.ResponseWriter, r *http.Request) {
	// Authentication should be handled here. For this example, we'll
	// extract the user ID from a query parameter. In production, this would
	// come from a JWT or session cookie.
	userID := r.URL.Query().Get("userId")
	if userID == "" {
		http.Error(w, "User ID is required", http.StatusBadRequest)
		return
	}

	conn, err := h.upgrader.Upgrade(w, r, nil)
	if err != nil {
		slog.Error("Failed to upgrade connection", "error", err)
		return
	}

	c := client.NewClient(userID, h.hub, conn)
	h.hub.Register(c)

	go c.WritePump()
	go c.ReadPump()
}

This decoupled architecture successfully resolved our initial problems. The ingestion API can now handle massive bursts of traffic by simply writing to SQS, with latency remaining low and predictable. The relay servers are protected from these bursts, consuming messages at a steady pace. Most importantly, temporary client disconnections no longer result in data loss; the messages simply wait in the queue for the client to return.

The current design, however, is not without its own set of trade-offs and limitations. The connection state—the mapping of a userID to its active WebSocket connection—is stored in the memory of each individual relay server instance. This makes the relay stateful. If a relay server crashes, all clients connected to it are disconnected and must reconnect to another instance. Furthermore, for horizontal scaling, a load balancer can distribute incoming connections across multiple relay instances, but there’s no guarantee that a message for user-123 pulled by relay-A can be delivered if user-123 is actually connected to relay-B; the message simply waits out another visibility timeout until the right instance happens to receive it. A future iteration would externalize this routing, for example by tracking connection locations in a shared, low-latency store such as Redis, or by fanning deliveries out over Redis Pub/Sub or a dedicated routing service, so that any relay instance can service a message for any user, making the relay layer effectively stateless and horizontally scalable.
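
To make that direction concrete, here is one possible shape for it, sketched with the go-redis client (github.com/redis/go-redis/v9). This is an illustration of the idea rather than something we have built: each relay subscribes to a per-user channel when it accepts a connection, and whichever instance pulls a message from SQS publishes to that channel instead of assuming local delivery.

// Sketch (not implemented): cross-instance routing via Redis Pub/Sub.
package routing

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// Publish is called by whichever relay instance pulled the SQS message. The
// returned count is the number of subscribers that received it; zero can be
// treated as "user offline", leaving the SQS message in the queue for retry.
func Publish(ctx context.Context, rdb *redis.Client, userID string, payload []byte) (int64, error) {
	return rdb.Publish(ctx, "user:"+userID, payload).Result()
}

// Subscribe is run by the relay instance holding the user's WebSocket; received
// payloads are forwarded to the local client's buffered Send channel.
func Subscribe(ctx context.Context, rdb *redis.Client, userID string, send chan<- []byte) {
	sub := rdb.Subscribe(ctx, "user:"+userID)
	defer sub.Close()
	for msg := range sub.Channel() {
		send <- []byte(msg.Payload)
	}
}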

