Architecting a Resilient Edge-to-Cloud Pipeline with C++, CockroachDB Changefeeds, and Vue.js


Our initial attempt at aggregating sensor data from globally distributed industrial sites was a catastrophic failure. We had a standard monolithic application with a PostgreSQL database running in a single AWS region. Edge devices, running a simple Python script, would push JSON payloads every second. The system collapsed under the combined weight of network latency for our sites in Asia and Europe, constant connection drops from unreliable factory networks, and the sheer write load on a single-node database. It wasn’t a scaling problem; it was an architectural mismatch for a geographically distributed reality.

The post-mortem forced a fundamental rethink. The new architecture needed to embrace distribution and unreliability as first-class citizens. This led us to a tripartite solution: a high-performance, resilient C++ client for the edge; a geo-distributed SQL database in the cloud; and a reactive, real-time monitoring dashboard.

Our technology selection was unconventional but driven by strict, real-world requirements.

  1. Edge Client: C++. The industrial controllers our software runs on are resource-constrained. We needed minimal CPU and memory overhead, predictable performance without garbage collection pauses, and direct control over network behavior and local storage. C++ was the obvious, if challenging, choice. It allowed us to build a lean, robust binary with a self-contained data buffering mechanism.
  2. Cloud Database: CockroachDB. The single biggest pain point was data gravity and latency. CockroachDB’s geo-partitioning was the killer feature. It allows a single logical database to span multiple regions, keeping data physically close to its source. A sensor in Frankfurt writes to a CockroachDB node in eu-central-1, and a sensor in Tokyo writes to a node in ap-northeast-1. This drastically reduces write latency and helps with data sovereignty regulations. Its built-in resilience and Postgres wire compatibility were critical bonuses.
  3. Monitoring UI: Vue.js. We needed a lightweight, modern frontend to display a live feed of system-wide alerts and sensor readings. Vue.js offered the right balance of simplicity and power. The real challenge was feeding it real-time data from CockroachDB without resorting to constant polling. This is where CockroachDB Changefeeds became the linchpin of the entire monitoring architecture.

The core of the system can be visualized as a data flow across these three components.

graph TD
    subgraph EdgeFRA [Industrial Site - Frankfurt]
        C1[C++ Client on PLC] -- Ingests data --> B1[Local SQLite Buffer]
    end

    subgraph EdgeTYO [Industrial Site - Tokyo]
        C2[C++ Client on PLC] -- Ingests data --> B2[Local SQLite Buffer]
    end

    subgraph Cloud [AWS]
        subgraph VPC_EU [eu-central-1]
            CRDB_EU[CockroachDB Node]
            API_EU[Ingestion API Instance]
        end
        subgraph VPC_AP [ap-northeast-1]
            CRDB_AP[CockroachDB Node]
            API_AP[Ingestion API Instance]
        end
        subgraph VPC_US [us-east-1]
            CRDB_US[CockroachDB Node]
            SSE_Bridge[Changefeed SSE Bridge]
            Dashboard[Vue.js Application]
        end
    end

    B1 -- Resilient POST --> API_EU
    B2 -- Resilient POST --> API_AP
    API_EU -- SQL INSERT --> CRDB_EU
    API_AP -- SQL INSERT --> CRDB_AP

    CRDB_EU <--> CRDB_AP
    CRDB_AP <--> CRDB_US
    CRDB_EU <--> CRDB_US

    CRDB_US -- Changefeed --> SSE_Bridge
    SSE_Bridge -- Server-Sent Events --> Dashboard

    style EdgeFRA fill:#f9f,stroke:#333,stroke-width:2px
    style EdgeTYO fill:#f9f,stroke:#333,stroke-width:2px

The C++ Edge Client: Built for Failure

The primary responsibility of the edge client is to capture sensor data, batch it, and reliably forward it to the cloud API, surviving network outages that could last hours. A “fire and forget” approach was not an option.

We used SQLite as a local, persistent write-ahead log. New readings are written to a local database file first. A separate worker thread reads from this buffer, sends the data to the cloud, and deletes the local copy only after the ingestion API answers with a successful (2xx) status.
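Deleting only after a 2xx is what turns the buffer into at-least-once delivery: a batch that fails, or whose response is lost, stays in place and is simply sent again. The protocol can be sketched independently of SQLite with a non-durable, in-memory stand-in; `InMemoryBuffer` and `forward_once` are hypothetical names for illustration, not part of the client.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for the SQLite buffer. It is NOT durable;
// it only models the protocol: a reading stays buffered until acknowledged.
class InMemoryBuffer {
public:
    // Append a reading; ids increase like SQLite rowids.
    std::int64_t add(const std::string& payload) {
        const std::int64_t id = next_id_++;
        rows_.emplace(id, payload);
        return id;
    }

    // Oldest-first ids of up to `limit` readings. Nothing is removed here.
    std::vector<std::int64_t> get_batch(std::size_t limit) const {
        std::vector<std::int64_t> ids;
        for (const auto& row : rows_) {
            if (ids.size() == limit) break;
            ids.push_back(row.first);
        }
        return ids;
    }

    // Called only after the cloud API has acknowledged the batch with a 2xx.
    void delete_batch(const std::vector<std::int64_t>& ids) {
        for (std::int64_t id : ids) rows_.erase(id);
    }

    std::size_t size() const { return rows_.size(); }

private:
    std::int64_t next_id_ = 1;
    std::map<std::int64_t, std::string> rows_; // ordered by id, i.e. arrival order
};

// One forwarding attempt; `acknowledged` stands in for "the POST returned 2xx".
// Returns how many readings were removed from the buffer.
std::size_t forward_once(InMemoryBuffer& buffer, std::size_t limit, bool acknowledged) {
    const std::vector<std::int64_t> ids = buffer.get_batch(limit);
    if (ids.empty() || !acknowledged) {
        return 0; // Failure: the same readings are retried on the next attempt
    }
    buffer.delete_batch(ids);
    return ids.size();
}
```

The flip side of this design is that the receiver may see a batch twice (for example, when the 2xx is lost on the way back), so the ingestion side has to tolerate duplicates.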

Here’s a condensed but functional representation of the core forwarding logic. It depends on libcurl for HTTP, sqlite3 for the local buffer, and nlohmann/json for building request bodies.

CMakeLists.txt:

cmake_minimum_required(VERSION 3.15)
project(EdgeClient CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(CURL REQUIRED)
find_package(Threads REQUIRED)
find_package(SQLite3 REQUIRED)        # FindSQLite3 module, shipped with CMake >= 3.14
find_package(nlohmann_json REQUIRED)  # Header-only JSON library used by the forwarder

add_executable(edge_client main.cpp http_forwarder.cpp data_buffer.cpp)

target_link_libraries(edge_client
    PRIVATE
    CURL::libcurl
    Threads::Threads
    SQLite::SQLite3
    nlohmann_json::nlohmann_json
)

data_buffer.h:

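#pragma once

#include <optional>
#include <string>
#include <vector>

struct sqlite3; // Forward declaration; only data_buffer.cpp includes <sqlite3.h>

struct SensorReading {
    long id;                  // SQLite rowid of the buffered reading
    std::string device_id;
    std::string payload_json; // Raw JSON payload as stored in the buffer
    long long timestamp;
};

// Persistent FIFO buffer backed by a local SQLite file. add_reading() is called
// from the acquisition loop and get_batch()/delete_batch() from the forwarder
// thread, so the implementation must serialize access to the connection
// (e.g., with a mutex, or by opening SQLite in serialized threading mode).
class DataBuffer {
public:
    explicit DataBuffer(const std::string& db_path);
    ~DataBuffer();

    // Owns a raw database handle; copying would lead to a double close.
    DataBuffer(const DataBuffer&) = delete;
    DataBuffer& operator=(const DataBuffer&) = delete;

    bool open();
    void close();
    bool add_reading(const std::string& device_id, const std::string& json_payload);
    std::optional<std::vector<SensorReading>> get_batch(int limit);
    bool delete_batch(const std::vector<long>& ids);

private:
    void initialize_schema();
    std::string db_path_;
    sqlite3* db_handle_ = nullptr;
};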

http_forwarder.cpp (Core Logic):

#include "http_forwarder.h"
#include "data_buffer.h"
#include <algorithm>
#include <chrono>
#include <iostream>
#include <random>
#include <string>
#include <thread>
#include <vector>
#include <curl/curl.h>
#include <nlohmann/json.hpp> // For convenience

// A real implementation needs better logging (e.g., spdlog)
#define LOG_INFO(msg) std::cout << "[INFO] " << msg << std::endl
#define LOG_ERROR(msg) std::cerr << "[ERROR] " << msg << std::endl

// http_forwarder.h is not shown; running_ is assumed to be a std::atomic<bool>,
// since start()/stop() write it while the worker thread reads it.
HttpForwarder::HttpForwarder(const std::string& endpoint, DataBuffer& buffer)
    : api_endpoint_(endpoint), buffer_(buffer), running_(false) {}

HttpForwarder::~HttpForwarder() {
    stop();
}

void HttpForwarder::start() {
    if (running_) {
        return;
    }
    running_ = true;
    worker_thread_ = std::thread(&HttpForwarder::run, this);
}

void HttpForwarder::stop() {
    running_ = false;
    if (worker_thread_.joinable()) {
        worker_thread_.join(); // May wait for one idle sleep or backoff interval
    }
}

// libcurl write callback. Without one, libcurl writes the response body to
// stdout. We discard the body of a 2xx response, but a production system
// might parse it for server-side instructions.
static size_t write_callback(void* /*contents*/, size_t size, size_t nmemb, void* /*userp*/) {
    return size * nmemb;
}

// main() should call curl_global_init(CURL_GLOBAL_DEFAULT) once before start():
// curl_easy_init() otherwise performs that initialization implicitly, which is
// not guaranteed to be thread-safe.
void HttpForwarder::run() {
    CURL* curl = curl_easy_init();
    if (!curl) {
        LOG_ERROR("Failed to initialize libcurl.");
        return;
    }

    // The header list never changes, so build it once for all requests
    struct curl_slist* headers = nullptr;
    headers = curl_slist_append(headers, "Content-Type: application/json");
    // In a real project, add an API key or JWT token here
    // headers = curl_slist_append(headers, "Authorization: Bearer ...");

    std::mt19937 rng{std::random_device{}()}; // Source of backoff jitter
    long backoff_ms = 1000; // Initial backoff delay

    while (running_) {
        auto batch = buffer_.get_batch(50); // Get up to 50 readings

        if (!batch || batch->empty()) {
            std::this_thread::sleep_for(std::chrono::seconds(5));
            continue;
        }

        // Prepare JSON payload for the batch
        nlohmann::json batch_json = nlohmann::json::array();
        std::vector<long> ids_in_batch;
        for (const auto& reading : *batch) {
            nlohmann::json reading_json;
            reading_json["device_id"] = reading.device_id;
            reading_json["ts"] = reading.timestamp;
            // Non-throwing parse: an uncaught parse exception here would terminate
            // the process, and a corrupt row would block the queue forever.
            // Malformed payloads are forwarded as a plain JSON string instead.
            auto data = nlohmann::json::parse(reading.payload_json, nullptr, false);
            reading_json["data"] = data.is_discarded() ? nlohmann::json(reading.payload_json) : data;
            batch_json.push_back(reading_json);
            ids_in_batch.push_back(reading.id);
        }
        const std::string payload_str = batch_json.dump();

        // Configure libcurl request. libcurl does not copy POSTFIELDS, so
        // payload_str must stay alive until curl_easy_perform() returns.
        curl_easy_setopt(curl, CURLOPT_URL, api_endpoint_.c_str());
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload_str.c_str());
        curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast<long>(payload_str.size()));
        curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, 15L); // 15 second timeout

        CURLcode res = curl_easy_perform(curl);
        long http_code = 0;
        if (res == CURLE_OK) {
            curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
        }

        if (res == CURLE_OK && http_code >= 200 && http_code < 300) {
            LOG_INFO("Successfully forwarded batch of " + std::to_string(ids_in_batch.size()) + " readings.");
            if (!buffer_.delete_batch(ids_in_batch)) {
                // The batch will be sent again, so the ingestion API must tolerate duplicates
                LOG_ERROR("Forwarded batch could not be deleted from the local buffer.");
            }
            backoff_ms = 1000; // Reset backoff on success
        } else {
            LOG_ERROR("Failed to forward batch. Curl error: " + std::string(curl_easy_strerror(res)) + ", HTTP code: " + std::to_string(http_code));
            // Exponential backoff plus up to 50% random jitter, so devices that
            // recover from the same outage do not all retry in lockstep
            std::uniform_int_distribution<long> jitter(0, backoff_ms / 2);
            std::this_thread::sleep_for(std::chrono::milliseconds(backoff_ms + jitter(rng)));
            backoff_ms = std::min(backoff_ms * 2, 60000L); // Cap at 60 seconds
        }
    }

    curl_slist_free_all(headers);
    curl_easy_cleanup(curl);
}

The key pitfall to avoid here is blocking the main data acquisition loop on network I/O. By decoupling acquisition and forwarding with a persistent buffer, the system remains operational locally even during complete cloud disconnection.
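Because the forwarder thread, not the acquisition loop, absorbs outages, its retry delay can be as patient as needed. Isolated as pure functions, that schedule might look like the following sketch. The 1 s start and 60 s cap come from `run()`; the jitter scheme (adding up to half the base delay at random) is an assumption, one simple option among several, and the function names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <random>

// Capped exponential schedule: 1 s, 2 s, 4 s, ... up to 60 s (values from run()).
constexpr long kInitialBackoffMs = 1000;
constexpr long kMaxBackoffMs = 60000;

// Delay to use after the next consecutive failure.
long next_backoff_ms(long current_ms) {
    return std::min(current_ms * 2, kMaxBackoffMs);
}

// Sleep for the base delay plus a uniform random extra in [0, base/2], so that
// devices recovering from the same outage spread out their retries.
long jittered_delay_ms(long base_ms, std::mt19937& rng) {
    std::uniform_int_distribution<long> extra(0, base_ms / 2);
    return base_ms + extra(rng);
}
```

Resetting to `kInitialBackoffMs` after a success, as `run()` does, keeps one flaky request from leaving the device in minute-long sleeps.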

CockroachDB: Geo-partitioning and Real-time Feeds

The cloud backend is where the magic of CockroachDB comes into play. Our primary table stores the sensor readings. We needed to ensure that data from European devices physically resided on European servers. This is achieved with REGIONAL BY ROW tables.

First, we define the database regions. This is done once per database.

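-- Connect to the CockroachDB cluster. Region names must match the nodes'
-- --locality=region=... flags (see SHOW REGIONS FROM CLUSTER).
-- The primary region is where REGIONAL BY TABLE tables (the default
-- locality) are homed unless told otherwise.
ALTER DATABASE industrial_iot PRIMARY REGION "us-east-1";
ALTER DATABASE industrial_iot ADD REGION "eu-central-1";
ALTER DATABASE industrial_iot ADD REGION "ap-northeast-1";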

Next, the table schema. We add a computed region column of type crdb_internal_region (the enum CockroachDB creates from the database's regions) and declare it as each row's home region with LOCALITY REGIONAL BY ROW AS region. Without the AS clause, CockroachDB would instead add its own hidden crdb_region column, and our computed column would have no effect on placement.

CREATE TABLE sensor_readings (
    device_id STRING NOT NULL,
    ts TIMESTAMPTZ NOT NULL,
    region crdb_internal_region NOT NULL AS (
        CASE
            WHEN device_id LIKE 'eu-%' THEN 'eu-central-1'
            WHEN device_id LIKE 'ap-%' THEN 'ap-northeast-1'
            ELSE 'us-east-1'
        END
    ) STORED,
    data JSONB,
    PRIMARY KEY (device_id, ts)
)
LOCALITY REGIONAL BY ROW AS region;

Unlike manually partitioned tables, a REGIONAL BY ROW table does not need the region column in its declared primary key: CockroachDB implicitly partitions the table and all of its indexes by the region column, so a lookup is routed to the right partition without scanning the others. We declare a composite key of (device_id, ts) so that each device's readings are colocated and time-range queries for a device stay efficient. The device IDs are prefixed with their region (e.g., eu-fr-unit-0815) so that the CASE expression can derive the home region.
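Because the home region is derived purely from the device ID prefix, the same rule can be mirrored outside the database, for example by a hypothetical check in the ingestion API that flags devices whose writes arrive at another region's endpoint (and therefore pay cross-region latency). A sketch of that mirror, assuming exactly the prefixes in the CASE expression; `home_region_for` and `is_local_write` are illustrative names:

```cpp
#include <cassert>
#include <string_view>

// Mirrors the schema's CASE expression: same prefixes, same order, same ELSE.
std::string_view home_region_for(std::string_view device_id) {
    if (device_id.substr(0, 3) == "eu-") return "eu-central-1";
    if (device_id.substr(0, 3) == "ap-") return "ap-northeast-1";
    return "us-east-1"; // ELSE branch
}

// True when an API instance running in `api_region` is writing a row whose
// home region is its own region.
bool is_local_write(std::string_view device_id, std::string_view api_region) {
    return home_region_for(device_id) == api_region;
}
```

LIKE is case-sensitive in CockroachDB, and so is this comparison: a device named `EU-de-temp-1` would be homed in us-east-1, which is a good reason to validate IDs when devices are provisioned.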

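With this schema, an INSERT for the device eu-de-temp-123, issued by an API server connected to a gateway node in eu-central-1, is a low-latency, local write. Under the default ZONE survival goal, all of the row's voting replicas live in its home region, so the write commits after an in-region consensus round. The other regions hold non-voting replicas, updated asynchronously, which serve follower reads but do not let the row survive the loss of its home region (that requires the REGION survival goal, at the cost of cross-region write latency). If European data must never leave Europe, the database also needs the PLACEMENT RESTRICTED policy, which removes those out-of-region replicas.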
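The ingestion API itself is not shown. Since the edge client re-sends any batch whose acknowledgment it did not see, the API's INSERT should treat duplicates as a no-op. One way to do that, sketched here as a hypothetical helper, is a single parameterized multi-row INSERT with ON CONFLICT DO NOTHING, so each batch costs one round trip and a replay is harmless. The computed region column is omitted because CockroachDB derives it; converting the client's numeric `ts` into a TIMESTAMPTZ parameter is left to the API and is an assumption here.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper for the ingestion API: one parameterized INSERT for
// `rows` readings, with PostgreSQL-style numbered placeholders
// ($1, $2, $3), ($4, $5, $6), ... as used over CockroachDB's Postgres wire protocol.
std::string build_batch_insert(std::size_t rows) {
    assert(rows > 0 && "an INSERT needs at least one VALUES tuple");
    std::string sql = "INSERT INTO sensor_readings (device_id, ts, data) VALUES ";
    std::size_t param = 1;
    for (std::size_t r = 0; r < rows; ++r) {
        if (r > 0) sql += ", ";
        sql += "($" + std::to_string(param) + ", $" + std::to_string(param + 1) +
               ", $" + std::to_string(param + 2) + ")";
        param += 3;
    }
    // A re-sent batch hits existing primary keys and is silently skipped
    sql += " ON CONFLICT DO NOTHING";
    return sql;
}
```

The API would then bind 3 × N values in the same order: device ID, timestamp, and JSON data for each reading.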

The second critical piece is getting this data to our dashboard in real time. For that we use CockroachDB's Change Data Capture (CDC) feature, changefeeds. We create a changefeed that pushes every row change in sensor_readings (inserts, but also any updates and deletes) to a webhook endpoint.

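-- Changefeeds are built on rangefeeds, which must be enabled once per cluster
SET CLUSTER SETTING kv.rangefeed.enabled = true;

-- The webhook sink only delivers over HTTPS; this assumes TLS is
-- terminated in front of the bridge service.
CREATE CHANGEFEED FOR TABLE sensor_readings
INTO 'webhook-https://sse-bridge-service:8080/ingest'
WITH
    format = json,
    updated,
    resolved = '10s';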

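This statement tells CockroachDB to emit a JSON message for each row change to our internal sse-bridge-service, and the updated option adds each change's commit timestamp to the message. The resolved option periodically emits a watermark: a timestamp guaranteeing that no previously unseen change at or below it will be emitted later. Because changefeeds deliver at least once and only order changes per row key, this watermark is what lets a downstream consumer that needs a total order know when it is safe to sort and release buffered events.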
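The bridge described next relays events as they arrive and ignores resolved messages, which is fine for a live feed. A consumer that does need a total order could buffer events and release them only once a resolved timestamp covers them. The sketch below assumes the HLC string format CockroachDB uses for `updated` and `resolved` values (wall-clock nanoseconds, a dot, then a logical counter); `ChangeEvent` and `ResolvedReorderBuffer` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

// (wall-clock nanoseconds, logical counter); std::pair compares
// lexicographically, which is exactly HLC order.
using Hlc = std::pair<std::int64_t, std::int64_t>;

// Parse e.g. "1670352212234563000.0000000000" into its two components.
Hlc parse_hlc(const std::string& s) {
    const auto dot = s.find('.');
    const std::int64_t wall = std::stoll(s.substr(0, dot));
    const std::int64_t logical = dot == std::string::npos ? 0 : std::stoll(s.substr(dot + 1));
    return {wall, logical};
}

struct ChangeEvent {
    Hlc updated;      // the event's `updated` field, parsed
    std::string body; // raw event JSON, opaque here
};

class ResolvedReorderBuffer {
public:
    // Returns false for an event at or below the last resolved timestamp:
    // no previously unseen change can appear there, so it is a re-delivery.
    bool add(ChangeEvent e) {
        if (e.updated <= last_resolved_) return false;
        pending_.push_back(std::move(e));
        return true;
    }

    // Release every buffered event covered by `resolved`, in commit order.
    std::vector<ChangeEvent> on_resolved(const Hlc& resolved) {
        last_resolved_ = std::max(last_resolved_, resolved);
        auto split = std::stable_partition(pending_.begin(), pending_.end(),
            [&](const ChangeEvent& e) { return e.updated <= last_resolved_; });
        std::vector<ChangeEvent> ready(std::make_move_iterator(pending_.begin()),
                                       std::make_move_iterator(split));
        pending_.erase(pending_.begin(), split);
        // stable_sort keeps arrival order for changes with equal timestamps
        std::stable_sort(ready.begin(), ready.end(),
            [](const ChangeEvent& a, const ChangeEvent& b) { return a.updated < b.updated; });
        return ready;
    }

    std::size_t pending() const { return pending_.size(); }

private:
    Hlc last_resolved_{0, 0}; // nothing resolved yet
    std::vector<ChangeEvent> pending_;
};
```

The cost of this ordering is latency: with `resolved = '10s'`, an event may wait up to roughly one resolved interval before it is released.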

The Vue.js Dashboard via an SSE Bridge

Directly connecting a web browser to a database changefeed is not feasible or secure. We need a simple service in the middle: the SSE Bridge. This is a small Node.js/Express application whose only job is to receive webhook POSTs from the CockroachDB changefeed and stream them out to connected browser clients using Server-Sent Events (SSE). SSE is a much simpler protocol than WebSockets for this unidirectional server-to-client data flow.
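The SSE wire format itself is small. A message is one or more `data:` lines followed by a blank line; a payload containing newlines must be split across several `data:` lines, and an optional `id:` field is echoed back by the browser in the `Last-Event-ID` header when it reconnects. The bridge below can write each event as a single `data:` line because `JSON.stringify` never emits a raw newline. A C++ sketch of the general framing, where `format_sse` is a hypothetical helper:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Frame one Server-Sent Events message: optional "id:" line, one "data:" line
// per line of the payload, then a blank line that ends the message.
// `id` must not itself contain newlines.
std::string format_sse(const std::string& data, const std::string& id = "") {
    std::string out;
    if (!id.empty()) out += "id: " + id + "\n";
    std::size_t start = 0;
    while (true) {
        const std::size_t nl = data.find('\n', start);
        // When nl is npos, substr() simply takes the rest of the string
        out += "data: " + data.substr(start, nl - start) + "\n";
        if (nl == std::string::npos) break;
        start = nl + 1;
    }
    out += "\n";
    return out;
}
```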

sse-bridge/index.js:

const express = require('express');
const bodyParser = require('body-parser');
const cors = require('cors');

const app = express();
const PORT = process.env.PORT || 8080;

let clients = [];
let nextClientId = 0; // Counter instead of Date.now(), which collides for simultaneous connects

// CockroachDB's webhook sink POSTs a JSON object, not a bare array:
// { "payload": [ ...row events ], "length": N } for row changes, and
// { "resolved": "<HLC timestamp>" } for resolved watermarks.
// The sink only delivers to HTTPS URLs, so TLS is assumed to be terminated
// in front of this service.
app.use(bodyParser.json({ limit: '50mb' }));
app.use(cors()); // Restrict the allowed origin in production

// The endpoint that CockroachDB sends changefeed data to
app.post('/ingest', (req, res) => {
    const payload = req.body.payload;
    if (Array.isArray(payload)) {
        console.log(`Received ${payload.length} events from CockroachDB changefeed.`);
        // Relay each event to all connected SSE clients
        payload.forEach(event => {
            sendEventToClients(JSON.stringify(event));
        });
    }
    // Resolved messages carry no payload; they are acknowledged but not relayed
    res.status(200).send();
});

// The endpoint that the Vue.js frontend connects to
app.get('/events', (req, res) => {
    // Standard SSE headers
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // Flush headers to establish connection

    const clientId = nextClientId++;
    clients.push({ id: clientId, res });
    console.log(`Client ${clientId} connected.`);

    // SSE comment lines (starting with ':') are ignored by EventSource but keep
    // idle proxies and load balancers from closing the stream
    const heartbeat = setInterval(() => res.write(': heartbeat\n\n'), 15000);

    // Drop the client when the browser disconnects
    req.on('close', () => {
        clearInterval(heartbeat);
        console.log(`Client ${clientId} disconnected.`);
        clients = clients.filter(client => client.id !== clientId);
    });
});

function sendEventToClients(eventData) {
    // SSE format is 'data: {json_string}\n\n'
    clients.forEach(client => client.res.write(`data: ${eventData}\n\n`));
}

app.listen(PORT, () => {
    console.log(`SSE Bridge for CockroachDB Changefeeds running on port ${PORT}`);
});
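On the receiving side, the browser's EventSource reverses this framing before `onmessage` fires. The hypothetical C++ parser below implements only a subset of the WHATWG event-stream rules (`data:` fields and comment lines; no `event`, `id`, or `retry` fields and no CR line endings), but it shows why comment lines such as keep-alive heartbeats never reach `onmessage` and why multi-line data arrives joined with newlines.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Minimal event-stream parser (subset): returns the data of each complete
// message; a trailing message without its terminating blank line is dropped.
std::vector<std::string> parse_sse_data(const std::string& stream) {
    std::vector<std::string> messages;
    std::string data;
    bool has_data = false;
    std::size_t start = 0;
    std::size_t nl;
    while ((nl = stream.find('\n', start)) != std::string::npos) {
        const std::string line = stream.substr(start, nl - start);
        start = nl + 1;
        if (line.empty()) {
            // Blank line: dispatch the accumulated message, if any
            if (has_data) messages.push_back(data);
            data.clear();
            has_data = false;
        } else if (line[0] == ':') {
            // Comment line (e.g. a keep-alive heartbeat): ignored entirely
        } else if (line.compare(0, 5, "data:") == 0) {
            std::string value = line.substr(5);
            if (!value.empty() && value[0] == ' ') value.erase(0, 1); // strip one leading space
            if (has_data) data += '\n'; // multiple data lines join with '\n'
            data += value;
            has_data = true;
        }
        // Other fields (event, id, retry) are ignored by this sketch
    }
    return messages;
}
```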

Finally, the Vue.js component consumes this stream. We use the native EventSource browser API.

src/components/RealTimeFeed.vue:

<template>
  <div class="feed-container">
    <h2>Live Sensor Readings</h2>
    <div v-if="error" class="error-banner">
      Connection to event stream failed. Retrying...
    </div>
    <transition-group name="list" tag="ul" class="reading-list">
      <li v-for="reading in latestReadings" :key="reading.key" class="reading-item">
        <span class="timestamp">{{ formatTimestamp(reading.updated) }}</span>
        <span class="region" :class="getRegionClass(reading.region)">{{ reading.region }}</span>
        <span class="device">{{ reading.device_id }}</span>
        <pre class="payload">{{ reading.data }}</pre>
      </li>
    </transition-group>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue';

const latestReadings = ref([]);
const error = ref(false);
let eventSource = null;
let retryTimer = null;
let unmounted = false;

const MAX_READINGS = 50; // Keep only the latest 50 readings in the UI

const connectToEventStream = () => {
  // A real-world project must get this URL from configuration
  const SSE_ENDPOINT = 'http://localhost:8080/events';

  eventSource = new EventSource(SSE_ENDPOINT);

  eventSource.onopen = () => {
    console.log("Connection to SSE stream opened.");
    error.value = false;
  };

  eventSource.onmessage = (event) => {
    try {
      // Each message is one webhook-sink event, relayed verbatim by the bridge:
      // { after: { ...row }, key: [...], topic: "sensor_readings", updated: "<HLC>" }
      const change = JSON.parse(event.data);
      if (!change.after) {
        return; // Deletes arrive with after: null; nothing to display
      }

      // Primary key plus commit timestamp identifies this version of the row
      const key = `${JSON.stringify(change.key)}@${change.updated}`;
      // Changefeeds deliver at least once, so skip re-delivered duplicates
      if (latestReadings.value.some((r) => r.key === key)) {
        return;
      }

      latestReadings.value.unshift({ ...change.after, updated: change.updated, key });

      // Maintain a fixed-size list to prevent the DOM from growing indefinitely
      if (latestReadings.value.length > MAX_READINGS) {
        latestReadings.value.pop();
      }
    } catch (e) {
      console.error("Failed to parse incoming event data:", e);
    }
  };

  eventSource.onerror = () => {
    console.error("EventSource failed.");
    error.value = true;
    eventSource.close();
    // Simple fixed-delay retry, skipped once the component is gone
    if (!unmounted) {
      retryTimer = setTimeout(connectToEventStream, 5000);
    }
  };
};

onMounted(() => {
  connectToEventStream();
});

onUnmounted(() => {
  unmounted = true;
  clearTimeout(retryTimer); // Cancel a pending reconnect
  if (eventSource) {
    eventSource.close();
    console.log("Connection to SSE stream closed.");
  }
});

// Helper functions for display
// `updated` is an HLC timestamp string: "<wall-clock nanoseconds>.<logical counter>"
const formatTimestamp = (hlc) => {
  if (!hlc) return '';
  const [wallNanos] = hlc.split('.');
  return new Date(Number(BigInt(wallNanos) / 1000000n)).toISOString();
};

const getRegionClass = (region) => `region-${region.split('-')[0]}`;

</script>

<style scoped>
/* Basic styling for demonstration */
.feed-container { font-family: monospace; }
.reading-list { list-style: none; padding: 0; }
.reading-item { background: #2d2d2d; border-radius: 4px; padding: 8px 12px; margin-bottom: 8px; display: flex; align-items: center; gap: 16px; }
.timestamp { color: #888; }
.region { padding: 2px 6px; border-radius: 3px; font-weight: bold; }
.region-eu { background: #007acc; color: white; }
.region-ap { background: #d9534f; color: white; }
.region-us { background: #5cb85c; color: white; }
.device { color: #eee; }
.payload { margin: 0; background: #3c3c3c; padding: 4px; border-radius: 3px; color: #ccc; font-size: 0.9em; }

/* Animation for new items */
.list-enter-active { transition: all 0.5s ease; }
.list-enter-from { opacity: 0; transform: translateX(-30px); }
</style>

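The critical part of the Vue component is its lifecycle management. In onMounted, it establishes the EventSource connection. In onUnmounted, it explicitly calls eventSource.close() to prevent memory leaks and dangling network connections. A production system should also implement a more robust retry and error-handling strategy than the fixed five-second setTimeout shown here, such as exponential backoff. Note that EventSource already reconnects on its own after network errors (though not after an HTTP error response), so closing it and retrying manually mainly gives the component explicit control over the retry interval.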

This architecture, while composed of disparate technologies, forms a cohesive and remarkably resilient system. The C++ client keeps readings safe through network outages at the edge, for as long as local storage can hold them. CockroachDB provides a scalable, geographically aware, and consistent storage layer. And the combination of Changefeeds, an SSE bridge, and Vue.js delivers a real-time view into the entire distributed system with minimal complexity.

The system is not without its limitations. The SSE bridge itself is a potential single point of failure and would need to be deployed as a horizontally scalable, highly available service in a production environment. The security model is also nascent; proper mTLS between the C++ client and the ingestion API, as well as fine-grained access control within CockroachDB, are necessary next steps. Furthermore, for extremely high-throughput devices, the C++ client’s batching could be made more sophisticated, dynamically adjusting batch size based on network conditions and API responses. Data lifecycle management, using CockroachDB’s row-level TTL, is also a required future iteration to automatically prune historical data.
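As one possible shape for the dynamic batching mentioned above, a hypothetical additive-increase, multiplicative-decrease (AIMD) policy could grow the batch slowly while requests succeed quickly and halve it on failure. `BatchSizer`, its bounds, its step, and the latency budget are illustrative assumptions, not tuned values or part of the current client.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical AIMD batch-size policy for the forwarder.
class BatchSizer {
public:
    BatchSizer(int min_size, int max_size, int initial)
        : min_(min_size), max_(max_size), size_(std::clamp(initial, min_size, max_size)) {}

    int size() const { return size_; }

    // 2xx response: grow additively, but only if the request was fast enough;
    // a slow success keeps the current size.
    void on_success(long latency_ms, long latency_budget_ms) {
        if (latency_ms <= latency_budget_ms) {
            size_ = std::min(size_ + kStep, max_);
        }
    }

    // Timeout, transport error, or e.g. a 5xx/413 response: halve the batch.
    void on_failure() { size_ = std::max(size_ / 2, min_); }

private:
    static constexpr int kStep = 10;
    int min_;
    int max_;
    int size_;
};
```

The forwarder would then call `buffer_.get_batch(sizer.size())` instead of requesting a fixed 50 readings, and report each request's outcome back to the sizer.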

