Implementing a Polyglot CQRS Pipeline with Cassandra Writes and Firestore Real-Time Projections


The operational dashboard for our global IoT sensor network was failing. The original architecture, a monolithic Node.js application backed by a single, heavily-normalized PostgreSQL database, couldn’t cope. Every sensor heartbeat, status update, and diagnostic event was funneled through the same API endpoints, hammering the same tables. Write contention was rampant, and the complex joins required to render the dashboard were timing out under load. We were facing a classic conflict: the write patterns (high-volume, append-only) were fundamentally at odds with the read patterns (real-time, aggregated views). The system needed a complete architectural rethink, not another round of query optimization.

Our initial whiteboard session led us directly to the Command Query Responsibility Segregation (CQRS) pattern. The concept was simple but powerful: separate the model used for writing data (the Command model) from the model used for reading data (the Query model). This decoupling would allow us to choose the perfect database technology for each job instead of compromising with one. The plan solidified into a polyglot persistence architecture: a high-throughput, write-optimized store for incoming commands, and a separate, read-optimized, real-time store to power the user interface.

Technology selection became a critical discussion. For the command side, we needed something that could ingest a massive, unrelenting stream of writes from tens of thousands of devices globally. Apache Cassandra was the obvious choice. Its masterless architecture, linear scalability, and tunable consistency were perfectly suited for an append-only event log. In a real-world project, this kind of write-heavy workload is precisely where Cassandra shines and relational databases falter.

For the query side, the primary requirement was real-time updates pushed to the client. We wanted our dashboard to react instantly to changes without inefficient client-side polling. Google’s Firestore, with its onSnapshot real-time listeners, was a perfect fit. It abstracts away the complexity of WebSockets or other push technologies and provides a simple, powerful API for building reactive user interfaces. Its document-based model was also ideal for storing denormalized views tailored specifically for our dashboard components.

To bridge these two disparate systems, we needed a reliable, asynchronous messaging layer. AWS Simple Notification Service (SNS) was selected for its simplicity and managed nature. It would act as the event bus, decoupling the command service that writes to Cassandra from the “projector” service that builds the read model in Firestore. This asynchronous coupling is the linchpin of the entire architecture, providing resilience and independent scalability for each component.

Finally, for the frontend itself, we chose Lit. Its lightweight, standards-based approach to web components allowed us to create small, self-contained, and highly reusable UI elements. Each component could encapsulate its own Firestore subscription logic, making the overall application cleaner and easier to maintain.

The final architecture looked like this:

graph TD
    subgraph Frontend
        A[Lit Component]
    end

    subgraph Backend Services
        B(Command Service - Node.js)
        C(Projector Service - AWS Lambda)
    end

    subgraph Data Stores
        D[Cassandra Cluster]
        E[Google Firestore]
    end

    subgraph Messaging
        F[AWS SNS Topic]
    end

    A -- Real-time Listener --> E
    subgraph "Write Path (Command)"
        B -- Writes event --> D
        B -- Publishes event --> F
    end

    subgraph "Read Path Projection (Query)"
        F -- Triggers --> C
        C -- Reads event, writes view --> E
    end

The Command Service: Ingesting Data into Cassandra

The command service is the single entry point for all incoming sensor data. Its only responsibilities are to validate the data, persist it as an immutable event in Cassandra, and then publish a corresponding event to SNS. We built this using Node.js and the cassandra-driver package.
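
The validation step is not shown anywhere in the handler later in this article. As a minimal sketch, assuming the command shape used by that handler (deviceId, timestamp, type, value, metadata) and some illustrative rules that are assumptions rather than the production ones, it could look something like this. The function name validateSensorReading is hypothetical.

```javascript
// Hypothetical validator for incoming sensor commands; the rules are illustrative assumptions.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function validateSensorReading(command) {
  const errors = [];
  const { deviceId, timestamp, type, value, metadata } = command || {};

  if (typeof deviceId !== 'string' || !UUID_RE.test(deviceId)) {
    errors.push('deviceId must be a UUID string');
  }
  // new Date(null) is the epoch, so reject missing timestamps explicitly.
  if (timestamp == null || Number.isNaN(new Date(timestamp).getTime())) {
    errors.push('timestamp must be a parseable date');
  }
  if (typeof type !== 'string' || type.length === 0) {
    errors.push('type must be a non-empty string');
  }
  if (typeof value !== 'number' || !Number.isFinite(value)) {
    errors.push('value must be a finite number');
  }
  // The payload column is MAP<TEXT, TEXT>, so every metadata value must be a string.
  if (metadata !== undefined && metadata !== null) {
    const isPlainObject = typeof metadata === 'object' && !Array.isArray(metadata);
    if (!isPlainObject || !Object.values(metadata).every((v) => typeof v === 'string')) {
      errors.push('metadata must be an object with string values');
    }
  }
  return { valid: errors.length === 0, errors };
}
```

Returning a list of errors instead of throwing on the first one lets the API report every problem with a rejected reading in a single response.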

First, establishing a robust connection to the Cassandra cluster is paramount. A common mistake is leaving the retry and load-balancing policies unconfigured.

// /lib/cassandra-client.js
const cassandra = require('cassandra-driver');

// In a production scenario, these would come from environment variables or a secret manager.
const CASSANDRA_CONTACT_POINTS = process.env.CASSANDRA_HOSTS.split(',');
const CASSANDRA_DATACENTER = process.env.CASSANDRA_DATACENTER;
const CASSANDRA_KEYSPACE = 'iot_events';

const authProvider = new cassandra.auth.PlainTextAuthProvider(
  process.env.CASSANDRA_USER,
  process.env.CASSANDRA_PASS
);

// A retry policy is crucial for transient network issues.
// RetryPolicy is the Node.js driver's default implementation.
const retryPolicy = new cassandra.policies.retry.RetryPolicy();

const client = new cassandra.Client({
  contactPoints: CASSANDRA_CONTACT_POINTS,
  localDataCenter: CASSANDRA_DATACENTER,
  keyspace: CASSANDRA_KEYSPACE,
  authProvider: authProvider,
  policies: {
    retry: retryPolicy,
    loadBalancing: new cassandra.policies.loadBalancing.DCAwareRoundRobinPolicy(CASSANDRA_DATACENTER)
  },
  queryOptions: { 
    consistency: cassandra.types.consistencies.localQuorum,
    prepare: true 
  }
});

client.connect()
  .then(() => console.log('Successfully connected to Cassandra.'))
  .catch(err => {
    console.error('Cassandra connection error:', err);
    // In a real app, this should trigger a graceful shutdown.
    process.exit(1); 
  });

module.exports = client;
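
The connection code mentions a graceful shutdown without showing one. A possible shape for it is sketched below, assuming only that the client exposes a promise-returning shutdown() method, as the cassandra-driver Client does. The helper name registerGracefulShutdown, the injected proc parameter, and the 5-second timeout are all hypothetical choices made for illustration.

```javascript
// Sketch: close the Cassandra client before the process exits.
// `client` is anything with a promise-returning shutdown(); `proc` defaults to the Node process.
function registerGracefulShutdown(client, proc = process, timeoutMs = 5000) {
  let shuttingDown = false;

  async function shutdown(signal) {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    console.log(`Received ${signal}, closing Cassandra connections...`);

    // Do not hang forever if the cluster is unreachable.
    let timer;
    const timeout = new Promise((resolve) => {
      timer = setTimeout(() => resolve('timeout'), timeoutMs);
    });

    let exitCode = 0;
    try {
      const result = await Promise.race([client.shutdown(), timeout]);
      if (result === 'timeout') exitCode = 1;
    } catch (err) {
      console.error('Error while closing Cassandra client:', err);
      exitCode = 1;
    } finally {
      clearTimeout(timer);
    }
    proc.exit(exitCode);
  }

  proc.on('SIGTERM', () => shutdown('SIGTERM'));
  proc.on('SIGINT', () => shutdown('SIGINT'));
  return shutdown;
}
```

Injecting proc keeps the helper testable without sending real signals; in the service it would be called once with the exported client.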

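Our Cassandra table schema is optimized for our primary write pattern: recording events per device over time. The device_id is the partition key, so all data for a single device lives in one partition on the same set of replicas and can be retrieved efficiently. event_timestamp is the clustering key, which orders the events within the partition, newest first. One consequence of this key is that two readings from the same device with an identical timestamp occupy the same row, so the later write replaces the earlier one; adding event_id as a second clustering column would keep both.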

-- Cassandra Schema (iot_events keyspace)
CREATE TABLE sensor_readings (
    device_id UUID,
    event_timestamp TIMESTAMP,
    event_id TIMEUUID,
    reading_type TEXT,
    numeric_value DOUBLE,
    payload MAP<TEXT, TEXT>,
    PRIMARY KEY (device_id, event_timestamp)
) WITH CLUSTERING ORDER BY (event_timestamp DESC);

The core logic of the command handler uses prepared statements for performance and security. After a successful write to Cassandra, it immediately publishes a message to SNS.

// /services/command-handler.js
const { v1: timeuuid } = require('uuid');
const cassandraClient = require('../lib/cassandra-client');
// Assumed to export the aggregated `SNS` client from AWS SDK v3, which exposes publish(params).
const snsClient = require('../lib/sns-client');

const INSERT_SENSOR_READING_CQL = `
  INSERT INTO sensor_readings (device_id, event_timestamp, event_id, reading_type, numeric_value, payload)
  VALUES (?, ?, ?, ?, ?, ?)
`;

const SNS_TOPIC_ARN = process.env.SNS_TOPIC_ARN;

async function processSensorReading(command) {
  const { deviceId, timestamp, type, value, metadata } = command;

  // 1. Validate the incoming command data (omitted for brevity)

  const eventId = timeuuid();
  const eventTimestamp = new Date(timestamp);
  
  const params = [deviceId, eventTimestamp, eventId, type, value, metadata];

  try {
    // 2. Persist the event to Cassandra
    // The query options from the client (localQuorum, prepare=true) are used here.
    await cassandraClient.execute(INSERT_SENSOR_READING_CQL, params, { prepare: true });
    console.log(`[CommandHandler] Successfully persisted event ${eventId} for device ${deviceId}`);

    // 3. Construct and publish the domain event to SNS
    const snsEventPayload = {
      eventId: eventId.toString(),
      deviceId: deviceId,
      eventType: 'SensorReadingReceived',
      timestamp: eventTimestamp.toISOString(),
      readingType: type,
      value: value,
      metadata: metadata || {}
    };

    const publishCommand = {
      TopicArn: SNS_TOPIC_ARN,
      Message: JSON.stringify(snsEventPayload),
      MessageAttributes: {
        'eventType': { DataType: 'String', StringValue: 'SensorReadingReceived' }
      }
    };

    await snsClient.publish(publishCommand);
    console.log(`[CommandHandler] Successfully published event ${eventId} to SNS.`);

    return { success: true, eventId: eventId.toString() };

  } catch (error) {
    // This is a critical failure point. We need robust error handling.
    console.error(`[CommandHandler] Failed to process reading for device ${deviceId}. Event ID: ${eventId}`, error);

    // The pitfall here is atomicity. The Cassandra write could succeed while the SNS publish fails.
    // In a simple system, you might rely on logging and manual reconciliation.
    // For higher guarantees, a transactional outbox pattern would be necessary: the event is also
    // written to an 'outbox' table in the same logged batch as the reading (Cassandra has no
    // general multi-table transactions, but a logged batch guarantees that either all of its
    // writes are eventually applied or none are), and a separate process reliably sends it to SNS.
    // For this implementation, we accept the small risk and rely on monitoring and alerting.

    // Keep the original error as the cause so callers and logs can see what actually failed.
    throw new Error('Failed to fully process sensor reading.', { cause: error });
  }
}

module.exports = { processSensorReading };
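
The outbox idea mentioned in the handler's comments can be sketched independently of any driver. In the version below, fetchPending, publish, and markSent are hypothetical functions that would be implemented against an outbox table and SNS; none of them exist in the code above. The relay publishes before it marks a row as sent, so a crash between the two steps causes a duplicate publish rather than a lost event, which is one more reason the consumer has to tolerate redelivery.

```javascript
// Sketch of one pass of an outbox relay. All three dependencies are hypothetical and injected:
//   fetchPending(limit) -> Promise<Array<{ eventId, payload }>>
//   publish(payload)    -> Promise<void>   (for example, an SNS publish)
//   markSent(eventId)   -> Promise<void>   (for example, delete or flag the outbox row)
async function relayOutboxOnce({ fetchPending, publish, markSent }, batchSize = 100) {
  const pending = await fetchPending(batchSize);
  let sent = 0;
  const failed = [];

  for (const entry of pending) {
    try {
      // Publish first, mark second: a crash in between means a duplicate, never a loss.
      await publish(entry.payload);
      await markSent(entry.eventId);
      sent += 1;
    } catch (err) {
      // Leave the row in the outbox; the next pass picks it up again.
      failed.push({ eventId: entry.eventId, reason: err.message });
    }
  }
  return { sent, failed };
}
```

A scheduler or a long-running worker would call relayOutboxOnce repeatedly and alert when the failed list stays non-empty across passes.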

The Projector Service: Building the Read Model

The projector is an AWS Lambda function triggered by our SNS topic. Its sole purpose is to consume events and update the denormalized read model in Firestore. This separation is beautiful; the projector can be scaled, updated, or even completely rewritten without affecting the command service.

The Firestore data model is designed entirely around the needs of the UI. Instead of a normalized event log, we maintain a single document per device that holds the latest state and some aggregated data.

// Firestore Collection: 'device_dashboards'
// Document ID: {device_id}
{
  "deviceId": "...",
  "lastSeen": "2023-10-27T10:25:00Z", // Firestore Timestamp
  "lastReadingType": "temperature",
  "currentTemperature": 22.5,
  "currentHumidity": 45.1,
  "status": "online",
  "recentAlarms": [
    { "timestamp": "...", "message": "High temperature warning" }
  ]
}

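The Lambda handler code needs to tolerate duplicates. SNS provides at-least-once delivery, meaning our function could be invoked multiple times for the same event. The eventId in each message makes it possible to detect and skip duplicates, but the handler below does not do so: every update simply overwrites the latest values, so reapplying the same event leaves the document unchanged. In this “last-write-wins” model for a dashboard, strict idempotency is less critical than in financial transactions. Standard SNS topics also do not guarantee ordering, so an older reading that is delivered late can briefly overwrite a newer one.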

// projector-lambda/index.js
const { Firestore } = require('@google-cloud/firestore');

const firestore = new Firestore();
const DASHBOARD_COLLECTION = 'device_dashboards';

exports.handler = async (event) => {
  for (const record of event.Records) {
    try {
      const snsMessage = JSON.parse(record.Sns.Message);
      const { eventId, deviceId, eventType, timestamp, readingType, value } = snsMessage;

      console.log(`[Projector] Processing event ${eventId} for device ${deviceId}`);

      if (eventType !== 'SensorReadingReceived') {
        console.warn(`[Projector] Skipping unknown event type: ${eventType}`);
        continue;
      }
      
      const deviceDocRef = firestore.collection(DASHBOARD_COLLECTION).doc(deviceId);

      // The write is wrapped in a Firestore transaction so that a read-based check (for example,
      // comparing the stored eventId or timestamp) can be added later and stay atomic with the write.
      // As written, the transaction performs no read, so it behaves like a plain
      // .set() with merge:true.
      await firestore.runTransaction(async (transaction) => {
        // Idempotency check could be implemented here by storing last processed eventId.
        // For this dashboard use case, we'll proceed assuming reprocessing is safe.

        const updatePayload = {
          lastSeen: new Date(timestamp),
          lastReadingType: readingType,
          // Dynamically set the field based on the reading type
          [`current${capitalize(readingType)}`]: value,
          status: 'online', // Could be updated based on other logic
        };
        
        // Using { merge: true } creates the document if it doesn't exist, or updates it if it does.
        transaction.set(deviceDocRef, updatePayload, { merge: true });
      });

      console.log(`[Projector] Successfully updated dashboard for device ${deviceId}`);

    } catch (error) {
      console.error('[Projector] Error processing SNS message:', error);
      // SNS invokes Lambda asynchronously, so a thrown error is retried by Lambda itself.
      // A DLQ (Dead-Letter Queue) or on-failure destination on the function is crucial for production:
      // if this function keeps failing, the event ends up there for inspection.
      throw error; // Re-throwing tells Lambda the execution failed.
    }
  }
  return { status: 'success' };
};

function capitalize(s) {
  if (typeof s !== 'string') return '';
  return s.charAt(0).toUpperCase() + s.slice(1);
}
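
The idempotency check left as a comment in the handler could be factored into a small pure function that the transaction calls after reading the current document. The sketch below assumes an extra bookkeeping field, applied, that is not part of the document model shown earlier: a map from reading type to the eventId and timestamp of the last event applied for that type. Both function names are hypothetical.

```javascript
// Sketch: decide whether an event should be applied to the dashboard document.
// `applied` is a hypothetical bookkeeping field: { [readingType]: { eventId, timestamp } }.
function shouldApplyEvent(currentDoc, event) {
  const last = currentDoc && currentDoc.applied && currentDoc.applied[event.readingType];
  if (!last) return true; // first event of this type for the device

  if (last.eventId === event.eventId) return false; // duplicate delivery

  // Out-of-order delivery: never let an older reading overwrite a newer one.
  return new Date(event.timestamp).getTime() >= new Date(last.timestamp).getTime();
}

// Build the bookkeeping entry to merge alongside the regular update payload.
function appliedMarker(event) {
  return {
    applied: {
      [event.readingType]: { eventId: event.eventId, timestamp: event.timestamp },
    },
  };
}
```

Inside runTransaction, the handler would first read the document with transaction.get(deviceDocRef), return early when shouldApplyEvent(snapshot.data(), snsMessage) is false, and otherwise spread appliedMarker(snsMessage) into the payload passed to transaction.set. Because the write uses merge: true, the marker for one reading type does not erase the markers for the others.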

The Frontend: A Reactive Lit Component

This is where the magic of Firestore’s real-time capabilities pays off. Our Lit component subscribes directly to changes on a specific device document. Whenever the projector Lambda updates the document, Firestore pushes the new data to the client, and Lit’s reactive properties handle the DOM update automatically.

The code is clean and declarative. There is no setInterval, no polling logic, just a subscription and a reactive render.

// /components/device-dashboard-card.ts
import { LitElement, html, css } from 'lit';
import { customElement, property, state } from 'lit/decorators.js';
import {
  doc,
  onSnapshot,
  Firestore,
  Unsubscribe,
} from 'firebase/firestore';
import { getFirestoreInstance } from '../lib/firebase-init'; // Your firebase initialization logic

interface DeviceData {
  lastSeen?: { toDate: () => Date };
  currentTemperature?: number;
  currentHumidity?: number;
  status?: string;
}

@customElement('device-dashboard-card')
export class DeviceDashboardCard extends LitElement {
  @property({ type: String })
  deviceId = '';

  @state()
  private _deviceData: DeviceData | null = null;
  
  @state()
  private _isLoading = true;

  @state()
  private _error: string | null = null;

  private _firestore: Firestore | null = null;
  private _unsubscribe: Unsubscribe | null = null;

  static styles = css`
    /* Component styles omitted for brevity */
  `;

  connectedCallback() {
    super.connectedCallback();
    if (!this.deviceId) {
      this._error = "Device ID is required.";
      this._isLoading = false; // otherwise render() stays on "Loading..." and never shows the error
      return;
    }
    this._firestore = getFirestoreInstance();
    this._subscribeToDeviceData();
  }

  disconnectedCallback() {
    super.disconnectedCallback();
    // This is absolutely critical to prevent memory leaks and unnecessary reads.
    // A common mistake is to forget to unsubscribe when the component is removed from the DOM.
    if (this._unsubscribe) {
      this._unsubscribe();
      this._unsubscribe = null; // avoid calling a stale unsubscribe if the element is reconnected
      console.log(`Unsubscribed from device: ${this.deviceId}`);
    }
  }

  private _subscribeToDeviceData() {
    if (!this._firestore) return;

    const docRef = doc(this._firestore, 'device_dashboards', this.deviceId);
    
    this._unsubscribe = onSnapshot(docRef, 
      (docSnap) => {
        if (docSnap.exists()) {
          this._deviceData = docSnap.data() as DeviceData;
          this._error = null;
        } else {
          this._error = `No data found for device ${this.deviceId}.`;
          this._deviceData = null;
        }
        this._isLoading = false;
      },
      (error) => {
        console.error("Firestore subscription error:", error);
        this._error = "Failed to load real-time data.";
        this._isLoading = false;
      }
    );
    console.log(`Subscribed to device: ${this.deviceId}`);
  }

  render() {
    if (this._isLoading) {
      return html`<div class="card">Loading...</div>`;
    }
    if (this._error) {
      return html`<div class="card error">${this._error}</div>`;
    }
    if (!this._deviceData) {
      return html`<div class="card">No data available.</div>`;
    }

    const { lastSeen, currentTemperature, currentHumidity, status } = this._deviceData;
    const lastSeenDate = lastSeen?.toDate().toLocaleString() ?? 'N/A';
    
    return html`
      <div class="card status-${status}">
        <h3>Device: ${this.deviceId.substring(0, 8)}...</h3>
        <p>Status: <span class="status">${status}</span></p>
        <p>Temperature: <strong>${currentTemperature?.toFixed(1) ?? 'N/A'} °C</strong></p>
        <p>Humidity: <strong>${currentHumidity?.toFixed(1) ?? 'N/A'} %</strong></p>
        <p class="footer">Last seen: ${lastSeenDate}</p>
      </div>
    `;
  }
}
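
One case the component above does not cover is deviceId changing after the element is connected: the listener would keep following the original document. A possible approach is to move the subscribe and unsubscribe bookkeeping into a small helper and call it whenever the property changes. The sketch below is plain JavaScript with the subscribe function injected, so SubscriptionSlot and its method names are illustrative and not part of Lit or Firebase.

```javascript
// Sketch: holds at most one active subscription and swaps it when the key changes.
// `subscribe(key, onData, onError)` is injected and must return an unsubscribe function,
// which is the same contract onSnapshot follows.
class SubscriptionSlot {
  constructor(subscribe) {
    this._subscribe = subscribe;
    this._key = null;
    this._unsubscribe = null;
  }

  // Point the slot at a new key; a no-op if the key is unchanged.
  set(key, onData, onError) {
    if (key === this._key && this._unsubscribe) return;
    this.clear();
    if (!key) return; // nothing to subscribe to
    this._key = key;
    this._unsubscribe = this._subscribe(key, onData, onError);
  }

  // Release the current subscription, if any. Safe to call repeatedly.
  clear() {
    if (this._unsubscribe) this._unsubscribe();
    this._unsubscribe = null;
    this._key = null;
  }
}
```

In the component, the slot would be created with a function that wraps doc and onSnapshot, set from Lit's updated(changedProperties) callback when changedProperties.has('deviceId') is true, and cleared in disconnectedCallback.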

This architecture successfully solved our initial problem. Write throughput now scales with the Cassandra cluster instead of being capped by a single PostgreSQL instance. The dashboard is snappy and updates in near real-time, typically within a few hundred milliseconds of a sensor event being persisted. The services are decoupled, independently deployable, and can be scaled based on their specific needs: the command service on write TPS, the projector on event volume, and Firestore on concurrent UI connections.

However, the solution is not without its own set of complexities and trade-offs. The primary one is the embrace of eventual consistency. There is a tangible, albeit short, delay between a write succeeding in Cassandra and the result appearing on the UI. For our dashboard, this is perfectly acceptable. For a system requiring read-your-writes consistency, this architecture would be unsuitable. Furthermore, the operational burden has increased. We now have three distinct data systems to monitor and maintain, and debugging a single user-facing issue can involve tracing a request across five different components.

Future work will focus on implementing distributed tracing using OpenTelemetry to provide visibility across this entire pipeline. We are also exploring adding a second projector that feeds events into a data warehouse like BigQuery for long-term analytical queries, a task for which Firestore is not well-suited.

