Offloading LLM Embedding Generation to a Go gRPC Service for a High-Throughput Azure Functions RAG Pipeline


The initial proof-of-concept for our document ingestion pipeline was deceptively simple. An Azure Function, triggered by a new blob upload, would read a document, chunk it, call an OpenAI-compatible API to generate embeddings, and finally upsert the resulting vectors into a Qdrant collection. For a dozen documents, it worked. When faced with ingesting a backlog of 200,000 internal memos, the architecture crumbled. Azure Function executions timed out after ten minutes, costs ballooned due to prolonged execution times, and the overall throughput was abysmal. The bottleneck was glaring: embedding generation.

Making repeated, synchronous HTTP calls for each text chunk from within a serverless function is fundamentally inefficient. The function spends most of its time waiting on network I/O while the compute-intensive work happens in the model behind the API, a poor fit for a consumption-plan function that is billed for execution time and designed for short, bursty workloads. A common mistake is to simply increase the function’s timeout and memory, a brute-force measure that papers over the architectural flaw rather than fixing it. The real fix required isolating the bottleneck and building a specialized component to handle it efficiently.

Our revised approach was to offload the entire embedding workload to a dedicated, high-performance service. This service would be long-running, stateful enough to manage connection pools and potential caches, and optimized for batch processing. Communication between the orchestrating Azure Function and this new service needed to be extremely low-latency. This immediately ruled out another RESTful API. The overhead of JSON serialization and HTTP/1.1 for this internal, high-frequency task was unnecessary. The choice was clear: a Go-based gRPC service. Go’s concurrency primitives and performance profile are perfect for this kind of I/O-heavy, parallelizable task, and gRPC’s use of Protobuf and HTTP/2 provides the performance characteristics required.

The entire distributed system—the serverless function, the containerized gRPC service, and the Qdrant vector database—needed to be managed as a single, cohesive unit. Scripting deployments with a mix of az cli, docker, and func core tools would create a fragile, unmaintainable mess. This is a classic scenario where Infrastructure as Code becomes non-negotiable. We chose Pulumi with TypeScript to define all resources, enabling us to manage dependencies and configuration programmatically across different service types.

Defining the High-Throughput Contract with Protobuf

The foundation of the new architecture is the communication contract between the Azure Function (client) and the embedding service (server). A bidirectional streaming gRPC endpoint is the most efficient pattern for this. It allows the client to stream document chunks to the server as they are processed, and the server can process them in batches without waiting for the entire stream to finish. This creates a pipeline effect that maximizes throughput.

The pitfall here is designing a naive unary RPC that accepts a list of strings. This forces the client to buffer all chunks in memory before making a single large request, which can lead to memory pressure in the serverless function and is less resilient to transient network issues. Streaming is the correct approach.

Here is the Protobuf definition that establishes this contract.

// /proto/embedder.proto

syntax = "proto3";

package embedder;

option go_package = "github.com/my-org/my-project/gen/go/embedder";

// The core service for generating text embeddings.
service Embedder {
  // Process a stream of documents, generating embeddings in batches.
  // This is a bidirectional streaming RPC for maximum throughput.
  rpc ProcessDocumentStream(stream DocumentChunk) returns (stream EmbeddingResult);
}

// Represents a single chunk of text from a larger document.
message DocumentChunk {
  // A unique identifier for the original document.
  string document_id = 1;
  // The sequence number of this chunk within the document.
  uint32 chunk_index = 2;
  // The actual text content of the chunk.
  string content = 3;
}

// Represents the result of an embedding operation for a single chunk.
message EmbeddingResult {
  string document_id = 1;
  uint32 chunk_index = 2;

  oneof result {
    EmbeddingVector vector = 3;
    ProcessingError error = 4;
  }
}

message EmbeddingVector {
  // The generated embedding vector.
  repeated float points = 1;
}

// Represents an error that occurred during processing a specific chunk.
message ProcessingError {
  string message = 1;
  // An optional error code for more granular error handling.
  uint32 code = 2;
}

This .proto file is the source of truth. The ProcessDocumentStream RPC defines the bidirectional stream. The client sends DocumentChunk messages, and the server responds with EmbeddingResult messages. Crucially, the EmbeddingResult uses a oneof field to communicate either success (EmbeddingVector) or failure (ProcessingError) on a per-chunk basis. This is vital for production systems; a single failed embedding should not torpedo an entire batch of thousands.
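On the Go side, protoc-gen-go models a oneof as an interface-typed field with one wrapper struct per case, so consumers usually branch on it with a type switch. The sketch below illustrates that per-chunk success/failure branching. It uses hand-written stand-in types that only mimic the generated shape (they are assumptions for illustration, not the real generated code), and the helper name describe is hypothetical.

```go
package main

import "fmt"

// Stand-ins that mimic the shape protoc-gen-go produces for a oneof.
// They are simplified assumptions for illustration, not generated code.
type isResult interface{ isResult() }

type EmbeddingVector struct{ Points []float32 }

type ProcessingError struct {
	Message string
	Code    uint32
}

// One wrapper struct per oneof case.
type ResultVector struct{ Vector *EmbeddingVector }
type ResultError struct{ Error *ProcessingError }

func (*ResultVector) isResult() {}
func (*ResultError) isResult()  {}

type EmbeddingResult struct {
	DocumentId string
	ChunkIndex uint32
	Result     isResult
}

// describe branches on whichever oneof case is set for this chunk.
func describe(r *EmbeddingResult) string {
	switch v := r.Result.(type) {
	case *ResultVector:
		return fmt.Sprintf("%s#%d ok dims=%d", r.DocumentId, r.ChunkIndex, len(v.Vector.Points))
	case *ResultError:
		return fmt.Sprintf("%s#%d failed code=%d: %s", r.DocumentId, r.ChunkIndex, v.Error.Code, v.Error.Message)
	default:
		// Neither case set: treat as an empty result rather than a success.
		return fmt.Sprintf("%s#%d empty result", r.DocumentId, r.ChunkIndex)
	}
}

func main() {
	ok := &EmbeddingResult{
		DocumentId: "memo-1",
		ChunkIndex: 0,
		Result:     &ResultVector{Vector: &EmbeddingVector{Points: []float32{0.1, 0.2}}},
	}
	bad := &EmbeddingResult{
		DocumentId: "memo-1",
		ChunkIndex: 1,
		Result:     &ResultError{Error: &ProcessingError{Message: "rate limited", Code: 429}},
	}
	fmt.Println(describe(ok))
	fmt.Println(describe(bad))
}
```

Because each result carries its own document_id and chunk_index, a consumer can record a failed chunk and keep processing the rest of the stream.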

Implementing the Go gRPC Embedding Service

The Go service is the workhorse. It needs to be robust, configurable, and concurrent. Its primary job is to listen for incoming gRPC streams, collect chunks into optimal batch sizes for the embedding model, dispatch requests to the model API, and stream results back.

A common mistake when implementing such a service is to process chunks one-by-one as they arrive on the stream. This completely negates the performance benefits of batching that modern embedding models offer. The correct implementation requires a buffer-and-dispatch mechanism.

Here is the core server implementation.

// /embedder-service/main.go

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	pb "github.com/my-org/my-project/gen/go/embedder"
	"github.com/sashabaranov/go-openai"
	"google.golang.org/grpc"
)

const (
	// In a real-world project, these would come from config.
	defaultGrpcPort   = "50051"
	maxBatchSize      = 128 // Optimal batch size can depend on the model and hardware.
	batchTimeout      = 500 * time.Millisecond
	embeddingModel    = openai.AdaEmbeddingV2
	embeddingDimensions = 1536 // Specific to text-embedding-ada-002
)

type embedderServer struct {
	pb.UnimplementedEmbedderServer
	openaiClient *openai.Client
	logger       *slog.Logger
}

func newServer(logger *slog.Logger) (*embedderServer, error) {
	apiKey := os.Getenv("OPENAI_API_KEY")
	if apiKey == "" {
		return nil, errors.New("OPENAI_API_KEY environment variable not set")
	}
	client := openai.NewClient(apiKey)

	return &embedderServer{
		openaiClient: client,
		logger:       logger,
	}, nil
}

// ProcessDocumentStream is the bidirectional streaming RPC implementation.
func (s *embedderServer) ProcessDocumentStream(stream pb.Embedder_ProcessDocumentStreamServer) error {
	s.logger.Info("Client connected to stream")
	
	// Use a WaitGroup to ensure all goroutines finish before the function returns.
	var wg sync.WaitGroup
	
	// Channel to hold incoming chunks from the client stream.
	chunkChan := make(chan *pb.DocumentChunk, maxBatchSize*2)

	// Goroutine to receive chunks from the client.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(chunkChan)
		for {
			chunk, err := stream.Recv()
			if err == io.EOF {
				s.logger.Info("Client finished sending chunks")
				return
			}
			if err != nil {
				s.logger.Error("Error receiving from stream", "error", err)
				return
			}
			chunkChan <- chunk
		}
	}()

	// Goroutine to process batches and send results back.
	wg.Add(1)
	go func() {
		defer wg.Done()
		batch := make([]*pb.DocumentChunk, 0, maxBatchSize)
		ticker := time.NewTicker(batchTimeout)
		defer ticker.Stop()

		for {
			select {
			case chunk, ok := <-chunkChan:
				if !ok { // Channel is closed
					if len(batch) > 0 {
						s.processAndSendBatch(stream, batch)
					}
					return
				}
				batch = append(batch, chunk)
				if len(batch) >= maxBatchSize {
					s.processAndSendBatch(stream, batch)
					batch = make([]*pb.DocumentChunk, 0, maxBatchSize) // Reset batch
				}
			case <-ticker.C:
				if len(batch) > 0 {
					s.processAndSendBatch(stream, batch)
					batch = make([]*pb.DocumentChunk, 0, maxBatchSize) // Reset batch
				}
			}
		}
	}()

	wg.Wait()
	s.logger.Info("Stream processing complete for this client")
	return nil
}

// processAndSendBatch handles the actual embedding generation and sending results.
func (s *embedderServer) processAndSendBatch(stream pb.Embedder_ProcessDocumentStreamServer, batch []*pb.DocumentChunk) {
	if len(batch) == 0 {
		return
	}

	s.logger.Info("Processing batch", "size", len(batch))
	
	texts := make([]string, len(batch))
	for i, chunk := range batch {
		texts[i] = chunk.Content
	}

	req := openai.EmbeddingRequest{
		Input: texts,
		Model: embeddingModel,
	}

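	// Derive the request context from the stream so client cancellation propagates,
	// and bound the upstream call so a hung request cannot stall the stream.
	// The 60-second limit is an assumed value; tune it for your model and batch size.
	ctx, cancel := context.WithTimeout(stream.Context(), 60*time.Second)
	defer cancel()

	resp, err := s.openaiClient.CreateEmbeddings(ctx, req)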
	if err != nil {
		s.logger.Error("Failed to create embeddings", "error", err)
		// On total failure, send an error result for every chunk in the batch.
		for _, chunk := range batch {
			errResult := &pb.EmbeddingResult{
				DocumentId: chunk.DocumentId,
				ChunkIndex: chunk.ChunkIndex,
				Result: &pb.EmbeddingResult_Error{
					Error: &pb.ProcessingError{Message: "Upstream API call failed: " + err.Error()},
				},
			}
			if sendErr := stream.Send(errResult); sendErr != nil {
				s.logger.Error("Failed to send error result to client", "error", sendErr)
			}
		}
		return
	}

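	if len(resp.Data) != len(batch) {
		s.logger.Error("Mismatch between request and response embedding count", "requested", len(batch), "received", len(resp.Data))
		// Without a one-to-one mapping no vector can be trusted, so fail every chunk explicitly.
		for _, chunk := range batch {
			errResult := &pb.EmbeddingResult{
				DocumentId: chunk.DocumentId,
				ChunkIndex: chunk.ChunkIndex,
				Result: &pb.EmbeddingResult_Error{
					Error: &pb.ProcessingError{Message: "Upstream API returned an unexpected number of embeddings"},
				},
			}
			if sendErr := stream.Send(errResult); sendErr != nil {
				s.logger.Error("Failed to send error result to client", "error", sendErr)
				return
			}
		}
		return
	}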

	for i, chunk := range batch {
		embeddingData := resp.Data[i]
		points := make([]float32, len(embeddingData.Embedding))
		copy(points, embeddingData.Embedding)

		result := &pb.EmbeddingResult{
			DocumentId: chunk.DocumentId,
			ChunkIndex: chunk.ChunkIndex,
			Result: &pb.EmbeddingResult_Vector{
				Vector: &pb.EmbeddingVector{Points: points},
			},
		}

		if err := stream.Send(result); err != nil {
			s.logger.Error("Failed to send embedding result to client", "error", err)
			// At this point, the connection might be broken.
			return
		}
	}
	s.logger.Info("Successfully sent batch results", "size", len(batch))
}


func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	grpcPort := os.Getenv("GRPC_PORT")
	if grpcPort == "" {
		grpcPort = defaultGrpcPort
	}
	
	lis, err := net.Listen("tcp", fmt.Sprintf(":%s", grpcPort))
	if err != nil {
		logger.Error("Failed to listen", "error", err)
		os.Exit(1)
	}

	s, err := newServer(logger)
	if err != nil {
		logger.Error("Failed to create server", "error", err)
		os.Exit(1)
	}
	
	grpcServer := grpc.NewServer()
	pb.RegisterEmbedderServer(grpcServer, s)

	logger.Info("gRPC server listening", "address", lis.Addr().String())

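	// Serve in the background so the main goroutine can wait for a shutdown signal.
	go func() {
		if err := grpcServer.Serve(lis); err != nil {
			logger.Error("Failed to serve", "error", err)
			os.Exit(1) // Do not keep waiting for a signal if the server never started.
		}
	}()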

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	logger.Info("Shutting down gRPC server...")
	grpcServer.GracefulStop()
	logger.Info("Server gracefully stopped")
}

This implementation uses a channel (chunkChan) and a timed batching loop. Chunks arriving from the client are pushed into the channel. A separate goroutine reads from this channel, accumulating chunks into a batch. The batch is dispatched to the OpenAI API when it’s full (maxBatchSize) or when a timeout is reached (batchTimeout), ensuring that even straggling chunks get processed. This is a robust pattern for streaming aggregation. Error handling is also critical; if the upstream API call fails, the service sends back explicit ProcessingError messages for each chunk in the failed batch.

Orchestrating with the Azure Function Client

The Azure Function is now demoted from a worker to a lightweight orchestrator. Its responsibilities are reduced to:

  1. Receiving the trigger (e.g., a blob creation event).
  2. Reading and chunking the document.
  3. Establishing a gRPC connection to the Go service.
  4. Streaming the chunks and receiving the embedding results.
  5. Upserting the vectors and metadata to Qdrant.

We’ll use the C#/.NET isolated worker model for the function, as it provides mature gRPC client libraries.

// /FunctionApp/IngestDocument.cs

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq; // Any() and ToArray()
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Grpc.Core; // ReadAllAsync() extension on the response stream
using Grpc.Net.Client;
using Qdrant.Client;
using Qdrant.Client.Grpc;
using Embedder; // Generated from our .proto file

public class IngestDocument
{
    private readonly ILogger _logger;
    private readonly QdrantClient _qdrantClient;
    private readonly string _grpcEmbedderUrl;
    private const string QdrantCollectionName = "internal-memos";

    public IngestDocument(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<IngestDocument>();
        
        // Configuration should be injected properly in a real app.
        // The Qdrant .NET client speaks gRPC, which Qdrant serves on port 6334 by default (6333 is the REST port).
        var qdrantUrl = Environment.GetEnvironmentVariable("QDRANT_URL") ?? "http://localhost:6334";
        _grpcEmbedderUrl = Environment.GetEnvironmentVariable("GRPC_EMBEDDER_URL") ?? "http://localhost:50051";

        _qdrantClient = new QdrantClient(new Uri(qdrantUrl));
    }

    [Function("IngestDocument")]
    public async Task Run([BlobTrigger("documents/{name}", Connection = "AzureWebJobsStorage")] Stream myBlob, string name)
    {
        _logger.LogInformation($"C# Blob trigger function processed blob\n Name: {name} \n Size: {myBlob.Length} Bytes");

        // Step 1: Read and chunk the document (logic omitted for brevity)
        var chunks = ChunkDocument(myBlob, name);

        // Step 2: Connect to the gRPC service
        using var channel = GrpcChannel.ForAddress(_grpcEmbedderUrl);
        var client = new Embedder.Embedder.EmbedderClient(channel);
        
        var pointsToUpsert = new List<PointStruct>();

        // Step 3: Use the bidirectional stream
        using var call = client.ProcessDocumentStream();
        
        var responseReaderTask = Task.Run(async () =>
        {
            await foreach (var result in call.ResponseStream.ReadAllAsync())
            {
                if (result.Error != null)
                {
                    _logger.LogError($"Failed to embed chunk {result.ChunkIndex} for doc {result.DocumentId}: {result.Error.Message}");
                    continue;
                }

                var point = new PointStruct
                {
                    Id = new PointId { Uuid = Guid.NewGuid().ToString() },
                    Vectors = result.Vector.Points.ToArray(), // float[] converts implicitly to Vectors
                    Payload =
                    {
                        ["document_id"] = result.DocumentId,
                        ["chunk_index"] = result.ChunkIndex,
                        // In a real implementation, you'd fetch the original text
                        ["source_file"] = name
                    }
                };
                pointsToUpsert.Add(point);
            }
        });

        foreach (var chunk in chunks)
        {
            await call.RequestStream.WriteAsync(new DocumentChunk
            {
                DocumentId = chunk.DocumentId,
                ChunkIndex = (uint)chunk.Index,
                Content = chunk.Content
            });
        }
        await call.RequestStream.CompleteAsync();
        _logger.LogInformation("Finished sending all chunks to gRPC service.");
        
        // Wait for all responses to be processed
        await responseReaderTask;
        _logger.LogInformation($"Received all embedding results. Total points to upsert: {pointsToUpsert.Count}");

        // Step 4: Upsert to Qdrant (a single call here; split into batches for very large documents)
        if (pointsToUpsert.Any())
        {
            await _qdrantClient.UpsertAsync(QdrantCollectionName, pointsToUpsert);
            _logger.LogInformation($"Successfully upserted {pointsToUpsert.Count} points to Qdrant.");
        }
    }

    private List<(string DocumentId, int Index, string Content)> ChunkDocument(Stream blob, string name)
    {
        // Placeholder for actual document chunking logic
        // This should split the document into manageable pieces for the LLM
        return new List<(string, int, string)>
        {
            (name, 0, "This is the first part of the document."),
            (name, 1, "This is the second part, continuing the memo.")
        };
    }
}

This function code demonstrates the complete client-side logic. It reads responses in a dedicated Task while the main flow writes requests, which is the standard pattern for handling bidirectional streams in C#. Notice how it handles per-chunk errors from the embedding service gracefully, logging them and continuing with the rest of the stream.
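The ChunkDocument placeholder leaves the splitting strategy open. One simple option is fixed-size windows with a small overlap, so that a sentence cut at a boundary still appears whole in one of the two neighbouring chunks. The sketch below shows that logic in Go, to stay with the language of the embedding service; it would need porting to C# for the function itself. The Chunk type, the chunkText helper, and the size and overlap values are all hypothetical and not part of the original pipeline.

```go
package main

import "fmt"

// Chunk mirrors the fields of the DocumentChunk message.
type Chunk struct {
	DocumentID string
	Index      uint32
	Content    string
}

// chunkText splits text into windows of at most size runes. Each window
// starts size-overlap runes after the previous one, so consecutive chunks
// share overlap runes.
func chunkText(docID, text string, size, overlap int) ([]Chunk, error) {
	if size <= 0 || overlap < 0 || overlap >= size {
		return nil, fmt.Errorf("invalid size=%d overlap=%d", size, overlap)
	}
	runes := []rune(text) // rune-based, so multi-byte characters are never split
	step := size - overlap
	var chunks []Chunk
	for start := 0; start < len(runes); start += step {
		end := start + size
		if end > len(runes) {
			end = len(runes)
		}
		chunks = append(chunks, Chunk{
			DocumentID: docID,
			Index:      uint32(len(chunks)),
			Content:    string(runes[start:end]),
		})
		if end == len(runes) {
			break // the last window already reaches the end of the text
		}
	}
	return chunks, nil
}

func main() {
	chunks, err := chunkText("memo-1", "abcdefghij", 4, 1)
	if err != nil {
		panic(err)
	}
	for _, c := range chunks {
		fmt.Printf("%d %q\n", c.Index, c.Content)
	}
}
```

Real pipelines often split on sentence or token boundaries instead, since embedding models enforce token limits rather than character limits; this character-based version is only the simplest starting point.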

Unifying Deployment with Pulumi

The final piece is deploying this entire stack—Qdrant, the Go gRPC service, and the C# Azure Function—as a single, version-controlled unit. Pulumi’s ability to use a general-purpose programming language is invaluable here because the output of one resource (like the gRPC service’s URL) becomes the input for another (the function’s app settings).

Below is a simplified Pulumi program in TypeScript that defines the architecture.

// /pulumi/index.ts

import * as pulumi from "@pulumi/pulumi";
import * as azure from "@pulumi/azure-native";
import * as docker from "@pulumi/docker";

// Common configuration
const config = new pulumi.Config();
const location = config.get("location") || "westeurope";
const resourceGroupName = new azure.resources.ResourceGroup("rg", { location }).name;
const storageAccount = new azure.storage.StorageAccount("sa", {
    resourceGroupName,
    location,
    sku: { name: "Standard_LRS" },
    kind: "StorageV2",
});

// Part 1: Deploy Qdrant (using Azure Container Instances for simplicity)
const qdrantImage = "qdrant/qdrant:latest";
const qdrantContainerGroup = new azure.containerinstance.ContainerGroup("qdrant-cg", {
    resourceGroupName,
    location,
    osType: "Linux",
    containers: [{
        name: "qdrant",
        image: qdrantImage,
        resources: {
            requests: {
                cpu: 1.0,
                memoryInGB: 2.0,
            },
        },
        ports: [{ port: 6333 }, { port: 6334 }], // HTTP and gRPC ports
    }],
    ipAddress: {
        type: "Public",
        ports: [{ port: 6333, protocol: "TCP" }, { port: 6334, protocol: "TCP" }],
    },
});

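// The .NET Qdrant client connects over gRPC, so point it at port 6334 rather than the REST port 6333.
const qdrantUrl = pulumi.interpolate`http://${qdrantContainerGroup.ipAddress.apply(ip => ip?.ip)}:6334`;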

// Part 2: Build and deploy the Go gRPC embedder service to Azure Container Apps
const managedEnv = new azure.app.ManagedEnvironment("env", {
    resourceGroupName,
    location,
});

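// Look up the shared container registry. getRegistry needs the registry name as well as the
// resource group; "mysharedacr" is a placeholder to replace with your own registry.
const registry = azure.containerregistry.getRegistryOutput({
    resourceGroupName: "my-shared-rg",
    registryName: "mysharedacr",
});

// Build and push the Docker image for the Go service
// (registry push credentials are omitted here for brevity)
const embedderImage = new docker.Image("embedder-service-image", {
    imageName: pulumi.interpolate`${registry.loginServer}/embedder-service:latest`,
    build: {
        context: "../embedder-service", // Path to the Go service directory
    },
});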

const embedderApp = new azure.app.ContainerApp("embedder-app", {
    resourceGroupName,
    managedEnvironmentId: managedEnv.id,
    configuration: {
        // Secrets are declared under `configuration`; containers reference them by name via `secretRef`.
        secrets: [{ name: "openai-api-key", value: config.requireSecret("openaiApiKey") }],
        ingress: {
            external: true, // Publicly accessible for the Function App
            targetPort: 50051,
            transport: "http2", // gRPC needs HTTP/2 end to end; "grpc" is not a valid transport value
        },
    },
    template: {
        containers: [{
            name: "embedder",
            image: embedderImage.imageName,
            env: [ // Pass configuration as environment variables
                { name: "GRPC_PORT", value: "50051" },
                { name: "OPENAI_API_KEY", secretRef: "openai-api-key" },
            ],
            resources: { cpu: 1.0, memory: "2.0Gi" },
        }],
        scale: { minReplicas: 1, maxReplicas: 3 },
    },
});

// External Container Apps ingress terminates TLS on port 443, so the gRPC client must use https.
const grpcEmbedderUrl = pulumi.interpolate`https://${embedderApp.configuration.apply(c => c?.ingress?.fqdn)}`;


// Part 3: Deploy the Azure Function App
const appServicePlan = new azure.web.AppServicePlan("plan", {
    resourceGroupName,
    location,
    sku: {
        name: "Y1", // Dynamic plan for serverless functions
        tier: "Dynamic",
    },
});

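// azure-native's StorageAccount exposes no connection-string output, so assemble one from the account keys.
const storageKeys = azure.storage.listStorageAccountKeysOutput({
    resourceGroupName,
    accountName: storageAccount.name,
});
const storageConnectionString = pulumi.interpolate`DefaultEndpointsProtocol=https;AccountName=${storageAccount.name};AccountKey=${storageKeys.keys[0].value};EndpointSuffix=core.windows.net`;

const functionApp = new azure.web.WebApp("function-app", {
    resourceGroupName,
    location,
    serverFarmId: appServicePlan.id,
    kind: "functionapp",
    siteConfig: {
        appSettings: [
            { name: "AzureWebJobsStorage", value: storageConnectionString },
            { name: "FUNCTIONS_EXTENSION_VERSION", value: "~4" },
            { name: "FUNCTIONS_WORKER_RUNTIME", value: "dotnet-isolated" },
            { name: "WEBSITE_RUN_FROM_PACKAGE", value: "1" }, // Assuming package deployment
            // This is the critical connection!
            { name: "QDRANT_URL", value: qdrantUrl },
            { name: "GRPC_EMBEDDER_URL", value: grpcEmbedderUrl },
        ],
    },
});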

// Export the function app's hostname for easy access
export const functionAppHostName = functionApp.defaultHostName;

This Pulumi code declaratively defines all necessary components. The key benefit is visible in the appSettings of the WebApp. The values for QDRANT_URL and GRPC_EMBEDDER_URL are not hardcoded strings; they are Output properties derived from the other resources being created in the same program. Pulumi understands this dependency graph and ensures resources are created in the correct order, injecting the dynamic FQDN of the gRPC service into the function app’s configuration automatically. Wiring these values together by hand in shell scripts is tedious and error-prone.

graph TD
    A[Blob Storage] -- Trigger --> B(Azure Function);
    B -- gRPC Stream --> C{Go Embedding Service};
    C -- Batch REST API --> D[LLM Embedding Model];
    B -- Upsert Vectors --> E[Qdrant DB];
    subgraph Pulumi-Managed Infrastructure
        subgraph Azure Container App
            C
        end
        subgraph Azure Function App
            B
        end
        subgraph Azure Container Instance
            E
        end
    end

By offloading the intensive work to a specialized gRPC service, the architecture is now far more scalable and performant. The Azure Function remains lean and fast, serving only as an orchestrator. The Go service can be scaled independently based on the embedding workload, and the entire system is reproducible and manageable thanks to Infrastructure as Code. The initial prototype’s failures were not due to a bad choice in any single technology, but a misunderstanding of how to combine them effectively for a high-throughput workload.

The current solution, however, still has room for improvement. The connection between the Azure Function and the Container App is over the public internet; for enhanced security, both should be placed within a virtual network, which would require modifications to the Pulumi stack to manage VNet integration and private endpoints. The Go service’s batch processing could be further parallelized by making concurrent calls to the embedding API for multiple sub-batches. Finally, the Qdrant deployment on a single ACI container is not production-ready; a proper setup would involve a managed Qdrant Cloud instance or a self-hosted cluster on AKS for high availability and persistence, representing the next logical evolution of this infrastructure.

