Architecting a High-Throughput System with CQRS Using Go, AWS SQS, and a Reactive Solid.js Frontend


The technical challenge is to design a system capable of ingesting a high volume of status updates—potentially thousands per second—while simultaneously providing a real-time monitoring dashboard to users with minimal latency. A conventional monolithic architecture, where a single API handles both writes to and reads from a primary database, inevitably creates a bottleneck. Write operations lock tables or documents, degrading read performance. Scaling the entire monolith to handle write spikes is inefficient and costly, since read traffic may not have changed. This tight coupling between the command (write) and query (read) paths is a fundamental flaw for this class of problem.

A traditional approach involves a RESTful API built with a framework like Go-Gin, writing directly to a transactional database like PostgreSQL. The frontend, perhaps built with Solid.js, would then poll this API every few seconds to refresh its data.

Pros of the Traditional Monolithic Approach:

  • Simplicity: The logic is co-located and follows a familiar request-response pattern.
  • Strong Consistency: The data read is always the most recently committed write.
  • Rapid Initial Development: The path from concept to a working prototype is short.

Cons of the Traditional Monolithic Approach:

  • Write Contention: High-frequency updates can lead to database deadlocks, slow queries, and transaction timeouts. The UPDATE operations become the system’s primary bottleneck.
  • Inefficient UI Updates: Polling is resource-intensive for both the client and server. It creates unnecessary network traffic and results in a UI that is always slightly out of date, never truly “real-time.”
  • Coupled Scaling: The read and write workloads cannot be scaled independently. A surge in write traffic requires scaling the entire application stack, even if read traffic remains constant. In a real-world project, this leads to significant over-provisioning and increased operational costs.
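
To make the contrast concrete, here is a minimal sketch of the traditional write path described above. It assumes a hypothetical devices table, an illustrative connection string, and the pgx stdlib driver; every request funnels into the same UPDATE statement, which is exactly where contention builds under load.

File: monolith/main.go (hypothetical sketch)

package main

import (
	"database/sql"
	"net/http"

	"github.com/gin-gonic/gin"
	_ "github.com/jackc/pgx/v5/stdlib" // illustrative driver choice
)

type statusUpdate struct {
	DeviceID    string  `json:"deviceId" binding:"required"`
	Temperature float64 `json:"temperature" binding:"required"`
}

func main() {
	// The DSN is illustrative; in practice it would come from configuration.
	db, err := sql.Open("pgx", "postgres://localhost/monitoring?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	r := gin.Default()
	r.POST("/api/v1/device-status", func(c *gin.Context) {
		var u statusUpdate
		if err := c.ShouldBindJSON(&u); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		// Every request contends for the same rows; at thousands of writes
		// per second this UPDATE is where the monolith bottlenecks.
		_, err := db.ExecContext(c.Request.Context(),
			`UPDATE devices SET temperature = $1, last_update = now() WHERE device_id = $2`,
			u.Temperature, u.DeviceID)
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "db write failed"})
			return
		}
		c.JSON(http.StatusOK, gin.H{"status": "updated"})
	})
	r.Run(":8080")
}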

An alternative architecture, Command Query Responsibility Segregation (CQRS), directly addresses these shortcomings. By formally separating the write model (Commands) from the read model (Queries), we can optimize each path independently. The write path can be designed for maximum throughput and durability, while the read path can be optimized for extremely fast, low-latency queries.

The Proposed CQRS Architecture:

  1. Command API (Go-Gin): An ultra-lightweight API endpoint whose sole responsibility is to accept, validate, and immediately enqueue a command message into a durable message queue. It does not perform any complex business logic or database writes itself.
  2. Message Queue (AWS SQS): Acts as a highly available and scalable buffer between the command and processing logic. It absorbs write spikes, ensures commands are not lost, and decouples the API from the backend workers.
  3. Command Processor (Go Worker): A separate, independently scalable service that consumes messages from SQS. It executes the core business logic and updates a denormalized read model.
  4. Read Model: A data store optimized for fast reads. This could be Redis, Elasticsearch, or even an in-memory cache, depending on the query patterns. The data is structured specifically for the UI’s needs.
  5. Query & Real-time Layer (Go-Gin with WebSocket): A service that reads from the optimized read model and pushes updates to connected clients over a persistent WebSocket connection. This eliminates polling entirely.
  6. Frontend (Solid.js): A reactive client that listens for WebSocket messages and updates its state. Solid.js’s fine-grained reactivity ensures that only the precise parts of the DOM that depend on the changed data are re-rendered.

This architecture was chosen because accepting eventual consistency for the dashboard is a small price to pay for the massive gains in write throughput, system resilience, and scalability. In a monitoring scenario, a delay of a few hundred milliseconds between an event occurring and its appearance on the dashboard is almost always an acceptable trade-off.

Here is a structural overview of the system, expressed as a Mermaid diagram.

graph TD
    subgraph Frontend
        A[Solid.js Client]
    end

    subgraph Backend Services
        B(Go-Gin Command API)
        C(Go-Gin Query API / WebSocket Server)
        D(Go SQS Worker)
    end

    subgraph AWS Infrastructure
        E[AWS SQS Queue]
    end

    subgraph Data Stores
        F[Optimized Read Model - e.g., Redis/In-Memory]
    end

    A -- HTTP POST /commands --> B
    B -- Enqueues Command --> E
    D -- Polls Messages --> E
    D -- Updates Read Model --> F
    D -- Notifies via Channel --> C
    A -- WebSocket Connection --> C
    C -- Pushes Real-time Updates --> A
    C -- Reads Data --> F

The core implementation begins with the command path. The Go-Gin API is minimal by design. Its purpose is to perform basic validation and hand off the workload to SQS as quickly as possible.

File: command-api/main.go

package main

import (
	"command-api/handler"
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/gin-contrib/cors"
	"github.com/gin-gonic/gin"
)

func main() {
	// In a production environment, use structured logging (e.g., zerolog, zap).
	log.Println("Starting command API service...")

	// Load AWS configuration from environment variables or IAM role.
	// This is the standard, secure way to configure AWS SDKs.
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		log.Fatalf("unable to load AWS SDK config, %v", err)
	}

	sqsClient := sqs.NewFromConfig(cfg)
	queueURL := os.Getenv("AWS_SQS_QUEUE_URL")
	if queueURL == "" {
		log.Fatal("AWS_SQS_QUEUE_URL environment variable must be set")
	}

	// Create the handler, injecting its dependencies (SQS client and queue URL).
	// This promotes testability.
	commandHandler := handler.NewCommandHandler(sqsClient, queueURL)

	router := gin.Default()
	
	// A common mistake is overly permissive CORS policies. In production,
	// restrict this to the specific frontend domain.
	router.Use(cors.New(cors.Config{
		AllowOrigins:     []string{"http://localhost:3000"}, // For local Solid.js dev server
		AllowMethods:     []string{"POST", "OPTIONS"},
		AllowHeaders:     []string{"Origin", "Content-Type"},
		ExposeHeaders:    []string{"Content-Length"},
		AllowCredentials: true,
		MaxAge:           12 * time.Hour,
	}))

	// API versioning is crucial for maintainability.
	v1 := router.Group("/api/v1")
	{
		v1.POST("/device-status", commandHandler.SubmitDeviceStatus)
	}
	
	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	// Graceful shutdown logic. This is critical for production services
	// to finish processing in-flight requests before exiting.
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("listen: %s\n", err)
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatal("Server forced to shutdown:", err)
	}

	log.Println("Server exiting")
}

The handler itself contains the logic for validation and message publishing. Notice the separation of concerns.

File: command-api/handler/command_handler.go

package handler

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
	"github.comcom/gin-gonic/gin"
	"github.com/google/uuid"
)

// SQSClient defines the interface we expect from an SQS client.
// Using an interface allows for easy mocking in unit tests.
type SQSClient interface {
	SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
}

// CommandHandler encapsulates the dependencies for handling commands.
type CommandHandler struct {
	SqsClient SQSClient
	QueueURL  string
}

func NewCommandHandler(client SQSClient, queueURL string) *CommandHandler {
	return &CommandHandler{
		SqsClient: client,
		QueueURL:  queueURL,
	}
}

// DeviceStatusCommand represents the command payload from the client.
// We use struct tags for validation. In a real-world project, a library
// like 'go-playground/validator' would be used for more complex rules.
type DeviceStatusCommand struct {
	DeviceID    string    `json:"deviceId" binding:"required"`
	Temperature float64   `json:"temperature" binding:"required"`
	Latitude    float64   `json:"latitude" binding:"required,latitude"`
	Longitude   float64   `json:"longitude" binding:"required,longitude"`
	Timestamp   time.Time `json:"timestamp" binding:"required"`
}

// SubmitDeviceStatus is the Gin handler for accepting new device status updates.
func (h *CommandHandler) SubmitDeviceStatus(c *gin.Context) {
	var command DeviceStatusCommand
	if err := c.ShouldBindJSON(&command); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// The command is valid. Now, serialize it and send it to SQS.
	messageBody, err := json.Marshal(command)
	if err != nil {
		log.Printf("ERROR: Failed to marshal command for device %s: %v", command.DeviceID, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "internal server error"})
		return
	}

	// A critical detail for high-throughput systems is idempotency.
	// While SQS standard queues provide at-least-once delivery, which can result
	// in duplicates, a unique message deduplication ID can help if using a FIFO queue.
	// For standard queues, the consumer must be idempotent.
	messageID := uuid.New().String()

	_, err = h.SqsClient.SendMessage(c.Request.Context(), &sqs.SendMessageInput{
		MessageBody: aws.String(string(messageBody)),
		QueueUrl:    aws.String(h.QueueURL),
		MessageAttributes: map[string]types.MessageAttributeValue{
			"CommandType": {
				DataType:    aws.String("String"),
				StringValue: aws.String("DeviceStatusUpdate"),
			},
		},
		// Using a message group ID is only relevant for FIFO queues to ensure ordering within a group.
		// MessageGroupId: aws.String(command.DeviceID),
		// MessageDeduplicationId: aws.String(messageID),
	})

	if err != nil {
		log.Printf("ERROR: Failed to send message to SQS for device %s: %v", command.DeviceID, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to process request"})
		return
	}

	// We respond immediately with success. The actual processing happens asynchronously.
	// This is the key to high throughput.
	c.JSON(http.StatusAccepted, gin.H{"messageId": messageID, "status": "pending"})
}
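
Because the handler depends only on the SQSClient interface rather than the concrete SQS client, it can be unit-tested without touching AWS at all. The following is a minimal sketch of such a test; the file name, queue URL, and payload values are illustrative.

File: command-api/handler/command_handler_test.go (hypothetical sketch)

package handler

import (
	"bytes"
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/gin-gonic/gin"
)

// mockSQSClient satisfies SQSClient and records the last message body it was given.
type mockSQSClient struct {
	sentBody string
}

func (m *mockSQSClient) SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) {
	m.sentBody = *params.MessageBody
	return &sqs.SendMessageOutput{}, nil
}

func TestSubmitDeviceStatus_EnqueuesValidCommand(t *testing.T) {
	gin.SetMode(gin.TestMode)

	mock := &mockSQSClient{}
	h := NewCommandHandler(mock, "https://sqs.example.com/queue") // illustrative queue URL

	router := gin.New()
	router.POST("/api/v1/device-status", h.SubmitDeviceStatus)

	body, _ := json.Marshal(map[string]interface{}{
		"deviceId":    "device-001",
		"temperature": 21.5,
		"latitude":    52.52,
		"longitude":   13.405,
		"timestamp":   "2024-01-01T12:00:00Z",
	})
	req := httptest.NewRequest(http.MethodPost, "/api/v1/device-status", bytes.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	router.ServeHTTP(rec, req)

	if rec.Code != http.StatusAccepted {
		t.Fatalf("expected status 202, got %d", rec.Code)
	}
	if mock.sentBody == "" {
		t.Fatal("expected a message to be enqueued")
	}
}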

Next is the SQS worker. This is a standalone Go application that runs in a continuous loop, polling SQS for new messages.

File: sqs-worker/main.go

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.comcom/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// DeviceStatus represents our internal state, the "read model".
// In a real application, this would be stored in Redis, DynamoDB, etc.
// For this example, an in-memory map demonstrates the principle.
type DeviceStatus struct {
	DeviceID    string    `json:"deviceId"`
	Temperature float64   `json:"temperature"`
	Latitude    float64   `json:"latitude"`
	Longitude   float64   `json:"longitude"`
	LastUpdate  time.Time `json:"lastUpdate"`
}

// ReadModelStore simulates a fast, concurrent data store for our query side.
type ReadModelStore struct {
	sync.RWMutex
	devices map[string]DeviceStatus
}

func NewReadModelStore() *ReadModelStore {
	return &ReadModelStore{
		devices: make(map[string]DeviceStatus),
	}
}

func (s *ReadModelStore) UpdateDeviceStatus(status DeviceStatus) {
	s.Lock()
	defer s.Unlock()
	s.devices[status.DeviceID] = status
	log.Printf("Updated read model for device: %s", status.DeviceID)
}

// This would be used by the query API.
func (s *ReadModelStore) GetAllDeviceStatuses() []DeviceStatus {
	s.RLock()
	defer s.RUnlock()
	statuses := make([]DeviceStatus, 0, len(s.devices))
	for _, status := range s.devices {
		statuses = append(statuses, status)
	}
	return statuses
}

// This function simulates notifying the WebSocket layer that data has changed.
// In a real system, this could be a Redis Pub/Sub call, a gRPC call, or a channel.
func notifyQueryLayer(status DeviceStatus) {
	// For now, we just log it. In the full implementation, this will be a channel.
	log.Printf("NOTIFY: Pushing update for device %s to query layer", status.DeviceID)
	// queryService.BroadcastUpdate(status)
}

func main() {
	log.Println("Starting SQS worker service...")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("unable to load AWS SDK config, %v", err)
	}

	sqsClient := sqs.NewFromConfig(cfg)
	queueURL := os.Getenv("AWS_SQS_QUEUE_URL")
	if queueURL == "" {
		log.Fatal("AWS_SQS_QUEUE_URL environment variable must be set")
	}

	readModel := NewReadModelStore()
	var wg sync.WaitGroup

	// Handle graceful shutdown.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	// Start a pool of workers. The number should be configurable.
	numWorkers := 5
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			log.Printf("Worker %d started", workerID)
			pollAndProcess(ctx, sqsClient, queueURL, readModel, workerID)
			log.Printf("Worker %d stopped", workerID)
		}(i)
	}
	
	<-termChan // Block until termination signal
	log.Println("Termination signal received. Shutting down workers...")
	cancel() // Signal all goroutines to stop by canceling the context
	wg.Wait() // Wait for all workers to finish
	log.Println("All workers have shut down. Exiting.")
}

func pollAndProcess(ctx context.Context, client *sqs.Client, queueURL string, store *ReadModelStore, workerID int) {
	for {
		select {
		case <-ctx.Done(): // Check for cancellation signal
			return
		default:
			// Using Long Polling by setting WaitTimeSeconds is critical for reducing cost
			// and empty receives on the SQS queue.
			resp, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
				QueueUrl:            &queueURL,
				MaxNumberOfMessages: 10, // Process messages in batches for efficiency.
				WaitTimeSeconds:     20, // Long polling.
				AttributeNames:      []types.QueueAttributeName{types.QueueAttributeNameAll},
			})

			if err != nil {
				log.Printf("Worker %d: ERROR receiving messages: %v", workerID, err)
				time.Sleep(5 * time.Second) // Backoff on error
				continue
			}

			if len(resp.Messages) == 0 {
				continue
			}
			
			log.Printf("Worker %d: Received %d messages", workerID, len(resp.Messages))

			for _, msg := range resp.Messages {
				processMessage(ctx, msg, store)

				// After successful processing, delete the message from the queue.
				// A common pitfall is failing to delete the message, causing it to be re-processed.
				// If processing fails, not deleting it allows it to be retried, which is why
				// idempotent processing logic is essential.
				_, delErr := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
					QueueUrl:      &queueURL,
					ReceiptHandle: msg.ReceiptHandle,
				})
				if delErr != nil {
					log.Printf("Worker %d: ERROR deleting message %s: %v", workerID, *msg.MessageId, delErr)
				}
			}
		}
	}
}

func processMessage(ctx context.Context, msg types.Message, store *ReadModelStore) {
	var command struct {
		DeviceID    string    `json:"deviceId"`
		Temperature float64   `json:"temperature"`
		Latitude    float64   `json:"latitude"`
		Longitude   float64   `json:"longitude"`
		Timestamp   time.Time `json:"timestamp"`
	}

	if err := json.Unmarshal([]byte(*msg.Body), &command); err != nil {
		log.Printf("ERROR: Could not unmarshal message body %s: %v", *msg.MessageId, err)
		// Here, you would typically send the message to a Dead Letter Queue (DLQ)
		// instead of just dropping it.
		return
	}

	// This is where the business logic would go.
	// For this example, we just update our read model.
	status := DeviceStatus{
		DeviceID:    command.DeviceID,
		Temperature: command.Temperature,
		Latitude:    command.Latitude,
		Longitude:   command.Longitude,
		LastUpdate:  command.Timestamp,
	}
	store.UpdateDeviceStatus(status)
	notifyQueryLayer(status)
}
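
The in-memory store keeps this example self-contained, but it disappears on restart and cannot be shared between the worker and the query service. As the closing notes suggest, Redis is a natural replacement. The following is a rough sketch of what a Redis-backed store with a Pub/Sub change notification might look like, assuming the github.com/redis/go-redis/v9 client; the key pattern and channel name are illustrative.

File: sqs-worker/redis_store.go (hypothetical sketch)

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// RedisReadModelStore persists the latest status per device and publishes each
// change on a Pub/Sub channel so the query/WebSocket service can broadcast it.
type RedisReadModelStore struct {
	client  *redis.Client
	channel string
}

func NewRedisReadModelStore(addr, channel string) *RedisReadModelStore {
	return &RedisReadModelStore{
		client:  redis.NewClient(&redis.Options{Addr: addr}),
		channel: channel,
	}
}

func (s *RedisReadModelStore) UpdateDeviceStatus(ctx context.Context, status DeviceStatus) error {
	payload, err := json.Marshal(status)
	if err != nil {
		return fmt.Errorf("marshal status: %w", err)
	}
	// The latest status lives under a per-device key for fast point reads.
	key := fmt.Sprintf("device:%s", status.DeviceID)
	if err := s.client.Set(ctx, key, payload, 0).Err(); err != nil {
		return fmt.Errorf("write read model: %w", err)
	}
	// Notify subscribers (replacing the notifyQueryLayer stub above).
	return s.client.Publish(ctx, s.channel, payload).Err()
}

With this in place, processMessage would call the Redis store instead of the in-memory map, and the query service would subscribe to the same channel rather than relying on an in-process channel.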

Now, for the query side and real-time push. We’ll integrate a WebSocket layer into another Go-Gin application. This service is responsible for managing client connections and broadcasting updates.

File: query-api/main.go (Simplified for brevity, combining concepts)

package main

import (
    "encoding/json"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"

    "github.com/gin-contrib/cors"
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
)

// In a real system, this would be a shared dependency with the SQS worker,
// likely interacting with a Redis instance. Here we simulate it.
type DeviceStatus struct {
    DeviceID    string  `json:"deviceId"`
    Temperature float64 `json:"temperature"`
    LastUpdate  string  `json:"lastUpdate"`
}

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        // In production, validate the origin.
        return true
    },
}

// Hub maintains the set of active clients and broadcasts messages to them.
type Hub struct {
    clients    map[*websocket.Conn]bool
    broadcast  chan []byte
    register   chan *websocket.Conn
    unregister chan *websocket.Conn
    mu         sync.Mutex
}

func newHub() *Hub {
    return &Hub{
        broadcast:  make(chan []byte),
        register:   make(chan *websocket.Conn),
        unregister: make(chan *websocket.Conn),
        clients:    make(map[*websocket.Conn]bool),
    }
}

func (h *Hub) run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            h.mu.Unlock()
        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                client.Close()
            }
            h.mu.Unlock()
        case message := <-h.broadcast:
            h.mu.Lock()
            for client := range h.clients {
                err := client.WriteMessage(websocket.TextMessage, message)
                if err != nil {
                    log.Printf("error: %v", err)
                    client.Close()
                    delete(h.clients, client)
                }
            }
            h.mu.Unlock()
        }
    }
}

// This function simulates the SQS worker notifying us of an update.
func simulateUpdate(hub *Hub) {
    go func() {
        for {
            // In reality, this would be driven by an event from the worker.
            time.Sleep(5 * time.Second)
            update := DeviceStatus{
                DeviceID: "device-007",
                Temperature: 25.5 + (rand.Float64() * 5),
                LastUpdate: time.Now().Format(time.RFC3339),
            }
            payload, _ := json.Marshal(update)
            hub.broadcast <- payload
        }
    }()
}


func main() {
    hub := newHub()
    go hub.run()
    // simulateUpdate(hub) // Left commented out, as we'd use a real mechanism.
    
    router := gin.Default()
    router.Use(cors.Default()) // Use a more restrictive policy in production.

    // A simple REST endpoint to get the current state for initial load.
    router.GET("/api/v1/devices", func(c *gin.Context) {
        // Here, you would query the read model (e.g., the in-memory map or Redis).
        // For this example, return static data.
        c.JSON(http.StatusOK, []DeviceStatus{
            {DeviceID: "device-001", Temperature: 22.1, LastUpdate: time.Now().Add(-1 * time.Minute).Format(time.RFC3339)},
            {DeviceID: "device-002", Temperature: 24.3, LastUpdate: time.Now().Add(-2 * time.Minute).Format(time.RFC3339)},
        })
    })

    // The WebSocket endpoint.
    router.GET("/ws/updates", func(c *gin.Context) {
        conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
        if err != nil {
            log.Println(err)
            return
        }
        hub.register <- conn

        // Clean up connection when the client disconnects.
        defer func() {
            hub.unregister <- conn
        }()

        // Keep the connection alive. The broadcasting is handled by the hub.
        for {
            // You can optionally read messages from the client here if needed.
            if _, _, err := conn.ReadMessage(); err != nil {
                break
            }
        }
    })

    router.Run(":8081")
}
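
To close the loop with the worker, the simulateUpdate stub would be replaced by a subscriber on the same channel the worker publishes to. A rough sketch, again assuming github.com/redis/go-redis/v9 and an illustrative channel name:

File: query-api/subscriber.go (hypothetical sketch)

package main

import (
	"context"
	"log"

	"github.com/redis/go-redis/v9"
)

// subscribeToUpdates listens on the Pub/Sub channel written by the SQS worker
// and forwards every payload to the hub, which fans it out to WebSocket clients.
func subscribeToUpdates(ctx context.Context, hub *Hub, addr, channel string) {
	client := redis.NewClient(&redis.Options{Addr: addr})
	sub := client.Subscribe(ctx, channel)
	defer sub.Close()

	for {
		msg, err := sub.ReceiveMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return // context canceled: shutting down
			}
			log.Printf("pubsub receive error: %v", err)
			continue
		}
		hub.broadcast <- []byte(msg.Payload)
	}
}

In main, a call such as go subscribeToUpdates(context.Background(), hub, "localhost:6379", "device-updates") would take the place of simulateUpdate.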

Finally, the Solid.js frontend subscribes to the WebSocket and reactively updates the UI.

File: solid-frontend/src/App.jsx

import { createSignal, createEffect, onCleanup, For } from 'solid-js';
import './App.css';

function App() {
  // Use a map for efficient updates by device ID.
  const [devices, setDevices] = createSignal({});
  const [connectionStatus, setConnectionStatus] = createSignal('Connecting...');

  createEffect(() => {
    // Fetch initial state
    fetch('http://localhost:8081/api/v1/devices')
      .then(res => res.json())
      .then(initialData => {
        const initialMap = {};
        for (const device of initialData) {
          initialMap[device.deviceId] = device;
        }
        setDevices(initialMap);
      });

    // Establish WebSocket connection
    const socket = new WebSocket('ws://localhost:8081/ws/updates');

    socket.onopen = () => {
      setConnectionStatus('Connected');
      console.log('WebSocket connection established.');
    };

    socket.onmessage = (event) => {
      try {
        const updatedDevice = JSON.parse(event.data);
        // This is where Solid's reactivity shines. We update the map,
        // and Solid will automatically update only the specific row
        // for this device in the UI.
        setDevices(prev => ({
          ...prev,
          [updatedDevice.deviceId]: updatedDevice
        }));
      } catch (e) {
        console.error('Failed to parse incoming message:', event.data);
      }
    };

    socket.onclose = () => {
      setConnectionStatus('Disconnected. Will attempt to reconnect...');
      console.log('WebSocket connection closed.');
      // Production apps would have a reconnect with backoff strategy here.
    };

    socket.onerror = (error) => {
      setConnectionStatus('Connection Error');
      console.error('WebSocket Error:', error);
    };

    // Cleanup when the component is unmounted
    onCleanup(() => {
      socket.close();
    });
  });

  // Derived signal to get devices as an array for rendering
  const deviceList = () => Object.values(devices());

  return (
    <div class="container">
      <h1>Real-Time Device Dashboard</h1>
      <p>Connection Status: <span class={connectionStatus().toLowerCase().split(' ')[0]}>{connectionStatus()}</span></p>
      <table>
        <thead>
          <tr>
            <th>Device ID</th>
            <th>Temperature (°C)</th>
            <th>Last Update</th>
          </tr>
        </thead>
        <tbody>
          <For each={deviceList()} fallback={<tr><td colspan="3">Loading devices...</td></tr>}>
            {(device) => (
              <tr>
                <td>{device.deviceId}</td>
                <td>{device.temperature.toFixed(2)}</td>
                <td>{new Date(device.lastUpdate).toLocaleTimeString()}</td>
              </tr>
            )}
          </For>
        </tbody>
      </table>
    </div>
  );
}

export default App;
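
To exercise the full path locally, a small command generator can stand in for real devices: it POSTs status updates to the command API, and moments later the corresponding rows on the dashboard change via the WebSocket push. The endpoint matches the one defined earlier; the device IDs, coordinates, and send rate are illustrative.

File: device-simulator/main.go (hypothetical sketch)

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"time"
)

func main() {
	endpoint := "http://localhost:8080/api/v1/device-status"
	for {
		body, _ := json.Marshal(map[string]interface{}{
			"deviceId":    fmt.Sprintf("device-%03d", rand.Intn(5)+1),
			"temperature": 20 + rand.Float64()*10,
			"latitude":    52.52,
			"longitude":   13.405,
			"timestamp":   time.Now().UTC().Format(time.RFC3339),
		})
		resp, err := http.Post(endpoint, "application/json", bytes.NewReader(body))
		if err != nil {
			log.Printf("post failed: %v", err)
		} else {
			resp.Body.Close()
		}
		time.Sleep(200 * time.Millisecond)
	}
}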

The key limitation of this architecture is its eventual consistency. There will always be a lag, however small, between command submission and the UI update. For systems requiring transactional guarantees or strong read-after-write consistency, this model is unsuitable. Furthermore, while SQS standard queues are highly scalable, they do not guarantee message order. If the sequence of events for a single device is critical, an SQS FIFO queue (scoped by MessageGroupId to the device ID) must be used, which comes with its own throughput limits. The resilience of the system also hinges on robust error handling in the worker, including a well-configured Dead Letter Queue (DLQ) to capture and analyze messages that fail processing repeatedly. Future iterations would involve replacing the in-memory read model with a persistent, scalable store like Redis for fault tolerance and horizontal scaling of the query service.
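
As noted above, when per-device ordering is critical the publish step would target an SQS FIFO queue instead of a standard queue. The following is a hedged sketch of that variant, reusing the handler's SQSClient interface and DeviceStatusCommand type; the file name is hypothetical, and the queue URL must point at a queue whose name ends in .fifo.

File: command-api/handler/fifo.go (hypothetical sketch)

package handler

import (
	"context"
	"encoding/json"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/google/uuid"
)

// sendOrdered publishes a command to a FIFO queue. MessageGroupId scopes
// ordering to a single device; MessageDeduplicationId (or content-based
// deduplication configured on the queue) suppresses duplicate sends.
func sendOrdered(ctx context.Context, client SQSClient, queueURL string, cmd DeviceStatusCommand) error {
	body, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	_, err = client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:               aws.String(queueURL),
		MessageBody:            aws.String(string(body)),
		MessageGroupId:         aws.String(cmd.DeviceID),
		MessageDeduplicationId: aws.String(uuid.New().String()),
	})
	return err
}

The trade-off, as the closing paragraph notes, is throughput: FIFO queues impose delivery-rate limits that standard queues do not.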
