Constructing an Asynchronous NLP Pipeline for Document Similarity Alerts with Spring Boot, SNS, and ChromaDB


The initial system was deceptively simple and fundamentally broken for any real-world load. A user would upload a document via a REST endpoint, and our Spring Boot application would synchronously call a Python service to generate a vector embedding, insert it into ChromaDB, and then run a similarity query against all existing vectors. The entire process could take anywhere from 5 to 30 seconds, leading to HTTP request timeouts and an abysmal user experience. The immediate technical pain point was this synchronous, blocking I/O. The deeper architectural flaw was the lack of scalability and the tight coupling of the ingestion and analysis workflows. A simple document upload should not be held hostage by a potentially long-running computational task.

Our initial concept was to decouple these concerns completely. The document submission API should do the bare minimum: authenticate the user, accept the file, perform basic validation, and return a 202 Accepted status code. The actual heavy lifting (NLP processing, vector storage, similarity analysis, and user notification) would be offloaded to an asynchronous, scalable pool of backend workers. This immediately pointed towards an event-driven architecture.

The technology selection process was guided by pragmatism and our existing ecosystem.

  1. Asynchronous Messaging: We needed a message bus. While Kafka or RabbitMQ were contenders, our infrastructure is heavily invested in AWS. Amazon Simple Notification Service (SNS) combined with Simple Queue Service (SQS) was the logical choice. SNS provides a robust pub/sub model with fan-out capabilities, and SQS provides a durable, persistent queue for our workers, ensuring messages aren’t lost if all workers are down. This combination is a classic pattern for reliable, asynchronous processing in the cloud.
  2. Vector Database: We stuck with ChromaDB. While more complex solutions like Pinecone or Weaviate exist, ChromaDB’s self-hostable nature and simple HTTP API made it easy to manage within our existing Kubernetes environment. The key was to ensure we could leverage its metadata filtering capabilities to scope similarity searches, which would be critical for performance.
  3. Authentication Across Services: Decoupling introduced a new problem: how does the asynchronous worker know who submitted the document? Passing raw user credentials in an SNS message is a severe security anti-pattern. The solution was to use a two-tiered JSON Web Token (JWT) system. The user authenticates to the public-facing API with a standard user JWT. Upon receiving a valid request, the API service generates a short-lived, internal-facing service JWT. This token contains only the necessary, non-sensitive context (e.g., userId, documentId) and is signed with a secret key known only to our internal services. This token is then passed inside the SNS message payload, allowing the worker to securely identify the context of the operation without having access to the user’s primary credentials.
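
The internal token flow from item 3 can be sketched with nothing but the JDK. This is not the jjwt-based implementation used later in this article; it hand-rolls an HS256 compact token purely to show what the worker ends up verifying. The class name `InternalTokenSketch`, the claim layout, and the assumption that `userId` and `documentId` contain no characters needing JSON escaping are all illustrative choices, not part of the original system.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;

// Hypothetical helper: a stdlib-only illustration of an HS256-signed internal token.
final class InternalTokenSketch {

    private static final Base64.Encoder ENCODER = Base64.getUrlEncoder().withoutPadding();
    private static final Base64.Decoder DECODER = Base64.getUrlDecoder();

    /** Builds header.payload.signature, signed with HMAC-SHA256. */
    static String issue(String userId, String documentId, long expiresAtEpochSec, byte[] secret) {
        String header = "{\"alg\":\"HS256\",\"typ\":\"JWT\"}";
        // Only non-sensitive context travels in the token (assumed claim names).
        String payload = String.format("{\"sub\":\"%s\",\"docId\":\"%s\",\"exp\":%d}",
                userId, documentId, expiresAtEpochSec);
        String signingInput = encode(header) + "." + encode(payload);
        return signingInput + "." + ENCODER.encodeToString(hmac(signingInput, secret));
    }

    /**
     * Returns the decoded payload JSON if the signature matches, otherwise throws.
     * A real verifier must also reject tokens whose exp has passed; omitted for brevity.
     */
    static String verify(String token, byte[] secret) {
        String[] parts = token.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Malformed token");
        }
        byte[] expected = hmac(parts[0] + "." + parts[1], secret);
        byte[] actual = DECODER.decode(parts[2]);
        // Constant-time comparison avoids leaking how many signature bytes matched.
        if (!MessageDigest.isEqual(expected, actual)) {
            throw new SecurityException("Bad signature");
        }
        return new String(DECODER.decode(parts[1]), StandardCharsets.UTF_8);
    }

    private static byte[] hmac(String data, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }

    private static String encode(String json) {
        return ENCODER.encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }
}
```

The API service would call `issue(...)` and the worker would call `verify(...)` with the same shared secret. A token signed with any other key fails the signature check, which is what lets the worker trust the `sub` and `docId` values without ever seeing the user's own credentials.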

The implementation journey began with refactoring the existing synchronous monolith into two distinct services: an api-service and a processing-worker. Both are Spring Boot applications.

Project Dependencies and Configuration

The pom.xml for both services shares a common base but diverges on specific dependencies. The api-service needs web and SNS dependencies, while the processing-worker needs SQS and clients for ChromaDB and our internal NLP service.

<!-- Shared Dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt-api</artifactId>
    <version>0.11.5</version>
</dependency>
<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt-impl</artifactId>
    <version>0.11.5</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt-jackson</artifactId>
    <version>0.11.5</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- api-service specific -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sns</artifactId>
</dependency>

<!-- processing-worker specific -->
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-sqs</artifactId>
    <version>3.0.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency> <!-- For reactive ChromaDB client -->

<!-- For Testing -->
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.19.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>localstack</artifactId>
    <version>1.19.1</version>
    <scope>test</scope>
</dependency>

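Both services are configured through application.yml. A common mistake is to hardcode ARNs or secret keys; in a production environment, these must be externalized via environment variables or a configuration service. In the listing below, the JWT secret is already injected from the environment, while the topic ARN is written out as a literal placeholder purely for readability.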

# application.yml for api-service
aws:
  region: 'us-east-1'
  sns:
    topic-arn: 'arn:aws:sns:us-east-1:123456789012:DocumentProcessingTopic'

app:
  jwt:
    internal:
      secret: ${INTERNAL_JWT_SECRET} # Injected from environment
      expiration-ms: 300000 # 5 minutes

# application.yml for processing-worker
aws:
  region: 'us-east-1'

spring:
  cloud:
    aws:
      sqs:
        listener:
          max-messages-per-poll: 10 # Fetch up to 10 messages per poll (the SQS maximum per receive call)
          poll-timeout: 20s

app:
  jwt:
    internal:
      secret: ${INTERNAL_JWT_SECRET}
  chromadb:
    host: 'http://chromadb-service:8000'
  nlp-service:
    host: 'http://nlp-service:5001'

The API Service: Ingestion and Event Publication

The entry point is the DocumentController. It’s protected by Spring Security to validate the end-user’s JWT. Its sole responsibility is to receive the file, generate a unique ID for it, create an internal service JWT, and publish an event to SNS.

// In api-service

@RestController
@RequestMapping("/api/v1/documents")
@RequiredArgsConstructor
@Slf4j
public class DocumentController {

    private final DocumentIngestionService documentIngestionService;

    @PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public ResponseEntity<Map<String, String>> uploadDocument(
            @RequestParam("file") MultipartFile file,
            Authentication authentication) { // Spring Security provides the authenticated principal

        if (file.isEmpty()) {
            throw new IllegalArgumentException("File cannot be empty.");
        }

        // A real implementation would extract userId from the JWT Principal
        String userId = authentication.getName(); 
        String documentId = UUID.randomUUID().toString();

        try {
            // In a real system, you'd persist the file to S3 first.
            // For this example, we assume the worker has access to the content later.
            // The key is publishing the event.
            documentIngestionService.initiateDocumentProcessing(userId, documentId, file.getOriginalFilename(), file.getBytes());

            Map<String, String> response = Map.of(
                "message", "Document processing initiated.",
                "documentId", documentId
            );
            return ResponseEntity.accepted().body(response);

        } catch (Exception e) {
            log.error("Failed to initiate document processing for user {}", userId, e);
            // Proper error response structure is crucial
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                                 .body(Map.of("error", "Failed to publish processing event."));
        }
    }
}

The DocumentIngestionService contains the logic for interacting with AWS SNS and creating the internal JWT. This separation of concerns is critical for testability.

// In api-service

@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentIngestionService {

    private final SnsClient snsClient;
    private final JwtService jwtService;

    @Value("${aws.sns.topic-arn}")
    private String topicArn;

    public void initiateDocumentProcessing(String userId, String documentId, String fileName, byte[] content) {
        // Step 1: Generate the internal service JWT
        String internalJwt = jwtService.createInternalServiceToken(userId, documentId);

        // Step 2: Create the message payload. JSON is a sane choice.
        String messagePayload = buildMessagePayload(documentId, fileName, internalJwt);

        // Step 3: Publish to SNS
        PublishRequest publishRequest = PublishRequest.builder()
                .topicArn(topicArn)
                .message(messagePayload)
                .messageAttributes(Map.of( // Use attributes for routing/filtering if needed
                    "eventType", MessageAttributeValue.builder().dataType("String").stringValue("DOCUMENT_RECEIVED").build()
                ))
                .build();

        try {
            PublishResponse response = snsClient.publish(publishRequest);
            log.info("Published message to SNS with ID: {}", response.messageId());
        } catch (SnsException e) {
            log.error("Error publishing to SNS: {}", e.awsErrorDetails().errorMessage(), e);
            // A real-world project needs a robust retry mechanism or a fallback here.
            // For example, writing to a local dead-letter log for later reprocessing.
            throw new RuntimeException("Failed to publish document processing event.", e);
        }
    }

    private String buildMessagePayload(String documentId, String fileName, String internalJwt) {
        // Build the JSON with Jackson (com.fasterxml.jackson.databind.node.JsonNodeFactory)
        // so that quotes or backslashes in fileName are escaped instead of corrupting the payload.
        return JsonNodeFactory.instance.objectNode()
                .put("documentId", documentId)
                .put("fileName", fileName)
                .put("internalAuthToken", internalJwt)
                .toString();
    }
}
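
The catch block above leaves the retry mechanism as an exercise. A minimal sketch of a bounded retry with exponential backoff might look like the following. `RetrySketch` and `withRetry` are hypothetical names, not part of the AWS SDK, and the SDK client has its own configurable retry behaviour for transient errors, so a wrapper like this is only worth adding for failures that survive it.

```java
import java.util.function.Supplier;

// Hypothetical helper: retries an action with exponentially growing pauses.
final class RetrySketch {

    /**
     * Runs the action up to maxAttempts times. After each failed attempt except
     * the last, sleeps for the current delay and then doubles it.
     * Rethrows the last failure if every attempt fails.
     */
    static <T> T withRetry(Supplier<T> action, int maxAttempts, long initialDelayMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // No pause after the final attempt.
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", interrupted);
                }
                delayMs *= 2;
            }
        }
        throw lastFailure;
    }
}
```

In the service above, the publish call could then be wrapped as `RetrySketch.withRetry(() -> snsClient.publish(publishRequest), 3, 200)`. Keeping the attempt count small matters here, because the HTTP request that triggered the publish is still waiting for its 202 response.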

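The JwtService handles the creation and validation of our internal token. Note the use of a separate, strong secret key: the configured value must be Base64-encoded and decode to at least 32 bytes, because jjwt rejects shorter keys for HS256.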

// Shared component, present in both services

@Service
public class JwtService {

    @Value("${app.jwt.internal.secret}")
    private String internalSecretKey;

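    // Defaults to 5 minutes so the worker, whose application.yml omits this property, still starts.
    @Value("${app.jwt.internal.expiration-ms:300000}")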
    private long internalExpirationMs;

    private Key getSigningKey() {
        byte[] keyBytes = Decoders.BASE64.decode(this.internalSecretKey);
        return Keys.hmacShaKeyFor(keyBytes);
    }

    public String createInternalServiceToken(String userId, String documentId) {
        return Jwts.builder()
                .setSubject(userId)
                .claim("docId", documentId)
                .setIssuedAt(new Date(System.currentTimeMillis()))
                .setExpiration(new Date(System.currentTimeMillis() + internalExpirationMs))
                .signWith(getSigningKey(), SignatureAlgorithm.HS256)
                .compact();
    }
    
    public Claims parseInternalServiceToken(String token) {
        return Jwts.parserBuilder()
                .setSigningKey(getSigningKey())
                .build()
                .parseClaimsJws(token)
                .getBody();
    }
}

The Processing Worker: Consumption and Analysis

This is where the heavy lifting happens. The worker listens to an SQS queue that is subscribed to our SNS topic. The @SqsListener annotation from the Spring Cloud AWS project makes consumption trivial.

// In processing-worker

@Component
@RequiredArgsConstructor
@Slf4j
public class DocumentMessageListener {

    private final DocumentProcessingService processingService;
    private final ObjectMapper objectMapper; // Use Jackson for robust JSON parsing

    // The queue name is resolved from configuration.
    // Spring Cloud AWS 3.x acknowledges (deletes) a message only when the listener method returns
    // without throwing; ON_SUCCESS is the default mode. This is crucial for reliability.
    // The parameter type is the AWS SDK v2 class software.amazon.awssdk.services.sqs.model.Message.
    @SqsListener("${app.sqs.queue-name}")
    public void handleMessage(Message sqsMessage) {
        try {
            // Unless raw message delivery is enabled on the subscription, SNS wraps the payload
            // in a JSON envelope. The actual payload is the JSON string inside the 'Message' field.
            String snsPayloadJson = objectMapper.readTree(sqsMessage.body()).get("Message").asText();
            DocumentProcessingEvent event = objectMapper.readValue(snsPayloadJson, DocumentProcessingEvent.class);
            
            log.info("Received document processing event for documentId: {}", event.getDocumentId());
            processingService.processDocument(event);

        } catch (Exception e) {
            log.error("Failed to process message with ID {}. It will be retried or sent to DLQ.", sqsMessage.messageId(), e);
            // By throwing an exception, we prevent Spring from acknowledging the message,
            // allowing SQS to handle retries based on the queue's redrive policy.
            throw new RuntimeException("Processing failed for message " + sqsMessage.messageId(), e);
        }
    }
}

// A simple DTO for deserializing the message
@Data
@NoArgsConstructor
@AllArgsConstructor
class DocumentProcessingEvent {
    private String documentId;
    private String fileName;
    private String internalAuthToken;
}

The DocumentProcessingService is the heart of the worker. It orchestrates calls to the NLP service, ChromaDB, and potentially back to SNS for notifications.

// In processing-worker

@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentProcessingService {

    private static final String COLLECTION_NAME = "document_embeddings";
    private static final double SIMILARITY_THRESHOLD = 0.85; // Configurable threshold

    private final JwtService jwtService;
    private final NlpServiceClient nlpServiceClient;
    private final ChromaDbClient chromaDbClient;
    private final NotificationService notificationService;

    public void processDocument(DocumentProcessingEvent event) {
        // Step 1: Validate the internal JWT and extract context
        Claims claims;
        try {
            claims = jwtService.parseInternalServiceToken(event.getInternalAuthToken());
        } catch (JwtException e) {
            log.warn("Invalid internal JWT received for documentId {}. Discarding message.", event.getDocumentId());
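            // This is a terminal failure. We don't want to retry. Acknowledge and drop.
            // Caveat: ExpiredJwtException is also a JwtException, so a message that waits in the queue
            // longer than the token lifetime (5 minutes here) is silently dropped as well.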
            return;
        }
        
        String userId = claims.getSubject();
        String documentId = claims.get("docId", String.class);

        // A real implementation would fetch document content from S3 using the documentId
        String documentContent = "This is the placeholder content for " + event.getFileName();

        // Step 2: Get vector embedding from the NLP service
        float[] vector = nlpServiceClient.getEmbedding(documentContent)
            .blockOptional() // Blocks the listener thread until the reactive WebClient call completes
            .orElseThrow(() -> new IllegalStateException("Failed to get embedding for document " + documentId));

        // Step 3: Insert the new document's vector into ChromaDB
        chromaDbClient.addEmbedding(COLLECTION_NAME, documentId, vector, Map.of("userId", userId, "fileName", event.getFileName()));
        log.info("Stored embedding for documentId {} for user {}", documentId, userId);

        // Step 4: Query for similar documents, excluding the user's own documents.
        // This is the core business logic for the alert system.
        List<ChromaDbClient.QueryResult> similarDocuments = chromaDbClient.querySimilar(
            COLLECTION_NAME, 
            vector,
            5, // Find top 5 candidates
            Map.of("userId", Map.of("$ne", userId)) // The crucial metadata filter!
        );

        // Step 5: Process results and send notifications
        for (ChromaDbClient.QueryResult result : similarDocuments) {
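            // This conversion holds only for cosine distance (distance = 1 - similarity). Chroma defaults
            // to squared L2 distance unless the collection is created with hnsw:space set to cosine.
            if (result.getDistance() < (1.0 - SIMILARITY_THRESHOLD)) {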
                String matchedUserId = (String) result.getMetadata().get("userId");
                log.info("Found similar document {} for user {} with similarity score {}. Notifying...", result.getId(), matchedUserId, 1.0 - result.getDistance());
                notificationService.sendSimilarityAlert(matchedUserId, documentId, result.getId());
            }
        }
    }
}
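
The threshold check in Step 5 relies on the relation between cosine similarity and cosine distance. The following sketch spells that relation out under the assumption that the collection uses cosine distance; `CosineThresholdSketch` and its method names are hypothetical and exist only to illustrate the arithmetic.

```java
// Hypothetical helper: shows how a similarity threshold maps onto a distance cutoff.
final class CosineThresholdSketch {

    /** Cosine similarity of two equal-length, non-zero vectors. */
    static double cosineSimilarity(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Dimension mismatch");
        }
        double dot = 0.0;
        double normA = 0.0;
        double normB = 0.0;
        for (int i = 0; i < a.length; i++) {
            dot += (double) a[i] * b[i];
            normA += (double) a[i] * a[i];
            normB += (double) b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** Cosine distance is defined as one minus cosine similarity. */
    static double cosineDistance(float[] a, float[] b) {
        return 1.0 - cosineSimilarity(a, b);
    }

    /** A similarity above the threshold is the same as a distance below (1 - threshold). */
    static boolean isAlertWorthy(double cosineDistance, double similarityThreshold) {
        return cosineDistance < 1.0 - similarityThreshold;
    }
}
```

With a threshold of 0.85, a match at distance 0.10 (similarity 0.90) triggers an alert, while one at distance 0.20 (similarity 0.80) does not. If the collection used a different metric, the same cutoff would select a different and probably meaningless set of matches.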

The interaction with ChromaDB is critical. A naive implementation would fetch all vectors and compare them in-memory, which is completely unscalable. The correct approach is to push down as much filtering as possible to the database. Here, we use a metadata filter {"userId": {"$ne": "current-user-id"}} to ensure we only search for documents belonging to other users.

// In processing-worker, a simplified ChromaDB client using WebClient

@Component
public class ChromaDbClient {

    private final WebClient webClient;

    public ChromaDbClient(@Value("${app.chromadb.host}") String chromaHost) {
        this.webClient = WebClient.builder().baseUrl(chromaHost).build();
    }

    public void addEmbedding(String collectionName, String docId, float[] vector, Map<String, Object> metadata) {
        // ... WebClient logic to call ChromaDB's /api/v1/collections/{collection_id}/add endpoint.
        // The v1 REST path takes the collection's ID, not its name, so the name must first be
        // resolved to an ID (for example, once at startup).
        // The payload would include ids, embeddings, and metadatas arrays.
        // Proper error handling for network issues or DB errors is essential.
    }

    public List<QueryResult> querySimilar(String collectionName, float[] queryVector, int nResults, Map<String, Object> whereFilter) {
        // ... WebClient logic to call ChromaDB's /api/v1/collections/{collection_id}/query endpoint
        // The payload would include query_embeddings, n_results, and the `where` filter object.
        // This is the most performance-sensitive call in the entire workflow.
        throw new UnsupportedOperationException("Request logic elided in this walkthrough");
    }
    
    // DTOs for ChromaDB responses
    @Data
    public static class QueryResult {
        private String id;
        private Double distance;
        private Map<String, Object> metadata;
    }
}
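
One detail the elided client has to handle is the shape of the query response. Chroma returns column-oriented results: `ids`, `distances`, and `metadatas` are each a list containing one inner list per query embedding. The sketch below assumes the JSON has already been deserialized into plain Java lists and turns the first (and here only) query's columns into row objects. `ChromaQueryResponseSketch` and `Match` are hypothetical names standing in for the `QueryResult` DTO above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: zips Chroma's parallel result columns into per-match rows.
final class ChromaQueryResponseSketch {

    /** One row of the result set: a stand-in for the QueryResult DTO. */
    record Match(String id, double distance, Map<String, Object> metadata) {}

    /**
     * Flattens the results for the first query embedding.
     * Each argument holds one inner list per query embedding, index-aligned with the others.
     */
    static List<Match> flattenFirstQuery(List<List<String>> ids,
                                         List<List<Double>> distances,
                                         List<List<Map<String, Object>>> metadatas) {
        if (ids == null || ids.isEmpty()) {
            return List.of();
        }
        List<String> rowIds = ids.get(0);
        List<Double> rowDistances = distances.get(0);
        List<Map<String, Object>> rowMetadatas = metadatas.get(0);

        List<Match> matches = new ArrayList<>(rowIds.size());
        for (int i = 0; i < rowIds.size(); i++) {
            // A document stored without metadata comes back as null; normalize to an empty map.
            Map<String, Object> metadata = rowMetadatas.get(i) == null ? Map.of() : rowMetadatas.get(i);
            matches.add(new Match(rowIds.get(i), rowDistances.get(i), metadata));
        }
        return matches;
    }
}
```

Because the worker sends exactly one query embedding per document, only index 0 of each column is relevant. A batched variant would loop over the outer lists instead.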

This architecture is visually represented by the following flow:

sequenceDiagram
    participant User
    participant API_Service as Spring Boot API
    participant SNS_Topic as AWS SNS Topic
    participant SQS_Queue as AWS SQS Queue
    participant Worker as Spring Boot Worker
    participant NLP_Service as NLP Service
    participant ChromaDB

    User->>+API_Service: POST /api/v1/documents (with User JWT)
    API_Service->>API_Service: Validate User JWT
    API_Service->>API_Service: Generate Internal Service JWT
    API_Service->>+SNS_Topic: Publish DocumentReceived event
    API_Service-->>-User: 202 Accepted
    SNS_Topic->>+SQS_Queue: Fan-out message
    Worker->>+SQS_Queue: Poll for messages
    SQS_Queue-->>-Worker: Deliver message
    Worker->>Worker: Parse message, validate Internal JWT
    Worker->>+NLP_Service: Request embedding for document
    NLP_Service-->>-Worker: Return vector
    Worker->>+ChromaDB: Add new vector with metadata (userId, etc.)
    ChromaDB-->>-Worker: Acknowledge insert
    Worker->>+ChromaDB: Query for similar vectors (with metadata filter)
    ChromaDB-->>-Worker: Return potential matches
    Worker->>Worker: Filter by similarity threshold
    alt Found Match
        Worker->>SNS_Topic: Publish SimilarityAlert event
    end

Testing the Asynchronous Flow

The biggest challenge in an event-driven system is testing. End-to-end tests become complex and brittle. The pragmatic solution is robust integration testing using Testcontainers. We can spin up a LocalStack container to emulate AWS SNS/SQS and a ChromaDB container.

// In processing-worker tests

@SpringBootTest
@Testcontainers
class DocumentProcessingIntegrationTest {

    @Container
    static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.3"))
            .withServices(LocalStackContainer.Service.SNS, LocalStackContainer.Service.SQS);

    // Assume a ChromaDB container is also configured similarly

    @Autowired
    private SnsClient snsClient;
    
    // Mocks for external services like NLP
    @MockBean
    private NlpServiceClient nlpServiceClient;

    private static String topicArn;
    private static String queueUrl;

    @BeforeAll
    static void beforeAll() throws Exception {
        // Programmatically create the SNS topic and SQS queue in LocalStack
        // and subscribe the queue to the topic.
    }
    
    @DynamicPropertySource
    static void overrideProperties(DynamicPropertyRegistry registry) {
        registry.add("aws.sns.topic-arn", () -> topicArn);
        registry.add("app.sqs.queue-name", () -> "test-queue");
        // ... other properties pointing to container ports
    }

    @Test
    void shouldProcessMessageAndFindSimilarity() {
        // Given: Mock NLP service response
        when(nlpServiceClient.getEmbedding(anyString())).thenReturn(Mono.just(new float[]{0.1f, 0.2f}));
        
        // And: A pre-existing document in the test ChromaDB instance

        // When: We publish a message to the SNS topic in LocalStack
        String internalJwt = ...; // create a valid test JWT
        String messagePayload = ...; // build the payload
        snsClient.publish(PublishRequest.builder().topicArn(topicArn).message(messagePayload).build());

        // Then: Assert that the processing service was called and that a notification was triggered.
        // This can be done using Awaitility to wait for the asynchronous operation to complete.
        // We can also verify the state of ChromaDB after processing.
    }
}

This setup allows us to test the entire flow from message publication to database interaction within a controlled, local environment, which is paramount for maintaining system stability and developer velocity.
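
The "Then" step of the test depends on waiting for an asynchronous side effect. Awaitility is the tool we would reach for in practice; the sketch below is only a stdlib approximation of the same idea, so the mechanism is visible. `AwaitSketch` and `awaitUntil` are hypothetical names and are not Awaitility's API.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a condition until it holds or a timeout elapses.
final class AwaitSketch {

    /**
     * Returns true as soon as the condition holds, or false once the timeout
     * has elapsed (or the waiting thread is interrupted) without it holding.
     */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the integration test, the condition would typically check an observable outcome, such as a Mockito verification on the notification service succeeding or the new document ID being present in the test ChromaDB instance. Polling for an outcome rather than sleeping for a fixed time keeps the test both fast and stable.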

The final architecture successfully solved the initial problem of request timeouts and poor user experience. However, it’s not without its own set of trade-offs and potential future bottlenecks. The query load on ChromaDB is now the primary scaling concern: every upload triggers a similarity query, and as the number of documents grows into the millions, that query-on-write step may become too slow. This could be mitigated by batching queries, tuning the HNSW index parameters, or partitioning the data. Furthermore, the system relies on “fire-and-forget” notifications via SNS. For mission-critical alerts, a more robust mechanism involving delivery receipts and a persistent notification status tracker would be necessary. Finally, although the worker fetches up to ten messages per poll, it still embeds and queries each document individually. A future iteration could hand a whole batch to a single listener invocation and send batched requests to the NLP service and ChromaDB, improving throughput and reducing cost.

