Implementing a Durable SNS Message Consumer with a Rust, Rocket, and LevelDB Stateful Agent


The core of the problem was a classic impedance mismatch between a stateless cloud event stream and a stateful on-premise workload. Our AWS infrastructure emits critical operational events via SNS, but the consumer application had to run within a client’s data center, plagued by intermittent network connectivity. Initial attempts using simple webhook listeners were disastrous. A network blip during processing meant a lost event, and a listener crash after acknowledgment but before completion led to the same outcome. We were violating the at-least-once processing guarantee that the business process demanded.

We needed an agent—a local, durable buffer that could reliably catch events from the cloud, persist them immediately, and process them independently of the initial ingress. This agent had to be lightweight, resilient to restarts, and provide local visibility for operators. This is the story of how we built that agent using a rather specific stack: Rust with the Rocket web framework, LevelDB for persistence, all packaged as an OCI container for deployment.

Our architecture is straightforward but effective. SNS delivers messages to a public HTTPS endpoint. Our agent, running on-prem, exposes this endpoint. The key design decision is that the HTTP handler does almost no work. It validates the message, writes it to a local durable queue (LevelDB), and immediately returns a 2xx success response to SNS, which SNS treats as a completed delivery. A separate background worker task reads from this queue, processes the message, and only deletes it upon success.

sequenceDiagram
    participant AWS_SNS as AWS SNS
    participant Agent_Endpoint as Agent (Rocket Endpoint)
    participant Durable_Queue as Durable Queue (LevelDB)
    participant Worker as Agent (Background Worker)
    participant Target_System as Target System

    AWS_SNS->>+Agent_Endpoint: POST /sns/webhook (Notification)
    Agent_Endpoint->>Agent_Endpoint: Verify SNS Signature
    Agent_Endpoint->>+Durable_Queue: Enqueue Message
    Durable_Queue-->>-Agent_Endpoint: Success
    Agent_Endpoint-->>-AWS_SNS: HTTP 200 OK

    loop Async Processing
        Worker->>+Durable_Queue: Dequeue Message
        Durable_Queue-->>-Worker: Message Data
        Worker->>+Target_System: Process Message
        Target_System-->>-Worker: Processing Result
        alt Processing Successful
            Worker->>+Durable_Queue: Delete Message
            Durable_Queue-->>-Worker: Deletion Confirmed
        else Processing Failed
            Worker->>Durable_Queue: (Re-queue or log for later)
            Note right of Worker: Implement backoff/retry logic here
        end
    end

The choice of Rust was driven by the need for a small, performant, and memory-safe binary. For an agent that might run on resource-constrained edge hardware, avoiding the overhead of a JVM or Python interpreter was a significant win. Rocket provided a pleasant, type-safe way to build the web-facing parts. For the durable queue, we sidestepped heavier solutions like PostgreSQL or SQLite. LevelDB, an embedded key-value store, offered the perfect balance. Its Log-Structured Merge-tree design is heavily optimized for write throughput, which is exactly what our ingress endpoint needed. And because it’s embedded, there’s no separate database process to manage, simplifying deployment. The final piece, OCI, is the standard for shipping applications. A multi-stage Dockerfile gives us a tiny, secure container image.

Let’s start with the project’s foundation, the Cargo.toml.

[package]
name = "durable-sns-agent"
version = "0.1.0"
edition = "2021"

[dependencies]
rocket = { version = "0.5.0", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.20", features = ["full"] }
leveldb = "0.8"
uuid = { version = "1.2", features = ["v4"] }
log = "0.4"
env_logger = "0.9"
base64 = "0.21"
ring = "0.16" # For SNS signature verification
webpki = "0.22" # Certificate parsing and signature checks used in sns_handler.rs
reqwest = { version = "0.11", features = ["json"] }
once_cell = "1.16"

# For the simple UI
[dependencies.rocket_dyn_templates]
version = "0.1.0"
features = ["tera"]

The core logic is split into three parts: the LevelDB-backed queue, the Rocket web server, and the background worker. We’ll start by abstracting the database.

Building a Durable Queue with LevelDB

We don’t want the rest of our application to know about LevelDB’s specifics. We’ll create a DurableQueue struct that provides simple enqueue and dequeue operations. A critical detail is how we manage keys. To maintain order, we’ll use time-stamped, sortable keys. A UUID is added to prevent key collisions if two messages arrive at the exact same microsecond.
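
One detail worth making concrete is that LevelDB compares keys byte by byte, so a timestamp prefix only sorts chronologically if every timestamp has the same width. The sketch below is a minimal illustration using only the standard library; `make_key` and its `suffix` parameter are hypothetical names, with the suffix standing in for the UUID.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Builds a queue key of the form `<zero-padded micros>-<suffix>`.
/// `suffix` stands in for the UUID used in the article (assumption).
fn make_key(timestamp_micros: u128, suffix: &str) -> String {
    // 20 digits keeps every timestamp the same width, so byte-wise
    // (lexicographic) order matches numeric order.
    format!("{:020}-{}", timestamp_micros, suffix)
}

fn now_micros() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_micros()
}

fn main() {
    let early = make_key(999, "b");
    let late = make_key(1_000, "a");
    // Padded keys sort in arrival order.
    assert!(early < late);
    // Unpadded, "999-b" would sort after "1000-a".
    assert!(format!("{}-{}", 999, "b") > format!("{}-{}", 1_000, "a"));

    println!("{}", make_key(now_micros(), "example-suffix"));
}
```

With today's microsecond timestamps the unpadded form happens to have a stable width, so padding is a safeguard rather than a fix for an observed failure.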

// src/durable_queue.rs

use leveldb::database::Database;
use leveldb::iterator::Iterable; // provides `.iter()` on the database
use leveldb::kv::KV;
use leveldb::options::{Options, ReadOptions, WriteOptions};
use serde::{de::DeserializeOwned, Serialize};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

// A tuple representing a dequeued item: (key, value)
pub type DequeuedItem<T> = (Vec<u8>, T);

#[derive(Debug)]
pub enum QueueError {
    DbError(leveldb::error::Error),
    SerializationError(serde_json::Error),
    DeserializationError(serde_json::Error),
    ItemNotFound,
}

// Implement From traits for easier error handling with `?`
impl From<leveldb::error::Error> for QueueError {
    fn from(err: leveldb::error::Error) -> Self {
        QueueError::DbError(err)
    }
}

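// A thread-safe, cloneable wrapper around our LevelDB instance.
// NOTE: `Database<K>` is typed by its key. `i32` is a placeholder here; the
// string keys built in `generate_key` need a byte-backed key type that
// implements the crate's `Key` trait before `put` and `delete` will compile.
#[derive(Clone)]
pub struct DurableQueue {
    db: Arc<Mutex<Database<i32>>>,
}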

impl DurableQueue {
    pub fn new(path: &str) -> Result<Self, QueueError> {
        let path = Path::new(path);
        let mut options = Options::new();
        options.create_if_missing = true;
        let database = Database::open(path, options)?;
        Ok(DurableQueue {
            db: Arc::new(Mutex::new(database)),
        })
    }

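    /// Generates a sortable key based on current timestamp and a UUID.
    /// Format: <timestamp_micros, zero-padded to 20 digits>-<uuid>
    /// LevelDB orders keys byte-wise, so the timestamp needs a fixed width
    /// for lexicographic order to match arrival order.
    fn generate_key(&self) -> String {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_micros();
        let id = uuid::Uuid::new_v4().to_string();
        format!("{:020}-{}", timestamp, id)
    }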

    /// Enqueues an item into the durable queue.
    pub async fn enqueue<T: Serialize>(&self, item: &T) -> Result<(), QueueError> {
        let key = self.generate_key();
        let serialized_item = serde_json::to_vec(item).map_err(QueueError::SerializationError)?;
        
        let db_lock = self.db.lock().await;
        db_lock.put(WriteOptions::new(), key.as_bytes(), &serialized_item)?;
        Ok(())
    }

    /// Dequeues the oldest item from the queue.
    /// It finds the item with the lexicographically smallest key.
    pub async fn dequeue<T: DeserializeOwned>(&self) -> Result<Option<DequeuedItem<T>>, QueueError> {
        let db_lock = self.db.lock().await;
        
        // Use an iterator to find the first (oldest) key
        let mut iterator = db_lock.iter(ReadOptions::new());
        
        if let Some((key, value)) = iterator.next() {
            let deserialized_value: T = serde_json::from_slice(&value).map_err(QueueError::DeserializationError)?;
            Ok(Some((key.to_vec(), deserialized_value)))
        } else {
            Ok(None) // Queue is empty
        }
    }

    /// Deletes an item from the queue using its key.
    /// This is called after an item has been successfully processed.
    pub async fn confirm_delete(&self, key: &[u8]) -> Result<(), QueueError> {
        let db_lock = self.db.lock().await;
        db_lock.delete(WriteOptions::new(), key)?;
        Ok(())
    }

    /// Gets the current depth of the queue.
    /// In a real-world project, this can be an expensive operation. 
    /// Consider maintaining a separate counter for high-frequency checks.
    pub async fn depth(&self) -> Result<usize, QueueError> {
        let db_lock = self.db.lock().await;
        let iterator = db_lock.iter(ReadOptions::new());
        Ok(iterator.count())
    }

    /// Peeks at the items in the queue without removing them.
    pub async fn peek_multiple<T: DeserializeOwned>(&self, count: usize) -> Result<Vec<(String, T)>, QueueError> {
        let db_lock = self.db.lock().await;
        let mut items = Vec::new();
        let iterator = db_lock.iter(ReadOptions::new());

        for (key, value) in iterator.take(count) {
            let deserialized_value: T = serde_json::from_slice(&value).map_err(QueueError::DeserializationError)?;
            let key_str = String::from_utf8_lossy(&key).to_string();
            items.push((key_str, deserialized_value));
        }
        Ok(items)
    }
}

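This abstraction gives us a clean, async-ready interface for our queue. Note that dequeue does not remove anything: it returns the oldest entry, and that entry stays on disk until confirm_delete is called. The Arc<Mutex<...>> wrapper is crucial because it lets the web endpoint and the background worker share a single database handle safely.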
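
To see why separating the read from the delete gives at-least-once behaviour, it can help to model the queue in memory. The sketch below is not the LevelDB implementation; `ModelQueue` is a hypothetical stand-in built on a `BTreeMap`, which keeps keys sorted much as LevelDB does.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the LevelDB-backed queue (hypothetical type,
/// for illustration only).
#[derive(Default)]
struct ModelQueue {
    entries: BTreeMap<String, String>,
    next_seq: u64,
}

impl ModelQueue {
    fn enqueue(&mut self, value: &str) {
        // A zero-padded sequence number plays the role of the timestamp key.
        let key = format!("{:020}", self.next_seq);
        self.next_seq += 1;
        self.entries.insert(key, value.to_string());
    }

    /// Returns the oldest entry without removing it.
    fn dequeue(&self) -> Option<(String, String)> {
        self.entries
            .iter()
            .next()
            .map(|(k, v)| (k.clone(), v.clone()))
    }

    /// Removes an entry once it has been processed successfully.
    fn confirm_delete(&mut self, key: &str) {
        self.entries.remove(key);
    }
}

fn main() {
    let mut queue = ModelQueue::default();
    queue.enqueue("first");
    queue.enqueue("second");

    // A worker that crashes between dequeue and confirm_delete loses nothing:
    // the same entry is handed out again on the next attempt.
    let (key, value) = queue.dequeue().unwrap();
    assert_eq!(value, "first");
    assert_eq!(queue.dequeue().unwrap().1, "first");

    // Only an explicit confirmation advances the queue.
    queue.confirm_delete(&key);
    assert_eq!(queue.dequeue().unwrap().1, "second");
}
```

The flip side is that a job may be processed twice if the delete fails after processing succeeded, so downstream handling should be idempotent.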

The Rocket Endpoint for SNS

The web server has two jobs: handling incoming SNS messages and serving a simple diagnostic UI. Let’s focus on the SNS webhook first. A production-grade SNS webhook listener must handle three message types (SubscriptionConfirmation, Notification, UnsubscribeConfirmation) and, most importantly, must verify the cryptographic signature of every message. Ignoring signature verification is a massive security vulnerability.
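
Signature verification is only meaningful if the certificate itself comes from AWS, so the SigningCertURL should be checked before it is fetched. The following is a minimal sketch of such a check using only the standard library; `is_trusted_cert_url` is a hypothetical helper, and the accepted host pattern (`sns.<region>.amazonaws.com`) is an assumption that would need widening for partitions such as the China regions.

```rust
/// Returns true if `url` looks like an SNS signing-certificate URL:
/// HTTPS, a host of the form `sns.<region>.amazonaws.com`, and a `.pem` path.
/// (Hypothetical helper; the host pattern is an assumption.)
fn is_trusted_cert_url(url: &str) -> bool {
    let Some(rest) = url.strip_prefix("https://") else {
        return false;
    };
    // Split the host from the path at the first '/'.
    let Some((host, path)) = rest.split_once('/') else {
        return false;
    };
    // Reject userinfo and explicit ports, which can disguise the real host.
    if host.contains('@') || host.contains(':') {
        return false;
    }
    let host = host.to_ascii_lowercase();
    host.starts_with("sns.") && host.ends_with(".amazonaws.com") && path.ends_with(".pem")
}

fn main() {
    let good = "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-abc.pem";
    let lookalike = "https://sns.us-east-1.amazonaws.com.evil.example/cert.pem";
    assert!(is_trusted_cert_url(good));
    assert!(!is_trusted_cert_url(lookalike));
}
```

The same kind of check applies to the SubscribeURL before the agent visits it.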

// src/sns_handler.rs

use rocket::{post, response::status, State, serde::json::Json};
use serde::{Deserialize, Serialize};
use crate::durable_queue::DurableQueue;
use log::{info, warn, error};
use std::collections::HashMap;

// Simplified structs for SNS message parsing
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct SnsMessage {
    pub r#type: String,
    pub message_id: Option<String>,
    pub topic_arn: Option<String>,
    pub subject: Option<String>,
    pub message: Option<String>,
    pub subscribe_url: Option<String>,
    pub token: Option<String>,
    #[serde(flatten)]
    pub signature_fields: SignatureFields,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct SignatureFields {
    pub signature_version: String,
    pub signature: String,
    pub signing_cert_url: String,
}

// This struct will be what we actually store in our LevelDB queue.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Job {
    pub message_id: String,
    pub subject: String,
    pub body: String,
    pub received_at: u128,
}

// A simple in-memory cache for signing certificates.
// In production, a more robust solution like Redis or a timed cache is better.
static CERT_CACHE: once_cell::sync::Lazy<tokio::sync::Mutex<HashMap<String, Vec<u8>>>> =
    once_cell::sync::Lazy::new(|| tokio::sync::Mutex::new(HashMap::new()));


// The main webhook endpoint.
// NOTE: SNS normally posts with `Content-Type: text/plain; charset=UTF-8`, so the
// route does not restrict the format; the body is parsed as JSON regardless.
#[post("/sns/webhook", data = "<message>")]
pub async fn sns_webhook(
    message: Json<SnsMessage>,
    queue: &State<DurableQueue>,
) -> rocket::http::Status {
    use rocket::http::Status;

    // --- 1. Signature Verification (CRITICAL) ---
    // A common mistake is skipping this step in development.
    // In production, this is mandatory to prevent forged requests.
    if let Err(e) = verify_sns_signature(&message).await {
        error!("SNS signature verification failed: {:?}", e);
        // Reject unverifiable messages instead of processing them.
        return Status::Forbidden;
    }
    info!("SNS signature verified successfully.");

    // --- 2. Handle Different SNS Message Types ---
    match message.r#type.as_str() {
        "SubscriptionConfirmation" => {
            if let Some(url) = &message.subscribe_url {
                info!("Received SNS SubscriptionConfirmation. Visiting URL: {}", url);
                // In production, you MUST validate the URL belongs to AWS.
                if let Err(e) = reqwest::get(url).await {
                    error!("Failed to confirm SNS subscription: {}", e);
                }
            }
        }
        "Notification" => {
            info!("Received SNS Notification.");
            // Subject is optional in SNS, so only MessageId and Message are required.
            if let (Some(id), Some(body)) = (
                message.message_id.clone(),
                message.message.clone(),
            ) {
                let job = Job {
                    message_id: id,
                    subject: message.subject.clone().unwrap_or_default(),
                    body,
                    received_at: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap()
                        .as_millis(),
                };

                // The one and only critical action: enqueue.
                if let Err(e) = queue.enqueue(&job).await {
                    error!("Failed to enqueue job {}: {:?}", job.message_id, e);
                    // A non-2xx response signals SNS to retry the delivery.
                    // This is a key part of the resilience pattern.
                    return Status::InternalServerError;
                }
                info!("Successfully enqueued job {}", job.message_id);
            } else {
                warn!("Received malformed SNS Notification.");
            }
        }
        "UnsubscribeConfirmation" => {
            info!("Received UnsubscribeConfirmation. Manual action may be required.");
            // Typically you log this and confirm in the AWS console.
        }
        _ => {
            warn!("Received unknown SNS message type: {}", message.r#type);
        }
    }

    // Acknowledge only once the message is safely on disk or needs no action.
    Status::Ok
}


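// --- Signature Verification Logic ---
// This is complex but essential. Production code would have more robust
// URL validation and error handling.
async fn verify_sns_signature(msg: &SnsMessage) -> Result<(), String> {
    use base64::Engine as _;
    let b64 = base64::engine::general_purpose::STANDARD;

    // webpki only offers RSA with SHA-256, which SNS uses for SignatureVersion 2.
    // SignatureVersion 1 is SHA-1 based and needs a different verifier.
    let version = &msg.signature_fields.signature_version;
    if version != "2" {
        return Err(format!("unsupported SignatureVersion {}", version));
    }

    let cert_url = &msg.signature_fields.signing_cert_url;

    // Look up the certificate, holding the cache lock only for the lookup
    // so a slow download does not stall every other request.
    let cached = CERT_CACHE.lock().await.get(cert_url).cloned();
    let cert_pem = match cached {
        Some(cert) => cert,
        None => {
            // SECURITY: check that cert_url is an HTTPS URL on an AWS SNS host
            // before fetching it; otherwise the sender controls the certificate.
            let resp = reqwest::get(cert_url).await.map_err(|e| e.to_string())?;
            let body = resp.bytes().await.map_err(|e| e.to_string())?.to_vec();
            CERT_CACHE.lock().await.insert(cert_url.clone(), body.clone());
            body
        }
    };

    // SNS serves the certificate as PEM, but webpki expects DER: drop the
    // "-----BEGIN/END-----" lines and base64-decode what is left.
    let pem_text = String::from_utf8_lossy(&cert_pem);
    let der_b64: String = pem_text
        .lines()
        .filter(|line| !line.starts_with("-----"))
        .map(str::trim)
        .collect();
    let cert_der = b64.decode(der_b64).map_err(|e| e.to_string())?;

    // Decode the signature
    let signature = b64
        .decode(&msg.signature_fields.signature)
        .map_err(|e| e.to_string())?;

    // Build the canonical string to sign
    let string_to_sign = build_string_to_sign(msg);

    // Verify
    let cert = webpki::EndEntityCert::try_from(cert_der.as_slice())
        .map_err(|e| format!("{:?}", e))?;
    cert.verify_signature(&webpki::RSA_PKCS1_2048_8192_SHA256, string_to_sign.as_bytes(), &signature)
        .map_err(|e| format!("{:?}", e))?;

    Ok(())
}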

fn build_string_to_sign(msg: &SnsMessage) -> String {
    let mut result = String::new();
    result.push_str("Message\n");
    result.push_str(msg.message.as_deref().unwrap_or(""));
    result.push_str("\n");
    
    result.push_str("MessageId\n");
    result.push_str(msg.message_id.as_deref().unwrap_or(""));
    result.push_str("\n");

    if let Some(subject) = &msg.subject {
        result.push_str("Subject\n");
        result.push_str(subject);
        result.push_str("\n");
    }

    result.push_str("Timestamp\n"); // SNS includes Timestamp, our struct omits it for brevity
    // In a real implementation, you MUST parse and include the Timestamp field
    // from the original raw JSON body. This is a simplification for the example.
    // result.push_str(msg.timestamp.as_ref().unwrap());
    result.push_str("\n");

    result.push_str("TopicArn\n");
    result.push_str(msg.topic_arn.as_deref().unwrap_or(""));
    result.push_str("\n");

    result.push_str("Type\n");
    result.push_str(&msg.r#type);
    result.push_str("\n");

    result
}

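Note: The signature verification code is simplified. Because SnsMessage has no Timestamp field, build_string_to_sign emits an empty Timestamp value, so the result will not match what SNS actually signed; verification only succeeds once Timestamp is parsed from the request body and included. SubscriptionConfirmation and UnsubscribeConfirmation messages are also signed over a different field list (Message, MessageId, SubscribeURL, Timestamp, Token, TopicArn, Type), which a production version must handle.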
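
For reference, here is a sketch of what a fuller canonical-string builder might look like, covering both Notification and the two confirmation message types. `SignedFields` is a hypothetical struct introduced only for this example; the article's `SnsMessage` would need a `timestamp` field to supply the same data. The field order follows the alphabetical name/value layout SNS documents, with each name and value terminated by a newline.

```rust
/// Fields needed for the string to sign (hypothetical struct for illustration).
struct SignedFields<'a> {
    message_type: &'a str,
    message: &'a str,
    message_id: &'a str,
    subject: Option<&'a str>,
    subscribe_url: Option<&'a str>,
    timestamp: &'a str,
    token: Option<&'a str>,
    topic_arn: &'a str,
}

fn string_to_sign(f: &SignedFields<'_>) -> String {
    let is_notification = f.message_type == "Notification";
    let mut pairs: Vec<(&str, &str)> =
        vec![("Message", f.message), ("MessageId", f.message_id)];

    if is_notification {
        // Subject is only signed when the publisher supplied one.
        if let Some(subject) = f.subject {
            pairs.push(("Subject", subject));
        }
    } else {
        // SubscriptionConfirmation and UnsubscribeConfirmation
        pairs.push(("SubscribeURL", f.subscribe_url.unwrap_or("")));
    }
    pairs.push(("Timestamp", f.timestamp));
    if !is_notification {
        pairs.push(("Token", f.token.unwrap_or("")));
    }
    pairs.push(("TopicArn", f.topic_arn));
    pairs.push(("Type", f.message_type));

    // Each name and each value is followed by a newline.
    pairs.iter().map(|(k, v)| format!("{k}\n{v}\n")).collect()
}

fn main() {
    let fields = SignedFields {
        message_type: "Notification",
        message: "hello",
        message_id: "id-1",
        subject: None,
        subscribe_url: None,
        timestamp: "2024-01-01T00:00:00.000Z",
        token: None,
        topic_arn: "arn:aws:sns:us-east-1:123456789012:demo",
    };
    print!("{}", string_to_sign(&fields));
}
```

The values must be taken byte for byte from the received JSON; any re-serialization or trimming changes the signed string and makes verification fail.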

The Background Worker

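This is the component that does the actual work. It runs in its own Tokio task, continuously pulling jobs from the DurableQueue. Its logic is simple: attempt to process a job; if successful, delete it from the queue; if it fails, leave it in the queue for a future attempt. This ensures that a transient failure (like the target system being down) doesn’t cause data loss. The trade-off is that dequeue always returns the oldest entry, so a job that keeps failing blocks everything queued behind it, which is why retry limits and a dead-letter queue matter in production.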

// src/worker.rs

use crate::durable_queue::{DurableQueue, DequeuedItem};
use crate::sns_handler::Job;
use std::time::Duration;
use tokio::time::sleep;
use log::{info, error, warn};

// Simulates the actual business logic processing.
async fn process_job(job: &Job) -> Result<(), String> {
    info!("WORKER: Starting to process job {}", job.message_id);
    
    // Simulate a potentially failing operation
    if job.body.contains("fail") {
        error!("WORKER: Simulated failure for job {}", job.message_id);
        return Err("Simulated processing failure".to_string());
    }

    // Simulate work
    sleep(Duration::from_secs(2)).await;

    info!("WORKER: Successfully processed job {}: Subject: '{}'", job.message_id, job.subject);
    Ok(())
}

pub async fn run_worker(queue: DurableQueue) {
    info!("Background worker started.");
    loop {
        match queue.dequeue::<Job>().await {
            Ok(Some(item)) => {
                let (key, job) = item;
                match process_job(&job).await {
                    Ok(_) => {
                        // Success! Now we can safely delete it from the queue.
                        if let Err(e) = queue.confirm_delete(&key).await {
                            error!("CRITICAL: Failed to delete processed job {:?}. Potential for duplicate processing. Error: {:?}", key, e);
                        }
                    }
                    Err(e) => {
                        error!("WORKER: Failed to process job {}: {}. It will be retried later.", job.message_id, e);
                        // The job remains in the queue.
                        // A more advanced worker would implement a backoff strategy,
                        // increment a retry counter, and eventually move to a DLQ.
                        sleep(Duration::from_secs(5)).await; // Backoff before retrying
                    }
                }
            }
            Ok(None) => {
                // Queue is empty, wait a bit before checking again.
                sleep(Duration::from_secs(1)).await;
            }
            Err(e) => {
                error!("Worker failed to dequeue from LevelDB: {:?}. Retrying.", e);
                sleep(Duration::from_secs(5)).await;
            }
        }
    }
}
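
The fixed five-second sleep above could be replaced by the backoff strategy the comments mention. The following is a minimal sketch of the decision logic only, assuming the worker tracks a per-job failure count somewhere; `RetryDecision` and `next_step` are hypothetical names, and how a job is moved to a dead-letter store is left out.

```rust
use std::time::Duration;

/// What the worker should do with a job that just failed.
#[derive(Debug, PartialEq)]
enum RetryDecision {
    RetryAfter(Duration),
    DeadLetter,
}

/// Exponential backoff: base * 2^(attempt - 1), capped at `max_delay`.
/// `attempt` is the number of failures so far, starting at 1.
fn next_step(
    attempt: u32,
    max_attempts: u32,
    base: Duration,
    max_delay: Duration,
) -> RetryDecision {
    if attempt >= max_attempts {
        return RetryDecision::DeadLetter;
    }
    // checked_shl and saturating_mul avoid overflow for large attempt counts.
    let factor = 1u32
        .checked_shl(attempt.saturating_sub(1))
        .unwrap_or(u32::MAX);
    let delay = base.saturating_mul(factor).min(max_delay);
    RetryDecision::RetryAfter(delay)
}

fn main() {
    let base = Duration::from_secs(1);
    let cap = Duration::from_secs(60);
    for attempt in 1..=5 {
        println!("attempt {attempt}: {:?}", next_step(attempt, 5, base, cap));
    }
}
```

Adding random jitter to the delay is a common refinement when many agents retry against the same target system.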

The Diagnostic UI

A headless agent is difficult to operate. A simple web UI that shows the queue depth and a few pending items provides immense value for troubleshooting. Rocket’s template support makes this easy.
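
Since a status page may be refreshed often and the queue's depth method scans the whole database, one option is the separate counter suggested in its doc comment. The sketch below shows one way such a counter might look; `DepthCounter` is a hypothetical helper, not part of the DurableQueue shown earlier, and because it lives in memory it would need to be seeded from a full scan at startup.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Cheap, shareable queue-depth gauge (hypothetical helper).
#[derive(Clone, Default)]
struct DepthCounter(Arc<AtomicUsize>);

impl DepthCounter {
    /// Seed the counter once at startup, e.g. from a full scan of the database.
    fn seeded(initial: usize) -> Self {
        DepthCounter(Arc::new(AtomicUsize::new(initial)))
    }

    /// Call after a successful enqueue.
    fn on_enqueue(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    /// Call after a successful confirm_delete; never goes below zero.
    fn on_delete(&self) {
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
    }

    fn get(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let counter = DepthCounter::seeded(2);
    let ui_handle = counter.clone(); // clones share the same underlying value

    counter.on_enqueue();
    assert_eq!(ui_handle.get(), 3);

    for _ in 0..4 {
        counter.on_delete(); // the extra delete is ignored rather than wrapping
    }
    assert_eq!(ui_handle.get(), 0);
}
```

The counter is a convenience for display; the database remains the source of truth, so a periodic reconciliation scan would catch any drift.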

// src/ui.rs

use rocket::get;
use rocket::State;
use rocket_dyn_templates::{Template, context};
use crate::durable_queue::DurableQueue;
use crate::sns_handler::Job;

#[get("/")]
pub async fn index(queue: &State<DurableQueue>) -> Template {
    let queue_depth = queue.depth().await.unwrap_or(0);
    let peeked_jobs: Vec<Job> = queue
        .peek_multiple::<Job>(10)
        .await
        .unwrap_or_default()
        .into_iter()
        .map(|(_key, job)| job)
        .collect();

    Template::render("index", context! {
        depth: queue_depth,
        jobs: peeked_jobs,
    })
}

// In templates/index.html.tera:
/*
<!DOCTYPE html>
<html>
<head>
    <title>Durable SNS Agent Status</title>
    <style> 
        body { font-family: sans-serif; background: #222; color: #eee; } 
        .container { max-width: 800px; margin: auto; padding: 20px; }
        .metric { font-size: 2em; }
        table { width: 100%; border-collapse: collapse; margin-top: 20px; }
        th, td { border: 1px solid #444; padding: 8px; text-align: left; }
        th { background: #333; }
    </style>
</head>
<body>
    <div class="container">
        <h1>Durable SNS Agent Status</h1>
        <p>Current Queue Depth: <span class="metric">{{ depth }}</span></p>
        <h2>Pending Jobs (up to 10)</h2>
        <table>
            <thead>
                <tr>
                    <th>Message ID</th>
                    <th>Subject</th>
                    <th>Body</th>
                </tr>
            </thead>
            <tbody>
                {% for job in jobs %}
                <tr>
                    <td>{{ job.message_id }}</td>
                    <td>{{ job.subject }}</td>
                    <td>{{ job.body | truncate(length=100) }}</td>
                </tr>
                {% endfor %}
            </tbody>
        </table>
    </div>
</body>
</html>
*/

Tying it all Together and Packaging

The main.rs file launches the Rocket server and spawns the background worker.

```rust
// src/main.rs

#[macro_use]
extern crate rocket;

mod durable_queue;
mod sns_handler;
mod worker;
mod ui;

use rocket_dyn_templates::Template;
use durable_queue::DurableQueue;

#[launch]
async fn rocket() -> _ {
    env_logger::init();

    // The path for the LevelDB database
    let db_path = "./sns_queue_db";
    let queue = DurableQueue::new(db_path)
        .expect("Failed to initialize durable queue");

    // Clone the queue handle for the worker
    let worker_queue = queue.clone();
    tokio::spawn(async move {
        worker::run_worker(worker_queue).await;
    });

    rocket::build()
        .manage(queue)
        // Assumed completion: mount the handlers defined above and attach the Tera fairing.
        .mount("/", routes![sns_handler::sns_webhook, ui::index])
        .attach(Template::fairing())
}
```
