The core technical pain point originated from a seemingly positive problem: our technical documentation, built with a Static Site Generator (SSG), had become exceptionally comprehensive. This success, however, rendered our search functionality, based on simple keyword matching, increasingly ineffective. Developers were struggling to find nuanced solutions buried in thousands of markdown files. The obvious path forward was semantic search, leveraging a Retrieval-Augmented Generation (RAG) model. Yet, this created a significant architectural conflict. The beauty of an SSG is its simplicity and security—a collection of static files served from a CDN. Introducing a dynamic, AI-driven process threatened this paradigm.
Our initial attempt was naive and disastrous. We tried to integrate embedding generation directly into the SSG’s build pipeline. During CI, a script would scan all markdown files, generate embeddings using a sentence-transformer model, and push them to a vector database. This approach failed on multiple fronts. Build times skyrocketed from three minutes to over thirty, killing developer productivity. The CI runner required long-lived, high-privilege API keys for the vector database, a clear security anti-pattern. Most critically, the process was fragile; a transient network error during the embedding upload would fail the entire documentation deployment. We needed to decouple the content build from the AI indexing process entirely.
This led to the design of a fully asynchronous, event-driven ingestion pipeline. The SSG build process would remain lean and fast. Upon a successful deployment, it would simply emit an event: “new content is available”. A separate, isolated microservice would then consume this event, retrieve the new content, process it, and update our vector index. This architecture required careful selection of components to ensure resilience, security, and observability.
Our technology selection was driven by production realities. For the eventing backbone, we chose Azure Service Bus. We were already operating within the Azure ecosystem, and its guarantees of message delivery, dead-lettering capabilities, and managed nature made it a superior choice to a simple webhook or a self-managed queue like RabbitMQ. The pitfall of webhooks is their transience; if the consumer service is down, the event is lost. Service Bus ensures the event persists until it’s successfully processed.
For the vector store, Pinecone was selected for its serverless, managed offering. The operational overhead of deploying and scaling our own vector database like Milvus or Weaviate was a non-starter for our small platform team. Pinecone’s simple API and performance characteristics fit our use case perfectly.
Security was non-negotiable. The embedding processor service needed credentials for both Azure Service Bus and Pinecone. Storing these in Kubernetes Secrets was an option, but not ideal due to secret sprawl and the difficulty of rotating static credentials. We opted for HashiCorp Vault. Using Vault’s Kubernetes authentication method, the service authenticates with its service account and receives a short-lived Vault token at startup, which is used to read the Pinecone API key. Service Bus access relies on an Azure managed identity instead, so no long-lived credentials live in Kubernetes manifests or CI configuration.
Finally, as we were introducing a new microservice into our Kubernetes cluster, we needed to ensure communication was secure and observable from day one. Instead of burdening the application code with mTLS logic, retries, and metrics instrumentation, we deployed Linkerd as our service mesh. Its transparent proxy injection provided mTLS between meshed workloads, detailed observability, and connection reliability without a single line of application code change. Neglecting the operational complexity of inter-service communication until it becomes a production issue is a common mistake in microservice adoption, and we wanted to avoid it.
The final architecture can be visualized as a clear data flow:
graph TD
    subgraph "CI/CD Pipeline (GitHub Actions)"
        A[Hugo SSG Build] --> B{Deploy to CDN};
        B --> C[Publish 'ContentUpdated' Message];
    end
    subgraph "Azure"
        C --> D[Azure Service Bus Topic];
    end
    subgraph "Kubernetes Cluster (with Linkerd)"
        D -- Pulls Message --> E[Embedding Processor Service];
        E -- Fetches Secrets --> F[HashiCorp Vault];
        E -- Fetches Content --> G[CDN/Storage];
        E -- Upserts Vectors --> H[Pinecone];
    end
    style F fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#0078D4,stroke:#fff,stroke-width:2px,color:#fff
    style H fill:#2E9698,stroke:#fff,stroke-width:2px,color:#fff
Phase 1: Provisioning the Infrastructure with Terraform
A robust system starts with reproducible infrastructure. We defined our Azure Service Bus namespace, topic, and a dedicated subscription for the embedding service using Terraform. We also created a user-assigned managed identity for the embedding workload, which the service later uses to authenticate with Service Bus without connection strings.
# main.tf
provider "azurerm" {
  features {}
}

resource "azurerm_resource_group" "rg" {
  name     = "rag-infra-rg"
  location = "East US"
}

# Create a managed identity for our Kubernetes workload
resource "azurerm_user_assigned_identity" "embed_processor" {
  name                = "embedding-processor-identity"
  resource_group_name = azurerm_resource_group.rg.name
  location            = azurerm_resource_group.rg.location
}

# Service Bus Namespace
resource "azurerm_servicebus_namespace" "sbus" {
  name                = "rag-docs-pipeline-sbus"
  location            = azurerm_resource_group.rg.location
  resource_group_name = azurerm_resource_group.rg.name
  sku                 = "Standard" # Standard SKU is required for topics
}

# Service Bus Topic for content updates
resource "azurerm_servicebus_topic" "content_updates" {
  name                = "content-updates"
  namespace_id        = azurerm_servicebus_namespace.sbus.id
  enable_partitioning = true
}

# Subscription for the embedding processor
resource "azurerm_servicebus_subscription" "embedding_processor_sub" {
  name               = "embedding-processor-subscription"
  topic_id           = azurerm_servicebus_topic.content_updates.id
  max_delivery_count = 5 # Deliver a message at most 5 times before dead-lettering

  # Messages expire after 1 hour if not processed. Expired messages are
  # discarded by default, so dead-lettering on expiration is enabled explicitly.
  default_message_ttl                  = "PT1H"
  dead_lettering_on_message_expiration = true
}

# Grant the managed identity permission to receive messages
resource "azurerm_role_assignment" "sbus_receiver" {
  scope                = azurerm_servicebus_subscription.embedding_processor_sub.id
  role_definition_name = "Azure Service Bus Data Receiver"
  principal_id         = azurerm_user_assigned_identity.embed_processor.principal_id
}
This configuration establishes the core messaging backbone. The max_delivery_count is a critical resilience parameter. If our service fails to process a message five consecutive times, Service Bus will automatically move it to a dead-letter queue (DLQ) for manual inspection, preventing a poison pill message from blocking the entire pipeline.
Phase 2: The Secure Embedding Processor Service
We implemented the processor service in Go for its performance and robust concurrency model. The service has three primary responsibilities: authenticate with Vault to get secrets, listen for messages from Service Bus, and process those messages to update Pinecone.
A key design decision was to use the Vault Agent Injector and the Kubernetes Auth method. This pattern allows the application pod to authenticate with Vault using its Kubernetes Service Account Token. Vault then returns a short-lived token, which the injected agent uses to read secrets on the application’s behalf.
Here is the Kubernetes deployment manifest, highlighting the Vault and Linkerd annotations:
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: embedding-processor
  namespace: data-pipelines
  labels:
    app: embedding-processor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: embedding-processor
  template:
    metadata:
      labels:
        app: embedding-processor
      annotations:
        # Linkerd annotation to inject the proxy sidecar
        linkerd.io/inject: enabled
        # Vault Agent Injector annotations
        vault.hashicorp.com/agent-inject: 'true'
        vault.hashicorp.com/role: 'embedding-processor-role'
        vault.hashicorp.com/agent-inject-secret-pinecone: 'secret/data/pinecone'
        vault.hashicorp.com/agent-inject-template-pinecone: |
          {{- with secret "secret/data/pinecone" -}}
          export PINECONE_API_KEY="{{ .Data.data.api_key }}"
          export PINECONE_ENVIRONMENT="{{ .Data.data.environment }}"
          {{- end }}
    spec:
      serviceAccountName: embedding-processor-sa
      containers:
        - name: processor
          image: myregistry.azurecr.io/embedding-processor:v1.2.0
          # The Vault Agent init container renders the secrets to
          # /vault/secrets/pinecone. The command below sources that file
          # before starting the app, so no env vars are declared here.
          command: ["/bin/bash", "-c"]
          args:
            - "source /vault/secrets/pinecone && /app/processor"
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "1"
              memory: "2Gi"
      # We assume the user assigned identity is bound to the pod via workload identity
The application code itself doesn’t need any Vault-specific libraries for this pattern. The Vault Agent init container renders the secrets to a file on a shared volume, and the container’s startup command sources that file into the environment before the main application process starts. This cleanly separates concerns.
The core Go application logic is structured around a message processing loop.
// main.go
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	"github.com/pinecone-io/go-pinecone/pinecone"
	"google.golang.org/protobuf/types/known/structpb" // builds Pinecone vector metadata
	// ... other imports for embedding model, content fetching, etc.
)
const (
serviceBusNamespace = "rag-docs-pipeline-sbus.servicebus.windows.net"
topicName = "content-updates"
subscriptionName = "embedding-processor-subscription"
pineconeIndexName = "documentation-index"
)
// ContentUpdateEvent defines the structure of the message from our CI pipeline
type ContentUpdateEvent struct {
SourceURL string `json:"source_url"` // URL to the raw markdown file
ContentID string `json:"content_id"` // Unique ID, e.g., git commit hash + file path
Timestamp string `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata"`
}
// Processor holds clients and configuration
type Processor struct {
pineconeClient *pinecone.Client
sbusClient *azservicebus.Client
logger *slog.Logger
// embedder EmbeddingProvider // Interface for sentence-transformer
}
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// Get secrets from environment, populated by Vault Agent Injector
pineconeAPIKey := os.Getenv("PINECONE_API_KEY")
pineconeEnv := os.Getenv("PINECONE_ENVIRONMENT")
if pineconeAPIKey == "" || pineconeEnv == "" {
logger.Error("Pinecone credentials not found in environment")
os.Exit(1)
}
// 1. Initialize Pinecone Client
pc, err := pinecone.NewClient(pinecone.NewClientParams{
ApiKey: pineconeAPIKey,
Environment: pineconeEnv,
})
if err != nil {
logger.Error("failed to initialize pinecone client", "error", err)
os.Exit(1)
}
// 2. Initialize Azure Service Bus Client using Workload Identity
cred, err := azidentity.NewWorkloadIdentityCredential(nil)
if err != nil {
logger.Error("failed to create workload identity credential", "error", err)
os.Exit(1)
}
sbusClient, err := azservicebus.NewClient(serviceBusNamespace, cred, nil)
if err != nil {
logger.Error("failed to create service bus client", "error", err)
os.Exit(1)
}
processor := &Processor{
pineconeClient: pc,
sbusClient: sbusClient,
logger: logger,
}
logger.Info("starting message processing loop")
processor.startProcessingLoop()
}
func (p *Processor) startProcessingLoop() {
receiver, err := p.sbusClient.NewReceiverForSubscription(topicName, subscriptionName, nil)
if err != nil {
p.logger.Error("failed to create receiver", "error", err)
// This is a fatal error, likely a configuration issue.
// In a real production system, you'd have a restart policy (e.g., Kubernetes).
panic(err)
}
defer receiver.Close(context.Background())
for {
// Receive messages in a loop. ReceiveMessages blocks until messages are available or context is cancelled.
messages, err := receiver.ReceiveMessages(context.Background(), 10, nil) // Process up to 10 messages at a time
if err != nil {
p.logger.Error("error receiving messages", "error", err)
time.Sleep(5 * time.Second) // Backoff before retrying
continue
}
for _, msg := range messages {
p.processMessage(context.Background(), receiver, msg)
}
}
}
func (p *Processor) processMessage(ctx context.Context, receiver *azservicebus.Receiver, msg *azservicebus.ReceivedMessage) {
var event ContentUpdateEvent
if err := json.Unmarshal(msg.Body, &event); err != nil {
p.logger.Error("failed to unmarshal message body", "error", err, "messageId", msg.MessageID)
// This message is malformed. Dead-letter it immediately so it doesn't get retried.
_ = receiver.DeadLetterMessage(ctx, msg, &azservicebus.DeadLetterOptions{
ErrorDescription: &[]string{"JSON Unmarshal Error"}[0],
})
return
}
p.logger.Info("processing content update", "contentId", event.ContentID, "sourceUrl", event.SourceURL)
// In a real implementation:
// 1. Fetch content from event.SourceURL
// 2. Chunk the content into manageable pieces for embedding
// 3. Generate embeddings for each chunk using a model
// 4. Create Pinecone vectors with embeddings and metadata
// Example vector creation (mocked embedding)
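	// Merge the event metadata with provenance fields. Pinecone metadata is a
	// protobuf Struct, built here with structpb.NewStruct
	// (google.golang.org/protobuf/types/known/structpb).
	meta := map[string]interface{}{}
	for k, v := range event.Metadata {
		meta[k] = v
	}
	meta["source_url"] = event.SourceURL
	meta["content_id"] = event.ContentID
	metadata, err := structpb.NewStruct(meta)
	if err != nil {
		p.logger.Error("failed to build vector metadata", "error", err, "messageId", msg.MessageID)
		// Retrying cannot fix unsupported metadata, so dead-letter the message.
		_ = receiver.DeadLetterMessage(ctx, msg, nil)
		return
	}
	// Field names follow the go-pinecone client; verify them against the pinned version.
	vectors := []*pinecone.Vector{
		{
			Id:       fmt.Sprintf("%s-chunk-0", event.ContentID),
			Values:   []float32{0.1, 0.2, 0.3 /* ... */, 0.9}, // Replace with actual embedding
			Metadata: metadata,
		},
	}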
iv, err := p.pineconeClient.Index(pineconeIndexName)
if err != nil {
p.logger.Error("failed to get pinecone index view", "error", err, "messageId", msg.MessageID)
// This could be a transient network issue. We abandon the message, letting Service Bus redeliver it.
_ = receiver.AbandonMessage(ctx, msg, nil)
return
}
// Upsert vectors in batches. Pinecone has limits on batch size.
_, err = iv.Upsert(vectors, "") // Namespace can be used for multi-tenancy
if err != nil {
p.logger.Error("failed to upsert vectors to pinecone", "error", err, "messageId", msg.MessageID)
// Again, likely transient. Abandon for retry.
_ = receiver.AbandonMessage(ctx, msg, nil)
return
}
// If everything succeeds, complete the message to remove it from the queue.
if err := receiver.CompleteMessage(ctx, msg, nil); err != nil {
p.logger.Error("failed to complete message", "error", err, "messageId", msg.MessageID)
} else {
p.logger.Info("successfully processed message", "contentId", event.ContentID)
}
}
This code demonstrates a key principle of robust message consumers: explicit message disposition.
- CompleteMessage: The message was processed successfully and should be deleted.
- AbandonMessage: An error occurred (likely transient), and the message should be re-queued for another attempt.
- DeadLetterMessage: The message is invalid (e.g., malformed JSON) and should be moved to the DLQ immediately without retries.
Phase 3: Triggering from the CI Pipeline
The final piece of the puzzle is triggering this pipeline. We added a step to our GitHub Actions workflow that runs after the SSG content has been successfully deployed to its CDN/storage location. This step authenticates with the Azure CLI and publishes a message to the Service Bus topic.
# .github/workflows/deploy-docs.yml
name: Deploy Documentation
on:
  push:
    branches:
      - main
# OIDC login requires permission to request an ID token
permissions:
  id-token: write
  contents: read
jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      # Steps for setting up Hugo, building the site...
      - name: Build static site
        run: hugo --minify
      # Steps for deploying the './public' directory to Azure Static Web Apps, Blob Storage, etc.
      - name: Deploy to CDN
        # ... deployment command ...
      # Login to Azure using OIDC - no stored secrets!
      - name: 'Az CLI login'
        uses: azure/login@v1
        with:
          client-id: ${{ secrets.AZURE_CLIENT_ID }}
          tenant-id: ${{ secrets.AZURE_TENANT_ID }}
          subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
      - name: Notify Embedding Service
        run: |
          # For this example, we send a single message representing the entire build.
          # A more sophisticated approach would get the list of changed files from git
          # and send one message per modified markdown file.
          COMMIT_SHA=$(git rev-parse --short HEAD)
          TIMESTAMP=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
          MESSAGE_BODY=$(jq -n \
            --arg source "https://docs.example.com/sitemap.xml" \
            --arg id "build-${COMMIT_SHA}" \
            --arg ts "${TIMESTAMP}" \
            '{source_url: $source, content_id: $id, timestamp: $ts, metadata: {build_trigger: "main-push"}}')
          # The Azure CLI has no data-plane command for sending Service Bus messages,
          # so we call the Service Bus REST API with a token from the CLI session.
          # The CI identity needs the "Azure Service Bus Data Sender" role on the topic.
          TOKEN=$(az account get-access-token \
            --resource "https://servicebus.azure.net" \
            --query accessToken -o tsv)
          curl --fail --silent --show-error -X POST \
            "https://rag-docs-pipeline-sbus.servicebus.windows.net/content-updates/messages" \
            -H "Authorization: Bearer ${TOKEN}" \
            -H "Content-Type: application/json" \
            --data "${MESSAGE_BODY}"
This closes the loop. The CI pipeline remains fast, as sending a message is a quick, fire-and-forget operation. The responsibility of indexing is now fully owned by the backend service.
Phase 4: The Role of the Service Mesh
With the service running in Kubernetes, Linkerd’s role becomes apparent. Out of the box, TCP traffic between the embedding-processor pod and other meshed workloads is wrapped in mTLS; calls to external endpoints such as Pinecone and Service Bus leave the mesh and rely on their own TLS. Any communication between this service and a potential future “search API” service would be automatically encrypted and authenticated without any certificates to manage in our application code.
Furthermore, Linkerd provides immediate observability. By running linkerd viz stat deploy/embedding-processor -n data-pipelines, we get golden signals (success rate, requests/sec, latency) for the meshed HTTP traffic of the deployment. We discovered an issue where our initial content fetching logic had no timeouts, leading to pods hanging on slow CDN responses. We used a Linkerd ServiceProfile to declaratively add a 5-second timeout and automatic retries on failures, again, without changing the application.
# linkerd-serviceprofile.yaml
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
  name: cdn-storage.default.svc.cluster.local # FQDN of the internal service/gateway for the CDN
  namespace: data-pipelines
spec:
  routes:
    - name: 'GET /docs/.*'
      condition:
        method: GET
        pathRegex: /docs/.*
      isRetryable: true # Allows Linkerd to safely retry failed requests
      timeout: 5s # Enforces a strict timeout
This is the power of a service mesh: separating operational concerns (security, reliability, observability) from business logic. The Go application can focus solely on its core task of processing embeddings.
While this architecture provides a resilient and secure foundation, it is not without limitations. The current implementation only handles content creation and updates. A comprehensive solution must address deletions. When a markdown file is deleted from the repository, its corresponding vectors must be removed from Pinecone to prevent stale search results. This could be implemented by having the CI pipeline send a ContentDeleted event with the content_id, which a handler in our service would use to issue a delete request to Pinecone.
Additionally, the cost of generating embeddings for every change on a massive documentation site can be substantial. A future optimization would be to implement a more granular update mechanism. Instead of sending a single “build complete” message, the CI pipeline could determine the exact set of changed files (e.g., via git diff) and emit individual update events for each, preventing unnecessary re-indexing of unchanged content. This moves the system from a batch-update model to a more efficient, streaming-update paradigm.