The initial user complaints were consistent: “I upload a document and have to wait half a minute before I can search for it.” Our knowledge base platform, built as a monolithic service, handled document ingestion synchronously. A user POSTed a file to an API endpoint, and the server would chunk the text, call an embedding model, and write the resulting vectors to the database before returning a 200 OK. In a production environment, this was untenable. The process was slow, prone to timeouts, and a single failure in the embedding pipeline would fail the entire request, forcing the user to retry. The system couldn’t scale.
Our first whiteboard session concluded with a core principle: ingestion must be decoupled from indexing. This immediately pointed towards a message queue. We needed an asynchronous pipeline where the user-facing API could accept a document and immediately return a 202 Accepted, trusting a backend system to handle the computationally expensive work.
Technology selection became the next critical step. For the message broker, Apache Pulsar was chosen over more common alternatives like Kafka or RabbitMQ. In a real-world project, this wasn’t an arbitrary choice. We anticipated expanding our platform to support different customer tiers and document types. Pulsar’s native multi-tenancy would allow us to logically isolate these workloads without spinning up new clusters. Furthermore, its tiered storage feature, allowing messages to be offloaded to cheaper object storage like S3 after a certain period, was a massive operational win for cost management and data replayability.
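To make the multi-tenancy point concrete: Pulsar addresses every topic as persistent://&lt;tenant&gt;/&lt;namespace&gt;/&lt;topic&gt;, so isolating a customer tier is a naming decision rather than new infrastructure. A rough sketch, with purely illustrative tenant and namespace names:

pulsartopics/topics.go (sketch)

package pulsartopics

import "fmt"

// IngestionTopic builds the fully qualified Pulsar topic for a customer tier.
// Each tier gets its own Pulsar tenant without any additional clusters; the
// "docs-" prefix and the "ingestion" namespace are illustrative names, not the
// ones we actually ship.
func IngestionTopic(tier string) string {
    return fmt.Sprintf("persistent://docs-%s/ingestion/document-ingested", tier)
}

With this scheme, a premium tier consumes from persistent://docs-premium/ingestion/document-ingested while sharing the same cluster as every other tier.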
The system architecture crystallized into several distinct services:
- ingestion-api: A lightweight Go service sitting behind our API Gateway, responsible for validating uploads and publishing them to Pulsar.
- embedding-consumer: A Go worker that consumes from Pulsar, generates vectors, and writes to our vector database.
- query-service: The existing search endpoint, now simplified to only query the vector database (a minimal sketch follows this list).
- pwa-frontend: The user-facing Progressive Web App, which needed a way to get real-time feedback on indexing status.
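The rest of this article focuses on the ingestion path, but for completeness, here is a minimal sketch of what the slimmed-down query-service looks like. The VectorSearcher interface and its Search method are stand-ins for the real vector database client, which isn’t shown here:

query-service/internal/server/server.go (sketch)

package server

import (
    "context"
    "encoding/json"
    "log/slog"
    "net/http"
)

// SearchResult is a hypothetical result row returned to the PWA.
type SearchResult struct {
    DocumentID string  `json:"documentId"`
    Score      float32 `json:"score"`
}

// VectorSearcher is a stand-in for the real vector database client.
type VectorSearcher interface {
    Search(ctx context.Context, tenantID, query string, topK int) ([]SearchResult, error)
}

type QueryServer struct {
    searcher VectorSearcher
    logger   *slog.Logger
}

// handleSearch only queries the vector database; no ingestion logic remains here.
func (s *QueryServer) handleSearch(w http.ResponseWriter, r *http.Request) {
    tenantID := r.Header.Get("X-User-ID") // injected by the API gateway, as with ingestion-api
    query := r.URL.Query().Get("q")
    if tenantID == "" || query == "" {
        http.Error(w, "missing tenant or query", http.StatusBadRequest)
        return
    }
    results, err := s.searcher.Search(r.Context(), tenantID, query, 10)
    if err != nil {
        s.logger.Error("search failed", "error", err)
        http.Error(w, "search failed", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/json")
    _ = json.NewEncoder(w).Encode(results)
}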
This distributed nature made a polyrepo structure a pragmatic choice. Our frontend team works in TypeScript and our backend teams in Go. Forcing them into a single monorepo would introduce tooling complexity and cross-team friction we couldn’t afford. Each service gets its own repository, its own CI/CD pipeline, and its own deployment lifecycle. The trade-off, of course, is the added complexity of managing dependencies and coordinating cross-service changes, a problem we accepted and managed with strict API versioning and shared Protobuf definitions.
The first implementation task was defining the contract between services. We used Protocol Buffers to define the event payload that would travel through Pulsar. This ensures type safety and backward compatibility.
proto/ingestion/v1/event.proto
syntax = "proto3";
package ingestion.v1;
option go_package = "github.com/your-org/your-project/gen/go/ingestion/v1;ingestionv1";
import "google/protobuf/timestamp.proto";
// Message published to Pulsar when a new document is ingested.
message DocumentIngestedEvent {
// A unique identifier for this ingestion event.
string event_id = 1;
// The tenant or user who owns this document.
string tenant_id = 2;
// The unique identifier for the document itself.
string document_id = 3;
// The source location of the raw document content (e.g., an S3 URI).
string source_uri = 4;
// The original filename provided by the user.
string original_filename = 5;
// Timestamp when the ingestion event was created.
google.protobuf.Timestamp ingested_at = 6;
}
This schema is intentionally lean. A common mistake is to put the entire document content into the message queue. This bloats the broker and is inefficient. Instead, the ingestion-api first uploads the raw file to object storage (like S3) and then publishes this event containing only the pointer (source_uri).
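For reference, here is a sketch of that upload step, which the ingestion-api handler shown later mocks. It assumes the AWS SDK for Go v2; the StorageUploader interface mirrors the commented-out field in IngestionServer and is our own abstraction, not part of any SDK:

ingestion-api/internal/storage/s3.go (sketch)

package storage

import (
    "context"
    "fmt"
    "io"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// StorageUploader is the interface the ingestion-api depends on.
type StorageUploader interface {
    Upload(ctx context.Context, key string, body io.Reader) (uri string, err error)
}

// S3Uploader implements StorageUploader on top of an S3-compatible store.
type S3Uploader struct {
    client *s3.Client
    bucket string
}

func NewS3Uploader(client *s3.Client, bucket string) *S3Uploader {
    return &S3Uploader{client: client, bucket: bucket}
}

// Upload writes the raw document to object storage and returns the s3:// URI
// that goes into DocumentIngestedEvent.source_uri.
func (u *S3Uploader) Upload(ctx context.Context, key string, body io.Reader) (string, error) {
    _, err := u.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(u.bucket),
        Key:    aws.String(key),
        Body:   body,
    })
    if err != nil {
        return "", fmt.Errorf("put object %q: %w", key, err)
    }
    return fmt.Sprintf("s3://%s/%s", u.bucket, key), nil
}

The handler would call Upload before building the DocumentIngestedEvent, so the event never carries more than the resulting URI.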
Our API Gateway, managed via declarative configuration, was set up to route traffic. We used KrakenD for its performance, but any modern gateway would suffice. The key was to have a single entry point for the PWA.
gateway/krakend.json snippet:
{
"endpoint": "/api/v1/documents",
"method": "POST",
"backend": [
{
"url_pattern": "/ingest",
"host": [ "http://ingestion-api:8080" ],
"method": "POST"
}
],
"extra_config": {
"auth/validator": {
"alg": "RS256",
"jwk_url": "https://your-auth-provider/.well-known/jwks.json",
"roles": [ "user" ],
"prop": "sub"
}
}
}
This configuration ensures that only authenticated users can access the ingestion endpoint and forwards valid requests to our ingestion-api service.
The core of the ingestion-api is its HTTP handler and Pulsar producer. The service is written in Go for its performance and concurrency model.
ingestion-api/cmd/server/main.go
package main
import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
ingestionv1 "github.com/your-org/your-project/gen/go/ingestion/v1"
)
// Config holds the application configuration.
type Config struct {
PulsarURL string
TopicName string
Port string
}
// IngestionServer handles the HTTP requests and produces to Pulsar.
type IngestionServer struct {
producer pulsar.Producer
logger *slog.Logger
// In a real app, this would be an interface to an object storage client.
// storageClient StorageUploader
}
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
cfg := Config{
PulsarURL: os.Getenv("PULSAR_URL"),
TopicName: os.Getenv("PULSAR_TOPIC"),
Port: os.Getenv("PORT"),
}
if cfg.Port == "" {
cfg.Port = "8080"
}
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: cfg.PulsarURL,
ConnectionTimeout: 30 * time.Second,
OperationTimeout: 30 * time.Second,
})
if err != nil {
logger.Error("Could not instantiate Pulsar client", "error", err)
os.Exit(1)
}
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: cfg.TopicName,
})
if err != nil {
logger.Error("Could not create Pulsar producer", "error", err)
os.Exit(1)
}
defer producer.Close()
server := &IngestionServer{
producer: producer,
logger: logger,
}
mux := http.NewServeMux()
mux.HandleFunc("/ingest", server.handleIngest)
httpServer := &http.Server{
Addr: ":" + cfg.Port,
Handler: mux,
}
// Graceful shutdown
go func() {
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("HTTP server error", "error", err)
}
}()
logger.Info("Server started", "port", cfg.Port)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Info("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
logger.Error("Server shutdown failed", "error", err)
}
logger.Info("Server gracefully stopped")
}
func (s *IngestionServer) handleIngest(w http.ResponseWriter, r *http.Request) {
// In a production system, you'd handle multipart/form-data here,
// extract the file, and upload it to object storage.
// For this example, we simulate that step.
// 1. Get user/tenant ID from JWT (handled by API gateway, passed in header)
tenantID := r.Header.Get("X-User-ID")
if tenantID == "" {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// 2. Pretend to upload to S3 and get a URI
documentID := uuid.New().String()
sourceURI := fmt.Sprintf("s3://your-bucket/documents/%s/%s.pdf", tenantID, documentID)
// 3. Create the event payload
event := &ingestionv1.DocumentIngestedEvent{
EventId: uuid.New().String(),
TenantId: tenantID,
DocumentId: documentID,
SourceUri: sourceURI,
OriginalFilename: "example.pdf", // From form data
IngestedAt: timestamppb.Now(),
}
payload, err := proto.Marshal(event)
if err != nil {
s.logger.Error("Failed to marshal protobuf event", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
// 4. Publish to Pulsar
msgID, err := s.producer.Send(r.Context(), &pulsar.ProducerMessage{
Payload: payload,
Key: documentID, // Use documentID as key for partitioning
})
if err != nil {
s.logger.Error("Failed to publish message to Pulsar", "error", err)
http.Error(w, "Failed to process request", http.StatusServiceUnavailable)
return
}
s.logger.Info("Published message to Pulsar", "msgId", msgID, "documentId", documentID)
// 5. Return 202 Accepted
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
fmt.Fprintf(w, `{"status": "processing", "documentId": "%s"}`, documentID)
}
The key here is returning 202 Accepted. The client knows the request was received but not yet completed. The response includes a documentId that the PWA can use to track the status.
Now for the embedding-consumer. This is a long-running worker process that forms the backbone of the asynchronous pipeline. Its sole job is to listen for DocumentIngestedEvent messages, process them, and acknowledge them. The pitfall here is improper error handling and acknowledgment. A common mistake is to acknowledge the message as soon as it’s received. If the process crashes during vector generation or DB insertion, the message is lost forever. The correct pattern is to acknowledge the message only after the entire unit of work is successfully completed.
graph TD
    A[Receive Message from Pulsar] --> B{Fetch Document from S3};
    B --> C{Chunk and Clean Text};
    C --> D{Call Embedding Model API};
    D --> E{Connect to Vector DB};
    E --> F{Insert Vectors};
    F --> G[Acknowledge Pulsar Message];
    subgraph Error Handling
        D -- On Failure --> H{Retry with backoff};
        H -- Max Retries Reached --> I[Send to Dead-Letter Queue];
        F -- On Failure --> J{Retry Transaction};
        J -- Max Retries Reached --> I;
    end
    I --> K[Manual Intervention / Alerting];
This flow diagram illustrates the robust processing logic required.
The implementation of the consumer requires careful management of its lifecycle and concurrency.
embedding-consumer/cmd/worker/main.go
package main
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"google.golang.org/protobuf/proto"
ingestionv1 "github.com/your-org/your-project/gen/go/ingestion/v1"
// These would be your actual client packages
// "github.com/your-org/your-project/internal/embedding"
// "github.com/your-org/your-project/internal/storage"
// "github.com/your-org/your-project/internal/vectordb"
)
// Mock clients for demonstration
type mockEmbeddingClient struct{}
func (c *mockEmbeddingClient) CreateEmbeddings(ctx context.Context, text string) ([]float32, error) {
slog.Info("Generating mock embeddings...")
time.Sleep(500 * time.Millisecond) // Simulate network latency
return make([]float32, 768), nil
}
type mockVectorDBClient struct{}
func (c *mockVectorDBClient) Insert(ctx context.Context, id string, vector []float32) error {
slog.Info("Inserting vector into DB", "id", id)
time.Sleep(100 * time.Millisecond) // Simulate DB write
return nil
}
type Worker struct {
consumer pulsar.Consumer
logger *slog.Logger
embedClient *mockEmbeddingClient // Replace with actual client
vectorDBClient *mockVectorDBClient // Replace with actual client
}
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
pulsarURL := os.Getenv("PULSAR_URL")
topicName := os.Getenv("PULSAR_TOPIC")
subscriptionName := "embedding-worker-subscription"
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarURL})
if err != nil {
logger.Error("Could not create Pulsar client", "error", err)
os.Exit(1)
}
defer client.Close()
// Configure Dead Letter Queue (DLQ) for failed messages
dlqPolicy := &pulsar.DLQPolicy{
MaxDeliveries: 5, // Retry a message up to 5 times
DeadLetterTopic: topicName + "-dlq",
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topicName,
SubscriptionName: subscriptionName,
Type: pulsar.Shared, // Allow multiple worker instances
NackRedeliveryDelay: 10 * time.Second,
DLQ: dlqPolicy,
})
if err != nil {
logger.Error("Could not create Pulsar consumer", "error", err)
os.Exit(1)
}
defer consumer.Close()
worker := &Worker{
consumer: consumer,
logger: logger,
embedClient: &mockEmbeddingClient{},
vectorDBClient: &mockVectorDBClient{},
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
logger.Info("Worker started. Waiting for messages...")
worker.Run(ctx)
logger.Info("Worker stopped.")
}
func (w *Worker) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
w.logger.Info("Context cancelled, stopping worker loop.")
return
default:
msg, err := w.consumer.Receive(ctx)
if err != nil {
if err != context.Canceled {
w.logger.Error("Failed to receive message", "error", err)
}
continue // Or break, depending on desired behavior
}
if err := w.processMessage(ctx, msg); err != nil {
w.logger.Error("Failed to process message, nacking", "msgId", msg.ID(), "error", err)
w.consumer.Nack(msg)
} else {
w.logger.Info("Successfully processed message, acking", "msgId", msg.ID())
w.consumer.Ack(msg)
}
}
}
}
func (w *Worker) processMessage(ctx context.Context, msg pulsar.Message) error {
w.logger.Info("Received message", "msgId", msg.ID(), "redeliveryCount", msg.RedeliveryCount())
var event ingestionv1.DocumentIngestedEvent
if err := proto.Unmarshal(msg.Payload(), &event); err != nil {
// This is a poison pill, an unparseable message.
// It will be sent to the DLQ after max retries.
return fmt.Errorf("failed to unmarshal protobuf payload: %w", err)
}
// In a real application, you would download from event.SourceUri
// For now, we simulate with some dummy text.
documentContent := "This is the content of the document " + event.DocumentId
// 1. Generate Embeddings
vector, err := w.embedClient.CreateEmbeddings(ctx, documentContent)
if err != nil {
// This error is likely transient (e.g., embedding service is down).
// Nacking will cause Pulsar to redeliver it after a delay.
return fmt.Errorf("failed to create embeddings for doc %s: %w", event.DocumentId, err)
}
// 2. Insert into Vector DB
if err := w.vectorDBClient.Insert(ctx, event.DocumentId, vector); err != nil {
// This could also be a transient DB error. Nack to retry.
return fmt.Errorf("failed to insert vector for doc %s: %w", event.DocumentId, err)
}
// Only if all steps succeed, we return nil, leading to an ACK.
// We'd also publish a "DocumentIndexed" event here for the PWA.
return nil
}
Notice the DLQPolicy. If a message fails processing MaxDeliveries times, Pulsar automatically sends it to a Dead-Letter Queue topic. This prevents a single “poison pill” message from halting the entire pipeline. An operator can then inspect the DLQ to debug the failing message. The subscription type is Shared, meaning multiple instances of this worker can run concurrently, pulling messages from the same subscription and scaling out our processing capacity.
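A DLQ is only useful if someone reads it. The sketch below assumes the topicName + "-dlq" convention from the consumer above; the subscription name and tool layout are illustrative. It simply logs each dead-lettered event so an operator can see which documents are failing:

dlq-inspector/cmd/inspector/main.go (sketch)

package main

import (
    "context"
    "log/slog"
    "os"

    "github.com/apache/pulsar-client-go/pulsar"
    "google.golang.org/protobuf/proto"

    ingestionv1 "github.com/your-org/your-project/gen/go/ingestion/v1"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: os.Getenv("PULSAR_URL")})
    if err != nil {
        logger.Error("Could not create Pulsar client", "error", err)
        os.Exit(1)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            os.Getenv("PULSAR_TOPIC") + "-dlq",
        SubscriptionName: "dlq-inspector",
        Type:             pulsar.Shared,
    })
    if err != nil {
        logger.Error("Could not subscribe to DLQ topic", "error", err)
        os.Exit(1)
    }
    defer consumer.Close()

    for {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            logger.Error("Receive failed", "error", err)
            continue
        }
        var event ingestionv1.DocumentIngestedEvent
        if err := proto.Unmarshal(msg.Payload(), &event); err != nil {
            logger.Error("Unparseable DLQ payload", "msgId", msg.ID(), "error", err)
        } else {
            logger.Error("Dead-lettered document", "documentId", event.DocumentId, "tenantId", event.TenantId)
        }
        // Ack so the DLQ doesn't redeliver; the log line is the record we keep.
        consumer.Ack(msg)
    }
}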
The final piece was closing the loop for the PWA. Polling an API endpoint for status is inefficient. A better, event-driven approach was needed. When the embedding-consumer successfully processes a document, it publishes a new, simpler event, DocumentIndexedEvent, to a different Pulsar topic. A small notification-service subscribes to this topic. Its only job is to maintain WebSocket connections (proxied through the API Gateway) to active PWA clients and push a notification down the correct socket when an event for their documentId is received. This provides immediate feedback to the user without the frontend having to poll constantly.
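A sketch of that notification-service is below. It assumes gorilla/websocket for the socket handling and, to keep the example short, pretends the DocumentIndexedEvent arrives as JSON; in the real pipeline it would be a Protobuf message defined alongside the ingestion event. Connections are keyed by the tenant ID that the gateway already forwards:

notification-service/internal/hub/hub.go (sketch)

package hub

import (
    "context"
    "encoding/json"
    "net/http"
    "sync"

    "github.com/apache/pulsar-client-go/pulsar"
    "github.com/gorilla/websocket"
)

// indexedEvent stands in for DocumentIndexedEvent; JSON is a simplification
// for this sketch.
type indexedEvent struct {
    TenantID   string `json:"tenantId"`
    DocumentID string `json:"documentId"`
}

// Hub tracks active PWA sockets, simplified to one connection per tenant.
type Hub struct {
    mu    sync.RWMutex
    conns map[string]*websocket.Conn
}

func New() *Hub { return &Hub{conns: make(map[string]*websocket.Conn)} }

var upgrader = websocket.Upgrader{}

// HandleWS upgrades the request and registers the socket under the tenant ID
// forwarded by the API gateway.
func (h *Hub) HandleWS(w http.ResponseWriter, r *http.Request) {
    tenantID := r.Header.Get("X-User-ID")
    if tenantID == "" {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return // Upgrade has already written the error response.
    }
    h.mu.Lock()
    h.conns[tenantID] = conn
    h.mu.Unlock()
}

// Consume reads indexed events from Pulsar and pushes each one to the owning
// tenant's socket, if that tenant is currently connected.
func (h *Hub) Consume(ctx context.Context, consumer pulsar.Consumer) {
    for {
        msg, err := consumer.Receive(ctx)
        if err != nil {
            return // Context cancelled or consumer closed.
        }
        var ev indexedEvent
        if err := json.Unmarshal(msg.Payload(), &ev); err == nil {
            h.mu.RLock()
            conn, ok := h.conns[ev.TenantID]
            h.mu.RUnlock()
            if ok {
                _ = conn.WriteJSON(map[string]string{"status": "indexed", "documentId": ev.DocumentID})
            }
        }
        consumer.Ack(msg)
    }
}

A production hub would track multiple sockets per tenant and remove connections when they close; the sketch keeps only the happy path.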
The result of this architectural shift was transformative. The initial synchronous endpoint that took 30+ seconds was replaced by an API call that returns 202 Accepted in under 150 ms. Documents became fully searchable within 2-4 seconds of upload, with the variance depending on the load on the embedding model. The system is resilient; an outage in the embedding service or vector database no longer causes user-facing errors. Messages simply queue in Pulsar until the downstream service recovers, at which point processing resumes automatically. The polyrepo structure allows our teams to deploy updates to the ingestion API, the consumer, and the PWA independently, drastically improving our development velocity.
This architecture is not without its own set of challenges. The operational complexity is higher. We now have more services to monitor and deploy. Distributed tracing becomes a necessity, not a luxury, to debug a request’s journey from the API Gateway through Pulsar to the final consumer. The polyrepo structure, while granting team autonomy, requires disciplined dependency management and communication protocols for breaking API changes. Furthermore, while Pulsar consumers can handle backpressure, a sustained, massive influx of documents could still saturate the embedding models, requiring an autoscaling solution for the consumer group, adding another layer of infrastructure management. The system is more robust and scalable, but it demands a more mature operational and DevOps mindset to maintain effectively.