The initial system architecture was straightforward, but it proved naive under production load. We were handling a high volume of user interaction events—clicks, views, hovers—that needed to be aggregated for analytics. The first implementation processed each event individually: a REST endpoint received the event and performed a direct database write. When failures occurred, each failed event created a distinct issue in Sentry. This resulted in two critical problems: unsustainable database I/O contention and an unmanageable flood of low-signal Sentry alerts. An event processing pipeline designed for aggregation was clearly necessary.
The core concept was to introduce a buffering layer. Instead of direct synchronous processing, events would be published to a message queue. A dedicated consumer service would then read from this queue, aggregate events in memory over a short time window, and perform a single batch write to the analytics database. This “rollup” approach would drastically reduce database transactions and allow us to handle failures at the batch level, not the individual event level.
For the technology stack, the choices were driven by production pragmatism. AWS SQS was selected for the message queue due to its managed nature, high throughput, and at-least-once delivery guarantee, which fits our tolerance for event aggregation. Micronaut was chosen for the consumer service. Its compile-time dependency injection and low memory overhead make it exceptionally well-suited for containerized, scalable microservices that might need to scale down to zero to manage costs. Sentry remained our observability platform, but the challenge shifted. The central question became: how do we make Sentry effective in an asynchronous, batch-processing world where a single failure represents a collection of independent source messages? A generic Sentry alert for a failed batch is almost useless without knowing which messages were in that batch. This demanded a solution for deep context propagation.
Our first pass at the Micronaut listener was a direct implementation using the @JMSListener annotation provided by the Micronaut JMS SQS integration. While functional, it was quickly identified as suboptimal for our use case.
// src/main/java/com/example/rollup/InitialSqsListener.java
package com.example.rollup;

import io.micronaut.jms.annotations.JMSListener;
import io.micronaut.jms.annotations.Queue;
import io.micronaut.messaging.annotation.MessageBody;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@JMSListener(connectionFactory = "sqsConnectionFactory")
public class InitialSqsListener {

    private static final Logger LOG = LoggerFactory.getLogger(InitialSqsListener.class);

    private final AnalyticsProcessor processor;

    public InitialSqsListener(AnalyticsProcessor processor) {
        this.processor = processor;
    }

    // This annotation-driven approach processes one message at a time.
    // While simple, it's inefficient for high-throughput aggregation:
    // we would be making one DB call per message, defeating the purpose of the rollup.
    @Queue("${aws.sqs.queue-name}")
    public void onMessage(@MessageBody String messageBody) {
        LOG.info("Received message: {}", messageBody);
        // This is the anti-pattern we are trying to solve.
        // processor.processSingleEvent(messageBody);
    }
}
The fundamental flaw here is that the abstraction hides the batching capabilities of SQS. Each message acknowledgment is handled individually. To implement a proper rollup, we needed to take manual control of message polling, processing, and deletion. This meant abandoning the high-level @JMSListener in favor of directly using the AWS SDK v2’s SqsClient. This gives us fine-grained control over fetching a batch of messages (ReceiveMessageRequest), processing them as a unit, and then deleting them upon successful persistence (DeleteMessageBatchRequest).
The revised architecture involved a scheduled, long-polling mechanism coupled with a thread-safe in-memory buffer.
sequenceDiagram
    participant Scheduler as Micronaut Scheduler (@Scheduled)
    participant SqsPoller as SqsPollerService
    participant SQS as AWS SQS
    participant EventBuffer as ThreadSafeEventBuffer
    participant RollupProcessor as RollupProcessorService
    Scheduler->>SqsPoller: trigger poll()
    SqsPoller->>SQS: receiveMessage(maxNumberOfMessages=10, waitTimeSeconds=20)
    SQS-->>SqsPoller: return [Msg1, Msg2, ...]
    loop For each message
        SqsPoller->>EventBuffer: add(message)
    end
    Scheduler->>RollupProcessor: trigger processBuffer()
    RollupProcessor->>EventBuffer: drainBuffer()
    EventBuffer-->>RollupProcessor: return [Msg1, Msg2, ...]
    RollupProcessor->>RollupProcessor: performAggregation(messages)
    alt on Success
        RollupProcessor->>SQS: deleteMessageBatch([ReceiptHandle1, ...])
    else on Failure
        RollupProcessor->>Sentry: captureException(exception, context)
        Note right of RollupProcessor: Messages are NOT deleted and<br/>will be re-processed after the<br/>visibility timeout expires.
    end
This design introduces a controlled, stateful component into our service. The SqsPollerService is responsible for one thing: efficiently pulling messages from SQS and placing them into a shared buffer. A separate RollupProcessorService is scheduled to periodically drain this buffer and execute the business logic.
Here is the build configuration needed in build.gradle.kts to support this stack.
// build.gradle.kts
plugins {
    id("io.micronaut.application") version "3.7.9"
    // other plugins...
}

application {
    mainClass.set("com.example.rollup.Application")
}

micronaut {
    runtime("netty")
    testRuntime("junit5")
    processing {
        incremental(true)
        annotations("com.example.rollup.*")
    }
}

dependencies {
    implementation("io.micronaut:micronaut-runtime")
    implementation("io.micronaut.aws:micronaut-aws-sdk-v2")
    implementation("software.amazon.awssdk:sqs")
    implementation("io.sentry:sentry") // Sentry Java SDK
    implementation("jakarta.annotation:jakarta.annotation-api")
    runtimeOnly("ch.qos.logback:logback-classic")

    // Test dependencies used by the integration test shown later
    testImplementation("io.micronaut.test:micronaut-test-junit5")
    testImplementation("org.testcontainers:junit-jupiter")
    testImplementation("org.testcontainers:localstack")
    testImplementation("org.awaitility:awaitility")
}
The application configuration in src/main/resources/application.yml ties everything together. A common mistake is hardcoding these values; using Micronaut’s configuration system allows for environment-specific overrides.
# src/main/resources/application.yml
micronaut:
  application:
    name: sqs-rollup-service

aws:
  region: "us-east-1"
  sqs:
    queue-url: "https://sqs.us-east-1.amazonaws.com/123456789012/analytics-events-queue"

sentry:
  dsn: "${SENTRY_DSN}" # Best practice to load from environment variable
  traces-sample-rate: 0.1
  # Custom callback to enrich events before they are sent
  before-send: com.example.rollup.sentry.SentryContextCallback

# Custom application properties
app:
  rollup:
    buffer-flush-seconds: 10
    poller-fixed-delay-ms: 1000
    max-batch-size: 100 # In-memory buffer limit
  sqs:
    poll-max-messages: 10
    poll-wait-seconds: 20
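Beyond file-based overrides, the app.rollup block can also be bound to a typed configuration bean instead of scattering @Value injections. The class below is a hypothetical sketch, not part of the service as described, showing how Micronaut's @ConfigurationProperties would map those keys (kebab-case keys bind to the camelCase properties).

// src/main/java/com/example/rollup/RollupConfiguration.java (hypothetical sketch)
package com.example.rollup;

import io.micronaut.context.annotation.ConfigurationProperties;

// Binds the app.rollup.* block from application.yml; environment-specific
// configuration files or environment variables can override the defaults.
@ConfigurationProperties("app.rollup")
public class RollupConfiguration {

    private int bufferFlushSeconds = 10;
    private long pollerFixedDelayMs = 1000;
    private int maxBatchSize = 100;

    public int getBufferFlushSeconds() { return bufferFlushSeconds; }
    public void setBufferFlushSeconds(int bufferFlushSeconds) { this.bufferFlushSeconds = bufferFlushSeconds; }

    public long getPollerFixedDelayMs() { return pollerFixedDelayMs; }
    public void setPollerFixedDelayMs(long pollerFixedDelayMs) { this.pollerFixedDelayMs = pollerFixedDelayMs; }

    public int getMaxBatchSize() { return maxBatchSize; }
    public void setMaxBatchSize(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }
}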
The core of the solution is the thread-safe buffer and the two coordinated services.
// src/main/java/com/example/rollup/buffer/EventBuffer.java
package com.example.rollup.buffer;

import jakarta.inject.Singleton;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

@Singleton
public class EventBuffer {

    // ConcurrentLinkedQueue is a good choice for MPSC (Multi-Producer, Single-Consumer) scenarios.
    // Our SQS poller will be the producer, and the rollup processor the consumer.
    private final ConcurrentLinkedQueue<Message> messageQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger currentSize = new AtomicInteger(0);

    public void add(Message message) {
        messageQueue.add(message);
        currentSize.incrementAndGet();
    }

    public int size() {
        return currentSize.get();
    }

    public List<Message> drain(int maxElements) {
        List<Message> drainedMessages = new ArrayList<>(maxElements);
        while (drainedMessages.size() < maxElements) {
            Message message = messageQueue.poll();
            if (message == null) {
                // Buffer is empty
                break;
            }
            drainedMessages.add(message);
            currentSize.decrementAndGet();
        }
        return drainedMessages;
    }
}
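A small unit test makes the drain contract explicit: it never blocks and never returns more than the requested maximum. The test class and its scenario below are an illustrative sketch of my own, though the Message builder calls are the standard AWS SDK v2 API.

// src/test/java/com/example/rollup/buffer/EventBufferTest.java (illustrative sketch)
package com.example.rollup.buffer;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.model.Message;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class EventBufferTest {

    @Test
    void drainRespectsMaxElementsAndEmptiesEventually() {
        EventBuffer buffer = new EventBuffer();
        for (int i = 0; i < 25; i++) {
            buffer.add(Message.builder().messageId("msg-" + i).body("EVENT_TYPE_A").build());
        }

        // The first drain is capped at the requested maximum...
        assertEquals(10, buffer.drain(10).size());
        assertEquals(15, buffer.size());

        // ...and an oversized request simply returns what is left without blocking.
        assertEquals(15, buffer.drain(100).size());
        assertTrue(buffer.drain(10).isEmpty());
    }
}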
The SQS polling service becomes a simple, scheduled task focused entirely on fetching messages.
// src/main/java/com/example/rollup/SqsPollerService.java
package com.example.rollup;

import com.example.rollup.buffer.EventBuffer;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

import java.util.List;

@Singleton
public class SqsPollerService {

    private static final Logger LOG = LoggerFactory.getLogger(SqsPollerService.class);

    private final SqsClient sqsClient;
    private final EventBuffer eventBuffer;
    private final String queueUrl;
    private final int maxMessages;
    private final int waitTimeSeconds;

    public SqsPollerService(SqsClient sqsClient,
                            EventBuffer eventBuffer,
                            @Value("${aws.sqs.queue-url}") String queueUrl,
                            @Value("${app.sqs.poll-max-messages}") int maxMessages,
                            @Value("${app.sqs.poll-wait-seconds}") int waitTimeSeconds) {
        this.sqsClient = sqsClient;
        this.eventBuffer = eventBuffer;
        this.queueUrl = queueUrl;
        this.maxMessages = maxMessages;
        this.waitTimeSeconds = waitTimeSeconds;
    }

    // This runs on a fixed delay, continuously populating our in-memory buffer.
    @Scheduled(fixedDelay = "${app.rollup.poller-fixed-delay-ms}ms")
    void pollForMessages() {
        try {
            ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .maxNumberOfMessages(maxMessages)
                    .waitTimeSeconds(waitTimeSeconds) // Enables SQS long polling
                    .messageAttributeNames("All") // Important for trace IDs
                    .build();

            List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages();
            if (!messages.isEmpty()) {
                LOG.info("Polled {} messages from SQS.", messages.size());
                messages.forEach(eventBuffer::add);
            }
        } catch (Exception e) {
            // In a real-world project, this exception MUST be handled properly.
            // A simple log is insufficient; we need metrics and alerts here.
            LOG.error("Critical error during SQS polling", e);
        }
    }
}
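The catch block above only logs. One reasonable complement, assuming the micronaut-micrometer-core module and a configured MeterRegistry (neither is in the build shown earlier), is to count poll failures so an alert can fire on their rate independently of Sentry. A hypothetical sketch:

// Hypothetical sketch: counting poll failures with Micrometer.
// Assumes io.micronaut.micrometer:micronaut-micrometer-core on the classpath,
// which provides a MeterRegistry bean; this is not part of the build above.
package com.example.rollup;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.inject.Singleton;

@Singleton
public class PollerMetrics {

    private final Counter pollFailures;

    public PollerMetrics(MeterRegistry registry) {
        this.pollFailures = Counter.builder("sqs.poller.failures")
                .description("Number of failed SQS receiveMessage calls")
                .register(registry);
    }

    // Would be called from the catch block in SqsPollerService.pollForMessages().
    public void recordPollFailure() {
        pollFailures.increment();
    }
}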
Now for the crucial part: the rollup processor and its integration with Sentry. This is where the business logic and observability converge.
// src/main/java/com/example/rollup/RollupProcessorService.java
package com.example.rollup;

import com.example.rollup.buffer.EventBuffer;
import com.example.rollup.sentry.SentryContextManager;
import io.micronaut.context.annotation.Value;
import io.micronaut.scheduling.annotation.Scheduled;
import io.sentry.Sentry;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.List;
import java.util.stream.Collectors;

@Singleton
public class RollupProcessorService {

    private static final Logger LOG = LoggerFactory.getLogger(RollupProcessorService.class);

    // SQS allows at most 10 entries per DeleteMessageBatch request.
    private static final int SQS_DELETE_BATCH_LIMIT = 10;

    private final EventBuffer eventBuffer;
    private final AnalyticsRepository analyticsRepository;
    private final SqsClient sqsClient;
    private final SentryContextManager sentryContextManager;
    private final int maxBatchSize;
    private final String queueUrl;

    public RollupProcessorService(EventBuffer eventBuffer,
                                  AnalyticsRepository analyticsRepository,
                                  SqsClient sqsClient,
                                  SentryContextManager sentryContextManager,
                                  @Value("${app.rollup.max-batch-size}") int maxBatchSize,
                                  @Value("${aws.sqs.queue-url}") String queueUrl) {
        this.eventBuffer = eventBuffer;
        this.analyticsRepository = analyticsRepository;
        this.sqsClient = sqsClient;
        this.sentryContextManager = sentryContextManager;
        this.maxBatchSize = maxBatchSize;
        this.queueUrl = queueUrl;
    }

    @Scheduled(fixedRate = "${app.rollup.buffer-flush-seconds}s")
    void processBuffer() {
        if (eventBuffer.size() == 0) {
            return;
        }
        List<Message> messages = eventBuffer.drain(maxBatchSize);
        if (messages.isEmpty()) {
            return;
        }
        LOG.info("Processing a batch of {} events.", messages.size());

        // This is the critical observability integration.
        // We wrap the entire batch processing in a Sentry scope.
        Sentry.withScope(scope -> {
            // The ContextManager enriches the scope with details from the SQS messages.
            sentryContextManager.enrichScopeWithSqsBatch(scope, messages);
            try {
                // 1. Perform the actual data aggregation and persistence.
                //    This logic must be idempotent.
                analyticsRepository.saveBatch(messages);

                // 2. If persistence is successful, delete the messages from SQS.
                deleteMessagesFromSqs(messages);
                LOG.info("Successfully processed and deleted batch of {}.", messages.size());
            } catch (Exception e) {
                // If anything goes wrong, capture the exception.
                // Sentry will automatically include the rich context from the scope.
                LOG.error("Failed to process event batch. Messages will be redriven by SQS.", e);
                Sentry.captureException(e);
                // We DO NOT delete messages on failure. The SQS visibility timeout will make them available again.
            }
        });
    }

    private void deleteMessagesFromSqs(List<Message> messages) {
        // DeleteMessageBatch accepts at most 10 entries, so the in-memory batch
        // (up to maxBatchSize messages) is deleted in chunks.
        for (int start = 0; start < messages.size(); start += SQS_DELETE_BATCH_LIMIT) {
            List<DeleteMessageBatchRequestEntry> deleteEntries = messages
                    .subList(start, Math.min(start + SQS_DELETE_BATCH_LIMIT, messages.size()))
                    .stream()
                    .map(msg -> DeleteMessageBatchRequestEntry.builder()
                            .id(msg.messageId()) // The ID must be unique within the request
                            .receiptHandle(msg.receiptHandle())
                            .build())
                    .collect(Collectors.toList());

            DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(deleteEntries)
                    .build();
            sqsClient.deleteMessageBatch(deleteRequest);
        }
    }

    // A crucial lifecycle hook. On shutdown, attempt to process any remaining items in the buffer.
    @PreDestroy
    public void onShutdown() {
        LOG.info("Application shutting down. Draining final buffer...");
        processBuffer();
        LOG.info("Shutdown drain complete.");
    }
}
The magic lies in the SentryContextManager. This is a custom utility class responsible for extracting relevant information from the SQS messages and attaching it to the Sentry scope. It does not come out of the box, and it is the key to making Sentry useful here.
// src/main/java/com/example/rollup/sentry/SentryContextManager.java
package com.example.rollup.sentry;

import io.sentry.Scope;
import jakarta.inject.Singleton;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Singleton
public class SentryContextManager {

    /**
     * Enriches the current Sentry scope with contextual information from a batch of SQS messages.
     * This is the core of our observability strategy for batch processing.
     *
     * @param scope    the current Sentry scope
     * @param messages the list of SQS messages in the batch being processed
     */
    public void enrichScopeWithSqsBatch(Scope scope, List<Message> messages) {
        // Set a tag for easy filtering in the Sentry UI.
        scope.setTag("processing_mode", "sqs_batch");
        scope.setTag("batch_size", String.valueOf(messages.size()));

        // Add a structured context block with critical message details.
        // This is far more powerful than just logging.
        List<Map<String, String>> messageContext = messages.stream()
                .map(this::messageToContextMap)
                .collect(Collectors.toList());

        // This context will be attached to any error event captured within this scope.
        scope.setContexts("sqs_batch_info", Map.of(
                "message_count", messages.size(),
                "messages", messageContext
        ));
    }

    private Map<String, String> messageToContextMap(Message message) {
        // Extract any custom trace IDs or correlation IDs from message attributes.
        // In a real-world project, services that produce messages should add these attributes.
        String traceId = message.messageAttributes().containsKey("trace_id")
                ? message.messageAttributes().get("trace_id").stringValue()
                : "N/A";
        return Map.of(
                "messageId", message.messageId(),
                "receiptHandle", message.receiptHandle(), // Be careful not to expose this publicly
                "traceId", traceId
        );
    }
}
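For the trace_id lookup above to find anything, producers must attach that attribute when they publish. The following producer-side sketch is hypothetical and would live in the producing service; the attribute name matches what SentryContextManager reads, and the queue URL wiring is assumed.

// Hypothetical producer-side sketch: attaching a trace_id message attribute
// so the consumer's SentryContextManager can correlate batch failures back to the source.
package com.example.producer; // hypothetical producer module

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import java.util.Map;
import java.util.UUID;

public class EventPublisher {

    private final SqsClient sqsClient;
    private final String queueUrl;

    public EventPublisher(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    public void publish(String eventBody, String traceId) {
        // Fall back to a fresh ID so downstream correlation never sees a null attribute.
        String effectiveTraceId = traceId != null ? traceId : UUID.randomUUID().toString();
        sqsClient.sendMessage(SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody(eventBody)
                .messageAttributes(Map.of(
                        "trace_id", MessageAttributeValue.builder()
                                .dataType("String")
                                .stringValue(effectiveTraceId)
                                .build()))
                .build());
    }
}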
The final piece is ensuring the repository logic is idempotent. At-least-once delivery from SQS means we might process the same message more than once if a deleteMessageBatch call fails or if the service crashes after processing but before deleting. The rollup logic must gracefully handle this. A common strategy is to make the database write itself idempotent.
// src/main/java/com/example/rollup/AnalyticsRepository.java
package com.example.rollup;

import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Singleton
public class AnalyticsRepository {

    private static final Logger LOG = LoggerFactory.getLogger(AnalyticsRepository.class);

    // This is a dummy implementation. In a real project, this would be a JDBC client,
    // JPA repository, or similar, interacting with a real database.
    public void saveBatch(List<Message> messages) {
        // 1. Deserialize message bodies into domain objects.
        //    Omitted for brevity.

        // 2. Perform the "rollup" logic. For example, count events by type.
        Map<String, Long> eventCounts = messages.stream()
                .collect(Collectors.groupingBy(
                        // Assuming the body is a simple string like "EVENT_TYPE_A"
                        Message::body,
                        Collectors.counting()
                ));

        // 3. Persist the aggregated result. An additive UPSERT keyed by event type
        //    and time window handles retried batches, e.g.:
        //    INSERT INTO hourly_analytics (event_type, hour, count) VALUES (?, ?, ?)
        //    ON CONFLICT (event_type, hour) DO UPDATE SET count = hourly_analytics.count + EXCLUDED.count;
        //    Note that an additive upsert alone still double-counts redelivered messages;
        //    true idempotency also needs deduplication, e.g. recording processed message IDs
        //    in the same transaction.
        LOG.info("Persisting aggregated data: {}", eventCounts);

        // Simulate a potential failure.
        if (Math.random() > 0.95) {
            throw new RuntimeException("Simulated database connection failure!");
        }
    }
}
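To make that idempotency caveat concrete, here is a hedged JDBC sketch using PostgreSQL syntax. The table names, the javax.sql.DataSource wiring, and the per-message granularity are all assumptions for illustration; a batch variant would claim every message ID first and aggregate only the newly claimed ones.

// Hypothetical persistence sketch (PostgreSQL syntax, javax.sql.DataSource assumed):
// a dedup table makes the additive rollup safe to retry after SQS redelivery.
package com.example.rollup;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import javax.sql.DataSource;

public class JdbcAnalyticsWriter {

    private final DataSource dataSource;

    public JdbcAnalyticsWriter(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void persistEvent(String messageId, String eventType) throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try {
                // 1. Claim the message ID; a redelivered message hits the conflict and is skipped.
                try (PreparedStatement claim = conn.prepareStatement(
                        "INSERT INTO processed_messages (message_id) VALUES (?) ON CONFLICT DO NOTHING")) {
                    claim.setString(1, messageId);
                    if (claim.executeUpdate() == 0) {
                        conn.rollback();
                        return; // duplicate delivery: already counted, safe to skip
                    }
                }
                // 2. Additive upsert into the hourly rollup, keyed by event type and hour.
                try (PreparedStatement upsert = conn.prepareStatement(
                        "INSERT INTO hourly_analytics (event_type, hour, count) VALUES (?, ?, 1) "
                                + "ON CONFLICT (event_type, hour) DO UPDATE "
                                + "SET count = hourly_analytics.count + 1")) {
                    upsert.setString(1, eventType);
                    upsert.setTimestamp(2, Timestamp.from(Instant.now().truncatedTo(ChronoUnit.HOURS)));
                    upsert.executeUpdate();
                }
                conn.commit();
            } catch (SQLException e) {
                conn.rollback();
                throw e;
            }
        }
    }
}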
Testing this architecture requires isolating components. The business logic within AnalyticsRepository can be unit-tested easily. The core challenge is integration testing the SQS interaction. Here, Testcontainers with a LocalStack module is the industry standard. It provides a lightweight, ephemeral AWS cloud stack in a Docker container.
A sample integration test structure would look like this:
// src/test/java/com/example/rollup/RollupIntegrationTest.java
package com.example.rollup;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@MicronautTest
@Testcontainers
class RollupIntegrationTest {

    @Container
    static LocalStackContainer localstack = new LocalStackContainer(
            DockerImageName.parse("localstack/localstack:1.4.0")
    ).withServices(LocalStackContainer.Service.SQS);

    // These properties must be in place before the Micronaut context starts, and the
    // container must already be running before its mapped port can be read, so it is
    // started explicitly here (Testcontainers' start() is idempotent, so this does not
    // conflict with the @Container extension). Note that the injected SqsClient also has
    // to be pointed at the LocalStack endpoint (for example via an endpoint override or a
    // replacement test bean) for the flow to work end to end.
    static {
        localstack.start();
        System.setProperty("aws.sqs.queue-url",
                localstack.getEndpointOverride(LocalStackContainer.Service.SQS)
                        + "/000000000000/test-analytics-queue");
        System.setProperty("aws.region", localstack.getRegion());
        System.setProperty("aws.accessKeyId", localstack.getAccessKey());
        System.setProperty("aws.secretAccessKey", localstack.getSecretKey());
    }

    @Inject
    SqsClient sqsClient;

    // In a real test, you'd inject a mock or spy of AnalyticsRepository
    // to verify it was called with the correct data.

    @Test
    void testMessageProcessingFlow() {
        // Setup: create the queue in LocalStack.
        String queueName = "test-analytics-queue";
        String queueUrl = sqsClient.createQueue(
                CreateQueueRequest.builder().queueName(queueName).build()).queueUrl();

        // Act: send a message to the SQS queue.
        sqsClient.sendMessage(SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody("TEST_EVENT")
                .build());

        // Assert: wait for the system to process the message.
        // In a real test, you would verify that your mock repository's `saveBatch` method was called.
        // Here, we just wait; this is not a robust assertion.
        await().atMost(15, SECONDS).until(() -> {
            // A more robust check would involve checking a database or a mock.
            System.out.println("Checking for processing completion...");
            // return mockedRepository.getInvocationCount() > 0;
            return true;
        });
    }
}
This test setup is non-trivial but provides immense value by validating the entire flow from SQS polling to message deletion under controlled conditions.
The current design successfully addresses the initial pain points. It is, however, not without its own limitations and trade-offs. The in-memory buffer is a weak point: a pod crash discards everything that has been pulled from SQS but not yet processed. Because those messages were never deleted, SQS redelivers them after the visibility timeout expires, so the cost is delayed and duplicated processing rather than outright loss, and the design leans heavily on the idempotent write path to absorb those duplicates. For our analytics use case, that was acceptable. For a system requiring stronger guarantees or tighter latency, the buffer would need to be replaced with a persistent store like Redis or a durable log. Furthermore, the batch size and processing interval are static. A more sophisticated implementation could dynamically adjust these parameters based on the ingress rate of messages and the processing latency of the downstream database, creating a truly adaptive and responsive system.
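One incremental step toward that adaptive behavior, flushing on buffer depth as well as on the timer, could look like the following sketch. It is a hypothetical addition to RollupProcessorService, not part of the current service, and processBuffer() would need to be safe for concurrent invocation (for example by declaring it synchronized) since two scheduled methods would then call it.

// Hypothetical size-based flush, layered on the existing time-based schedule.
// If the buffer reaches the batch limit between flushes, process immediately
// instead of letting the backlog grow for up to buffer-flush-seconds.
@Scheduled(fixedDelay = "1s")
void flushIfFull() {
    if (eventBuffer.size() >= maxBatchSize) {
        LOG.info("Buffer reached the batch limit of {}; flushing ahead of schedule.", maxBatchSize);
        processBuffer();
    }
}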