Constructing a Resilient Multi-Cloud Data Forwarder with Node.js, SQLite, and Pub/Sub


A regional network partition affecting a major cloud provider recently led to several hours of data loss for one of our critical telemetry pipelines. The source system, an Azure-hosted application, was unable to push events to our primary analytics platform in Google Cloud. The in-memory buffering strategy of the existing forwarder proved useless once the process was restarted during the incident response. This event forced a fundamental redesign, with the primary directive being zero data loss, even in the face of prolonged downstream or network unavailability. The result was a lightweight, durable data forwarder built on a specific combination of technologies chosen for resilience and operational simplicity.

The initial concept was to build a standalone agent that implemented the Transactional Outbox pattern. This agent would sit between the Azure data source and the Google Cloud Pub/Sub topic, acting as a durable, persistent buffer. Its sole responsibility would be to accept data, write it to a local transactional store, and then independently attempt to forward it to the final destination.

Technology selection was driven by the core requirement of durability with minimal operational overhead.

  • Node.js: The task is I/O-bound, not CPU-bound, making Node.js’s event-driven, non-blocking model a natural fit. Its ecosystem and lightweight runtime were ideal for an agent that might be deployed in constrained environments like a small VM or even a container on an edge device.
  • Google Cloud Pub/Sub: It serves as the scalable, reliable entry point to our GCP infrastructure. Its push/pull subscription model and built-in retry mechanisms make it a robust sink for our data pipeline.
  • SQLite: This was the most critical and debated choice. The knee-jerk reaction was to propose Redis or even a managed queue service. However, in a real-world project, adding a network dependency like Redis to solve a network-unavailability problem is counter-intuitive. It introduces another point of failure. A full-blown database like Postgres or MySQL was operational overkill for a simple message buffer. SQLite, running as a local file, offered full ACID transactional guarantees with zero setup, no separate server process, and no network latency. It is the perfect embodiment of a durable, local-first buffer. A common mistake is to underestimate SQLite; for single-writer scenarios like this outbox, its performance is more than sufficient, and its reliability is production-grade.
  • Azure: The data originates here. For this implementation, we’ll model the source as an Azure Function with an HTTP trigger, which is a common integration pattern.

The architecture is straightforward; the data flow, shown in the diagram below, is designed for complete decoupling between ingestion and forwarding.

graph TD
    A[Azure App / Function] -- HTTP POST --> B{Node.js Forwarder Agent};
    B -- 1. INSERT within transaction --> C[(SQLite Outbox DB)];
    subgraph "Forwarder Process (Independent Loop)"
        D[Worker] -- 2. SELECT PENDING msgs --> C;
        D -- 3. Publish batch --> E[GCP Pub/Sub Topic];
        E -- 4. Acknowledge --> D;
        D -- 5. UPDATE msgs to SENT --> C;
    end
    E -- Pub/Sub Push --> F[Downstream GCP Services];

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#bbf,stroke:#333,stroke-width:2px

The forwarder consists of two primary components running in the same Node.js process: an API server to ingest data and a background worker to process the outbox.

Phase 1: The Durable SQLite Outbox

The foundation is the database schema. It needs to store the payload and track the state of each message. We use better-sqlite3, a synchronous library that simplifies database interactions in a single-threaded worker model and avoids callback complexity.

The database initialization script:

// src/database/schema.js
const Database = require('better-sqlite3');
const path = require('path');
const fs = require('fs');

const dbPath = process.env.DB_PATH || path.join(__dirname, '../../data/outbox.db');
const dbDir = path.dirname(dbPath);

if (!fs.existsSync(dbDir)) {
    fs.mkdirSync(dbDir, { recursive: true });
}

const db = new Database(dbPath, { verbose: console.log });

function initializeDatabase() {
    console.log('Initializing database schema...');
    const createTableStmt = `
        CREATE TABLE IF NOT EXISTS outbox (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            message_id TEXT NOT NULL UNIQUE,
            payload TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'PENDING',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            attempts INTEGER NOT NULL DEFAULT 0,
            last_attempt_at TIMESTAMP NULL
        );
    `;
    db.exec(createTableStmt);

    // Indexes are critical for performance as the table grows.
    db.exec('CREATE INDEX IF NOT EXISTS idx_outbox_status_attempts ON outbox (status, attempts);');
    db.exec('CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);');
    console.log('Database schema initialized.');
}

initializeDatabase();

module.exports = db;
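
One tuning step not shown in the original script is worth considering for this workload: enabling SQLite's write-ahead log, so the API server's inserts and the worker's reads don't block each other. A minimal sketch, assuming the same better-sqlite3 handle:

// src/database/schema.js (optional addition inside initializeDatabase)
// WAL lets concurrent readers proceed while a write is in progress.
db.pragma('journal_mode = WAL');
// The default synchronous level (FULL) keeps commits durable across power
// loss; 'synchronous = NORMAL' would trade that small window for throughput.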

The core data access layer uses prepared statements for performance and security. The key function is insertMessage, which wraps the insertion in a transaction. This ensures that a message is either fully committed to the outbox or not at all.

// src/database/outbox-repository.js
const db = require('./schema');
const { v4: uuidv4 } = require('uuid');

// Using prepared statements is a non-negotiable best practice.
const insertStmt = db.prepare(
    'INSERT INTO outbox (message_id, payload, status) VALUES (@message_id, @payload, @status)'
);
const getPendingStmt = db.prepare(
    `SELECT id, message_id, payload, attempts FROM outbox WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT @limit`
);
const markAsSentStmt = db.prepare(
    `UPDATE outbox SET status = 'SENT', updated_at = CURRENT_TIMESTAMP WHERE id = ?`
);
const incrementAttemptStmt = db.prepare(
    `UPDATE outbox SET attempts = attempts + 1, last_attempt_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ?`
);

/**
 * Inserts a new message into the outbox within a transaction.
 * @param {object} payload - The message payload.
 * @returns {string} The unique message ID.
 */
function insertMessage(payload) {
    const messageId = uuidv4();
    try {
        const jsonPayload = JSON.stringify(payload);
        // A single INSERT is already atomic in SQLite; the explicit transaction
        // keeps this write path safe if it later grows to multiple statements
        // (e.g., an audit row alongside the message).
        db.transaction(() => {
            insertStmt.run({
                message_id: messageId,
                payload: jsonPayload,
                status: 'PENDING',
            });
        })();
        return messageId;
    } catch (error) {
        // A common mistake is to not have detailed logging here.
        // Knowing if the failure is due to serialization or DB constraints is crucial.
        console.error(`[Repository] Failed to insert message: ${error.message}`, { payload });
        throw new Error('Failed to persist message to outbox.');
    }
}

/**
 * Fetches a batch of pending messages.
 * @param {number} limit - The maximum number of messages to fetch.
 * @returns {Array<object>}
 */
function getPendingMessages(limit) {
    return getPendingStmt.all({ limit });
}

/**
 * Marks a message as sent.
 * @param {number} id - The database row ID.
 */
function markMessageAsSent(id) {
    markAsSentStmt.run(id);
}

/**
 * Increments the attempt count for a message.
 * @param {number} id - The database row ID.
 */
function incrementMessageAttempt(id) {
    incrementAttemptStmt.run(id);
}

module.exports = {
    insertMessage,
    getPendingMessages,
    markMessageAsSent,
    incrementMessageAttempt,
};
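
One possible extension, not in the original repository: because better-sqlite3 is synchronous, the per-row status updates the worker will issue after each publish cycle (Phase 3) can be wrapped in a single transaction so a whole batch commits at once. The markManyAsSent helper below is hypothetical:

// src/database/outbox-repository.js (sketch of a batched status update)
// db.transaction() returns a function that runs its body inside one transaction.
const markManyAsSent = db.transaction((ids) => {
    for (const id of ids) {
        markAsSentStmt.run(id);
    }
});

The worker could then call markManyAsSent(successful) instead of looping over markMessageAsSent.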

Phase 2: The GCP Pub/Sub Publisher

The publisher component is responsible for the actual communication with Google Cloud. Error handling here is paramount. We need to distinguish between recoverable errors (e.g., network timeout) and non-recoverable errors (e.g., malformed message, invalid credentials).

// src/gcp/publisher.js
const { PubSub } = require('@google-cloud/pubsub');
const pino = require('pino');

const logger = pino({ level: process.env.LOG_LEVEL || 'info' });

// Configuration should always be externalized.
const pubsubConfig = {
    projectId: process.env.GCP_PROJECT_ID,
    // In a production environment, you would use workload identity federation or service account keys.
    // For local dev, `gcloud auth application-default login` is sufficient.
};

const topicName = process.env.GCP_PUBSUB_TOPIC;

if (!pubsubConfig.projectId || !topicName) {
    throw new Error('GCP_PROJECT_ID and GCP_PUBSUB_TOPIC environment variables are required.');
}

const pubSubClient = new PubSub(pubsubConfig);

/**
 * Publishes a batch of messages to a Pub/Sub topic.
 * @param {Array<{id: number, message_id: string, payload: string, attempts: number}>} messages - Batch of messages from the outbox.
 * @returns {Promise<{successful: Array<number>, failed: Array<number>}>} - A map of successful and failed database IDs.
 */
async function publishBatch(messages) {
    if (messages.length === 0) {
        return { successful: [], failed: [] };
    }

    const topic = pubSubClient.topic(topicName);
    const successful = [];
    const failed = [];

    const publishPromises = messages.map(async (message) => {
        try {
            const dataBuffer = Buffer.from(message.payload);
            // Attach the outbox message_id as an attribute so downstream
            // consumers can deduplicate; delivery is at-least-once end to end.
            await topic.publishMessage({
                data: dataBuffer,
                attributes: { messageId: message.message_id },
            });
            logger.info(`Successfully published message ID: ${message.id}`);
            successful.push(message.id);
        } catch (error) {
            // Here we must be specific about error handling.
            // Is it a transient network error or a permanent one?
            // The gRPC client library handles some retries, but we need our own logic.
            logger.error(`Failed to publish message ID: ${message.id}. Attempt: ${message.attempts + 1}. Error: ${error.message}`);
            failed.push(message.id);
        }
    });

    await Promise.all(publishPromises);
    return { successful, failed };
}

module.exports = { publishBatch };
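
The catch block above treats every failure identically. A sketch of how one might separate transient from permanent failures, assuming the errors surfaced by @google-cloud/pubsub carry a numeric gRPC status code on error.code (the isRetryableError helper is hypothetical):

// src/gcp/error-classifier.js (illustrative sketch)
// gRPC status codes that usually indicate a transient, retryable condition.
const RETRYABLE_GRPC_CODES = new Set([
    4,  // DEADLINE_EXCEEDED
    8,  // RESOURCE_EXHAUSTED
    10, // ABORTED
    13, // INTERNAL
    14, // UNAVAILABLE
]);

function isRetryableError(error) {
    // Errors without a gRPC code (e.g., raw socket failures) are treated as retryable.
    if (typeof error.code !== 'number') return true;
    return RETRYABLE_GRPC_CODES.has(error.code);
}

module.exports = { isRetryableError };

Messages failing with a permanent code (invalid argument, permission denied) could then skip the retry budget entirely and go straight to dead-letter handling.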

Phase 3: The Forwarder Worker Loop

This is the engine of the agent. It runs on a timer, continuously checking the outbox for pending messages and attempting to forward them. The implementation does not back off between retries; it simply caps them with the attempts count in the database and retries every pending message on each poll. A query-level exponential backoff is sketched after the worker code.

// src/worker/forwarder.js
const pino = require('pino');
const { getPendingMessages, markMessageAsSent, incrementMessageAttempt } = require('../database/outbox-repository');
const { publishBatch } = require('../gcp/publisher');

const logger = pino({ level: process.env.LOG_LEVEL || 'info' });

const config = {
    pollIntervalMs: parseInt(process.env.POLL_INTERVAL_MS, 10) || 5000,
    batchSize: parseInt(process.env.BATCH_SIZE, 10) || 50,
    maxAttempts: parseInt(process.env.MAX_ATTEMPTS, 10) || 10,
};

let isRunning = false;
let intervalId = null;

async function processOutbox() {
    if (isRunning) {
        logger.warn('Previous outbox processing cycle is still running. Skipping.');
        return;
    }

    isRunning = true;
    logger.info('Starting outbox processing cycle...');

    try {
        const messages = getPendingMessages(config.batchSize);

        if (messages.length === 0) {
            logger.info('No pending messages in outbox.');
            return;
        }

        logger.info(`Found ${messages.length} pending messages to process.`);

        const messagesToProcess = messages.filter(m => m.attempts < config.maxAttempts);
        const deadLetterMessages = messages.filter(m => m.attempts >= config.maxAttempts);

        // Handle messages that have exceeded max attempts.
        // In a real project, these should be moved to a dead-letter table for manual inspection.
        if (deadLetterMessages.length > 0) {
            logger.error(`Found ${deadLetterMessages.length} messages that exceeded max attempts.`, {
                ids: deadLetterMessages.map(m => m.id)
            });
            // For now, we'll just log them. A robust implementation would move them.
        }

        const { successful, failed } = await publishBatch(messagesToProcess);

        // Update the database based on the outcome. This must be robust.
        successful.forEach(id => markMessageAsSent(id));
        failed.forEach(id => incrementMessageAttempt(id));

        logger.info(`Processing cycle complete. Successful: ${successful.length}, Failed: ${failed.length}`);

    } catch (error) {
        logger.error(`An unexpected error occurred during the outbox processing cycle: ${error.message}`, { stack: error.stack });
    } finally {
        isRunning = false;
    }
}

function start() {
    if (intervalId) {
        logger.warn('Forwarder worker is already started.');
        return;
    }
    logger.info(`Starting forwarder worker with poll interval ${config.pollIntervalMs}ms`);
    intervalId = setInterval(processOutbox, config.pollIntervalMs);
}

function stop() {
    if (intervalId) {
        logger.info('Stopping forwarder worker...');
        clearInterval(intervalId);
        intervalId = null;
    }
}

module.exports = { start, stop };
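
As noted above, pending messages are retried on every poll until they hit maxAttempts. A query-level exponential backoff is one way to space those retries out; the statement below is a hypothetical replacement for getPendingStmt in the repository, not part of the original code:

// src/database/outbox-repository.js (hypothetical backoff-aware variant)
// Skips rows whose last attempt was less than 2^attempts seconds ago
// (capped at one hour), so repeatedly failing messages back off instead of
// being retried on every cycle.
const getPendingWithBackoffStmt = db.prepare(`
    SELECT id, message_id, payload, attempts
    FROM outbox
    WHERE status = 'PENDING'
      AND (
            last_attempt_at IS NULL
            OR strftime('%s', 'now') - strftime('%s', last_attempt_at)
               >= MIN(3600, 1 << MIN(attempts, 12))
          )
    ORDER BY created_at ASC
    LIMIT @limit
`);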

The logic to filter by maxAttempts is a primitive dead-letter queue. A production-grade system would move these exhausted messages to a separate outbox_dead_letters table so they stop being selected on every poll; one possible shape of that move is sketched below.
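
A sketch of that move, again scoped to the repository module; the table and the moveToDeadLetter transaction are illustrative, since the original code only logs exhausted messages:

// src/database/outbox-repository.js (hypothetical dead-letter handling)
db.exec(`
    CREATE TABLE IF NOT EXISTS outbox_dead_letters (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        message_id TEXT NOT NULL UNIQUE,
        payload TEXT NOT NULL,
        attempts INTEGER NOT NULL,
        dead_lettered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
`);

const copyToDeadLetterStmt = db.prepare(`
    INSERT INTO outbox_dead_letters (message_id, payload, attempts)
    SELECT message_id, payload, attempts FROM outbox WHERE id = ?
`);
const deleteFromOutboxStmt = db.prepare('DELETE FROM outbox WHERE id = ?');

// The copy and delete commit together, so a crash can neither drop the
// message nor leave it present in both tables.
const moveToDeadLetter = db.transaction((id) => {
    copyToDeadLetterStmt.run(id);
    deleteFromOutboxStmt.run(id);
});

The worker would call moveToDeadLetter(m.id) for each entry in deadLetterMessages instead of only logging their IDs.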

Phase 4: The Ingestion Endpoint

This is the public-facing part of the agent. It’s a simple Express.js server that exposes one endpoint. Its only job is to receive data, perform minimal validation, and hand it off to the outbox-repository. It must respond quickly to the client (the Azure Function) to signal that the data has been durably persisted.

// src/api/server.js
const express = require('express');
const pino = require('pino');
const pinoHttp = require('pino-http');
const { insertMessage } = require('../database/outbox-repository');

const logger = pino({ level: process.env.LOG_LEVEL || 'info' });
const httpLogger = pinoHttp({ logger });

const app = express();
app.use(express.json());
app.use(httpLogger);

app.post('/ingest', (req, res) => {
    const payload = req.body;

    // Basic validation is essential. Never trust incoming data.
    if (!payload || typeof payload !== 'object' || Object.keys(payload).length === 0) {
        return res.status(400).json({ error: 'Invalid payload. Body must be a non-empty JSON object.' });
    }

    try {
        const messageId = insertMessage(payload);
        // The 202 Accepted status is appropriate here.
        // We are acknowledging receipt of the data, not that it has been fully processed and forwarded.
        res.status(202).json({ status: 'queued', message_id: messageId });
    } catch (error) {
        // This indicates a problem with our local persistence layer, which is a critical failure.
        logger.error(`Critical error during ingestion: ${error.message}`, { payload });
        res.status(500).json({ error: 'Internal server error while queueing message.' });
    }
});

// A health check endpoint is a must-have for any service.
app.get('/health', (req, res) => {
    res.status(200).json({ status: 'ok', timestamp: new Date().toISOString() });
});

function start(port) {
    return new Promise((resolve) => {
        const server = app.listen(port, () => {
            logger.info(`Ingestion API server listening on port ${port}`);
            resolve(server);
        });
    });
}

module.exports = { start };
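
Since the ingestion endpoint's client is the Azure Function mentioned at the start, here is a minimal sketch of what that source side might look like, using the Node.js v4 programming model for Azure Functions. The function name and the FORWARDER_URL app setting are illustrative assumptions, not details of the real source system:

// azure/forward-telemetry.js (illustrative source side, not part of the agent)
const { app } = require('@azure/functions');

app.http('forwardTelemetry', {
    methods: ['POST'],
    authLevel: 'function',
    handler: async (request, context) => {
        const payload = await request.json();

        // Hand the event to the forwarder; a 202 response means the agent has
        // durably queued it in the SQLite outbox.
        const response = await fetch(`${process.env.FORWARDER_URL}/ingest`, {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(payload),
        });

        if (!response.ok) {
            context.error(`Forwarder rejected event with status ${response.status}`);
            return { status: 502, jsonBody: { error: 'forwarder unavailable' } };
        }
        return { status: 202, jsonBody: await response.json() };
    },
});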

To wire it all together, the main application entry point initializes and starts both the API server and the background worker.

// index.js
require('dotenv').config();
const apiServer = require('./src/api/server');
const forwarderWorker = require('./src/worker/forwarder');
const pino = require('pino');

const logger = pino({ level: process.env.LOG_LEVEL || 'info' });
const PORT = process.env.PORT || 3000;

async function main() {
    logger.info('Starting multi-cloud forwarder agent...');

    // Start the API server to accept incoming data
    const server = await apiServer.start(PORT);

    // Start the background worker to process the outbox
    forwarderWorker.start();

    const shutdown = (signal) => {
        logger.warn(`Received ${signal}. Shutting down gracefully.`);
        
        forwarderWorker.stop();
        
        server.close(() => {
            logger.info('API server closed.');
            // In a real application, you'd also close the database connection.
            process.exit(0);
        });
        
        // Force shutdown after a timeout
        setTimeout(() => {
            logger.error('Could not close connections in time, forcing shutdown.');
            process.exit(1);
        }, 10000); // 10 seconds
    };

    process.on('SIGTERM', () => shutdown('SIGTERM'));
    process.on('SIGINT', () => shutdown('SIGINT'));
}

main().catch(error => {
    logger.fatal(`Failed to start application: ${error.message}`, { stack: error.stack });
    process.exit(1);
});
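
The shutdown handler notes that the database connection should also be closed. Since schema.js exports the better-sqlite3 handle directly, one way to add that last step (a sketch, showing only the relevant callback):

// index.js (possible addition to the shutdown handler)
const db = require('./src/database/schema');

server.close(() => {
    logger.info('API server closed.');
    db.close(); // better-sqlite3 closes the database file synchronously
    process.exit(0);
});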

To test the resilience, run the agent and then block outbound traffic to Google's API endpoints with a host firewall rule (iptables matches IP addresses rather than hostnames, so resolve pubsub.googleapis.com first). The API server at /ingest will continue to accept data and queue it in outbox.db. Once the block is removed, the forwarder worker automatically catches up, draining the backlog from the SQLite database in insertion order. The data from the outage window is preserved.

This solution is not without its limitations. The agent itself is a single point of failure. Running multiple instances would require a distributed lock manager to coordinate the outbox processing to prevent duplicate message sends, which adds significant complexity. Furthermore, the throughput is ultimately limited by the write performance of the single SQLite database file on its underlying storage. This architecture is ideal for scenarios with moderate throughput (hundreds or thousands of messages per second) where absolute data durability during connectivity loss is the primary concern, such as IoT edge gateways or application agents in less reliable network environments. Future iterations could explore moving fully processed messages to an archive table or file to keep the primary outbox table lean and performant.

