Implementing an Observable and Continuously Deployed RAG Service on Azure Functions with Nginx and Fluentd


The initial RAG prototype was a monolithic Flask application running on a single VM. It worked for the demo, but production readiness was a distant dream. Updating the knowledge base required taking the entire service down, manually running an indexing script, and restarting the application. Scaling was a manual process of cloning the VM. Debugging involved ssh and grep across scattered log files. This approach was untenable; it was fragile, expensive to maintain, and completely opaque. Our mandate was to re-architect this into a scalable, observable system with zero-downtime updates, built on a serverless paradigm to manage costs effectively.

The core of the new architecture decouples the state (the vector index) from the compute (the query engine). This immediately pointed towards a serverless function for compute and a cloud storage solution for the index. We chose Azure Functions for its pay-per-use model and seamless integration with other Azure services. The first challenge, however, was managing the LlamaIndex vector store. A non-trivial index can be several gigabytes, far too large to bundle with the function deployment package.

Our solution was to store the serialized index in Azure Blob Storage. The Azure Function’s primary responsibility on startup is to download this index from a designated blob container and load it into memory. In a serverless environment, this introduces the cold start problem: the first request to an idle function will incur the latency of both instance spin-up and the multi-gigabyte index download and deserialization.

To mitigate this, we structured the function to load the index into a process-global variable. This ensures that for warm instances, the index is already in memory, dramatically reducing latency for subsequent requests. This is a critical pattern for stateful operations in a stateless environment.

Here is the core logic of the Azure Function (QueryFunction/__init__.py). It’s designed to be robust, with explicit error handling and structured logging from the outset.

# QueryFunction/__init__.py
import logging
import os
import time
import json
import azure.functions as func
from azure.storage.blob import BlobServiceClient
from llama_index.core import StorageContext, load_index_from_storage, VectorStoreIndex

# --- Global State Management ---
# In a serverless environment, global variables are preserved between invocations on the same "warm" instance.
# This is a crucial optimization to avoid reloading the large index on every single request.
llm_index: VectorStoreIndex | None = None  # None until the first load on this instance
index_version: str = ""

# --- Constants ---
# Configuration is driven by environment variables, a standard practice for cloud-native apps.
CONNECTION_STRING = os.environ["AzureWebJobsStorage"]
CONTAINER_NAME = os.environ.get("INDEX_CONTAINER_NAME", "llamaindex-store")
INDEX_PATH_PREFIX = os.environ.get("INDEX_PATH_PREFIX", "prod_index_v1")
LOCAL_INDEX_DIR = "/tmp/retrieval_index" # Azure Functions provide a temporary filesystem at /tmp

def main(req: func.HttpRequest) -> func.HttpResponse:
    """
    Main function entry point for the Azure Function.
    Handles loading the index (if necessary) and executing the RAG query.
    """
    global llm_index, index_version

    start_time = time.time()
    
    # Check if the currently loaded index matches the one specified by environment configuration.
    # This allows for zero-downtime index updates by changing the env var and letting instances recycle.
    if llm_index is None or index_version != INDEX_PATH_PREFIX:
        logging.info(f"Index not loaded or version mismatch. Current: '{index_version}', Required: '{INDEX_PATH_PREFIX}'. Loading index...")
        try:
            load_start_time = time.time()
            blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
            
            # This function handles the download from Azure Blob Storage.
            download_index_from_blob(blob_service_client, CONTAINER_NAME, INDEX_PATH_PREFIX, LOCAL_INDEX_DIR)
            
            storage_context = StorageContext.from_defaults(persist_dir=LOCAL_INDEX_DIR)
            llm_index = load_index_from_storage(storage_context)
            index_version = INDEX_PATH_PREFIX # Update the global version tracker.
            
            load_duration = time.time() - load_start_time
            logging.info(f"Successfully loaded index '{INDEX_PATH_PREFIX}' in {load_duration:.2f} seconds.")

        except Exception as e:
            # A failure to load the index is a fatal error for this function.
            logging.error(f"FATAL: Failed to load index from blob storage path '{INDEX_PATH_PREFIX}'. Error: {e}", exc_info=True)
            return func.HttpResponse(
                json.dumps({"error": "Could not initialize query engine. Index loading failed."}),
                status_code=500,
                mimetype="application/json"
            )

    try:
        req_body = req.get_json()
        query_text = req_body.get('query')
        if not query_text:
            return func.HttpResponse(
                json.dumps({"error": "Missing 'query' in request body."}),
                status_code=400,
                mimetype="application/json"
            )

        query_engine = llm_index.as_query_engine()
        
        query_start_time = time.time()
        response = query_engine.query(query_text)
        query_duration = time.time() - query_start_time
        
        total_duration = time.time() - start_time

        # --- Structured Logging ---
        # Instead of plain text, we log a JSON object. This is essential for Fluentd to parse
        # and enrich logs without complex regex. We capture key performance indicators.
        log_payload = {
            "level": "INFO",
            "message": "Query successful",
            "query": query_text,
            "total_duration_ms": int(total_duration * 1000),
            "query_engine_duration_ms": int(query_duration * 1000),
            "index_version": index_version,
            "source_nodes": len(response.source_nodes) if response.source_nodes else 0
        }
        logging.info(json.dumps(log_payload))

        api_response = {
            "response": str(response),
            "metadata": {
                "source_nodes": [{"id": node.node_id, "score": float(node.score)} for node in response.source_nodes]
            }
        }

        return func.HttpResponse(
            json.dumps(api_response),
            mimetype="application/json",
            status_code=200
        )

    except json.JSONDecodeError:
        return func.HttpResponse(
            json.dumps({"error": "Invalid JSON in request body."}),
            status_code=400,
            mimetype="application/json"
        )
    except Exception as e:
        # Catch-all for unexpected errors during the query phase.
        logging.error(f"Error processing query. Error: {e}", exc_info=True)
        return func.HttpResponse(
            json.dumps({"error": "An internal error occurred."}),
            status_code=500,
            mimetype="application/json"
        )

def download_index_from_blob(client: BlobServiceClient, container: str, prefix: str, path: str):
    """
    Downloads all blobs with a given prefix from a container to a local directory.
    This is necessary because LlamaIndex saves its state across multiple files.
    """
    if os.path.exists(path):
        import shutil
        shutil.rmtree(path) # Clean up previous index files.
    os.makedirs(path, exist_ok=True)
    
    container_client = client.get_container_client(container)
    blob_list = container_client.list_blobs(name_starts_with=prefix)
    
    download_count = 0
    for blob in blob_list:
        blob_client = container_client.get_blob_client(blob.name)
        # The local file path should be relative to the download directory.
        relative_path = blob.name.replace(prefix, "").lstrip("/")
        download_file_path = os.path.join(path, relative_path)
        
        os.makedirs(os.path.dirname(download_file_path), exist_ok=True)
        
        with open(download_file_path, "wb") as download_file:
            download_file.write(blob_client.download_blob().readall())
        download_count += 1
    
    if download_count == 0:
        raise FileNotFoundError(f"No index files found in blob container '{container}' with prefix '{prefix}'")
    
    logging.info(f"Downloaded {download_count} index files from prefix '{prefix}'.")

Automating the deployment of this function and, more importantly, the lifecycle of the LlamaIndex vector store, fell to CircleCI. A simple git push should not just deploy code; it must orchestrate the entire process of index creation and promotion through environments. A pitfall here is tightly coupling the index build to the application deploy. In a real-world project, the knowledge base (the source documents) changes independently of the application code.

Our CircleCI workflow reflects this separation. It has distinct jobs for building the index from source documents, deploying the Azure Function, and a manual approval gate to promote a new index to production. This provides a crucial safety mechanism.

# .circleci/config.yml
version: 2.1

orbs:
  # Pin these to specific released orb versions in a real pipeline.
  azure-cli: circleci/azure-cli@volatile
  python: circleci/python@volatile

jobs:
  build_and_upload_index:
    docker:
      - image: cimg/python:3.11
    # << pipeline.id >> gives every job in this pipeline the same unique index version,
    # avoiding reliance on $BASH_ENV (which does not persist across jobs).
    environment:
      INDEX_VERSION: staging_index_<< pipeline.id >>
    steps:
      - checkout
      - python/install-packages:
          pkg-manager: pip
          pip-dependency-file: functions/requirements.txt # Assuming a shared requirements
      - run:
          name: "Set up Azure credentials for index build"
          command: |
            echo "export AZURE_STORAGE_CONNECTION_STRING='${AZURE_STORAGE_CONNECTION_STRING}'" >> $BASH_ENV
            echo "export OPENAI_API_KEY='${OPENAI_API_KEY}'" >> $BASH_ENV
      - run:
          name: "Build and Upload New Index"
          # This script ingests documents, builds the LlamaIndex store, and uploads it
          # to a versioned prefix in Azure Blob Storage.
          # The version is based on the pipeline ID to ensure uniqueness.
          command: |
            echo "Building index version: ${INDEX_VERSION}"
            python ./scripts/build_index.py --output-version ${INDEX_VERSION}
  
  deploy_staging_function:
    docker:
      - image: mcr.microsoft.com/azure-functions/python:4-python3.11
    environment:
      INDEX_VERSION: staging_index_<< pipeline.id >>
    steps:
      - checkout
      - azure-cli/install
      - run:
          name: "Login to Azure"
          command: |
            az login --service-principal -u $AZURE_APP_ID -p $AZURE_PASSWORD --tenant $AZURE_TENANT
      - run:
          name: "Deploy Azure Function to Staging Slot"
          command: |
            # Temporarily set the app setting to point to the new staging index
            az functionapp config appsettings set --name ${AZURE_FUNCTION_APP_NAME} --resource-group ${AZURE_RESOURCE_GROUP} \
              --slot staging \
              --settings "INDEX_PATH_PREFIX=${INDEX_VERSION}"

            # Deploy the function code
            cd QueryFunction
            func azure functionapp publish ${AZURE_FUNCTION_APP_NAME} --slot staging --force
  
  promote_to_production:
    docker:
      - image: cimg/base:stable
    environment:
      INDEX_VERSION: staging_index_<< pipeline.id >>
    steps:
      - azure-cli/install
      - run:
          name: "Login to Azure"
          command: |
            az login --service-principal -u $AZURE_APP_ID -p $AZURE_PASSWORD --tenant $AZURE_TENANT
      - run:
          name: "Promote Index by copying blobs to the production prefix"
          # Blob Storage has no atomic prefix rename, so this is a delete-then-copy promotion.
          # First we find and remove the old production blobs, then copy the staging version
          # to the 'prod_index_v1' prefix. Warm instances keep serving from the in-memory index,
          # but a cold start during the brief window between delete and copy could fail; a more
          # robust script would handle pagination and errors and wait for the asynchronous
          # copies to complete.
          command: |
            SOURCE_PREFIX="${INDEX_VERSION}"
            DEST_PREFIX="prod_index_v1"
            CONTAINER_NAME="llamaindex-store"

            echo "Promoting index from ${SOURCE_PREFIX} to ${DEST_PREFIX}"
            
            # Delete old production index
            OLD_BLOBS=$(az storage blob list --container-name ${CONTAINER_NAME} --prefix ${DEST_PREFIX} --connection-string "${AZURE_STORAGE_CONNECTION_STRING}" --query "[].name" -o tsv)
            if [[ -n "$OLD_BLOBS" ]]; then
                for blob in $OLD_BLOBS; do
                    az storage blob delete --container-name ${CONTAINER_NAME} --name "$blob" --connection-string "${AZURE_STORAGE_CONNECTION_STRING}"
                done
            fi

            # Copy new index to production
            SOURCE_BLOBS=$(az storage blob list --container-name ${CONTAINER_NAME} --prefix ${SOURCE_PREFIX} --connection-string "${AZURE_STORAGE_CONNECTION_STRING}" --query "[].name" -o tsv)
            for blob in $SOURCE_BLOBS; do
                dest_blob_name=$(echo "$blob" | sed "s/^${SOURCE_PREFIX}/${DEST_PREFIX}/")
                # 'copy start' is asynchronous and takes explicit destination/source container flags.
                az storage blob copy start \
                  --destination-container ${CONTAINER_NAME} --destination-blob "$dest_blob_name" \
                  --source-container ${CONTAINER_NAME} --source-blob "$blob" \
                  --connection-string "${AZURE_STORAGE_CONNECTION_STRING}"
            done

            echo "Promotion complete. Production function will pick up new index on next cold start."


workflows:
  build-and-deploy:
    jobs:
      - build_and_upload_index
      - deploy_staging_function:
          requires:
            - build_and_upload_index
      - hold_for_production_approval:
          type: approval
          requires:
            - deploy_staging_function
      - promote_to_production:
          requires:
            - hold_for_production_approval
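
The pipeline's build job calls ./scripts/build_index.py, which isn't shown above. A minimal sketch of such a script, assuming the source documents live in a local docs/ directory and that LlamaIndex's default OpenAI embeddings pick up OPENAI_API_KEY from the environment (both assumptions rather than requirements of the pipeline), might look like this:

# scripts/build_index.py -- illustrative sketch, not the production script
import argparse
import os

from azure.storage.blob import BlobServiceClient
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex


def main() -> None:
    parser = argparse.ArgumentParser(description="Build a LlamaIndex store and upload it to Blob Storage.")
    parser.add_argument("--output-version", required=True, help="Blob prefix to upload the index under.")
    parser.add_argument("--docs-dir", default="docs", help="Directory of source documents (assumed layout).")
    args = parser.parse_args()

    # 1. Ingest the source documents and build the vector index.
    #    The default embedding model reads OPENAI_API_KEY from the environment.
    documents = SimpleDirectoryReader(args.docs_dir).load_data()
    index = VectorStoreIndex.from_documents(documents)

    # 2. Persist the index locally; LlamaIndex writes several JSON files to this directory.
    local_dir = "/tmp/index_build"
    index.storage_context.persist(persist_dir=local_dir)

    # 3. Upload each persisted file under the versioned prefix the query function expects.
    client = BlobServiceClient.from_connection_string(os.environ["AZURE_STORAGE_CONNECTION_STRING"])
    container = client.get_container_client(os.environ.get("INDEX_CONTAINER_NAME", "llamaindex-store"))
    for file_name in os.listdir(local_dir):
        blob_name = f"{args.output_version}/{file_name}"
        with open(os.path.join(local_dir, file_name), "rb") as fh:
            container.upload_blob(name=blob_name, data=fh, overwrite=True)
        print(f"Uploaded {blob_name}")


if __name__ == "__main__":
    main()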

With the core service and its deployment automated, we turned to the operational aspects. The Azure Function was exposed directly, but we needed a more robust entry point to handle caching and rate limiting and to serve as a clean abstraction layer. This is a classic use case for a reverse proxy. While Azure API Management is the platform-native solution, its cost and complexity were prohibitive for this service. A self-hosted Nginx instance provided the required control at a fraction of the cost.

A key optimization was implementing proxy_cache. Many user queries are repetitive. Caching the responses at the Nginx layer for even a few minutes dramatically reduces calls to the Azure Function, which in turn reduces LLM token consumption and overall cost. The proxy_cache_key is critical; we keyed it on the request body to ensure different queries resulted in different cache entries.

For observability, Nginx’s default log format is insufficient. We defined a custom JSON log format to capture critical fields like response time, cache status ($upstream_cache_status), and the request body itself. This structured data is trivial for a log aggregator to parse.

# /etc/nginx/nginx.conf (relevant excerpt; the directives below live inside the http { } block)

# Define a custom log format that outputs JSON.
# This makes parsing in Fluentd deterministic and efficient.
log_format json_combined escape=json
  '{'
    '"time_local":"$time_local",'
    '"remote_addr":"$remote_addr",'
    '"request_method":"$request_method",'
    '"request_uri":"$request_uri",'
    '"status":$status,'
    '"body_bytes_sent":$body_bytes_sent,'
    '"http_referer":"$http_referer",'
    '"http_user_agent":"$http_user_agent",'
    '"request_time":$request_time,'
    '"upstream_addr":"$upstream_addr",'
    '"upstream_response_time":$upstream_response_time,'
    '"upstream_cache_status":"$upstream_cache_status",'
    '"request_body": "$request_body"'
  '}';

# Define the cache path. keys_zone sets up a shared memory zone to store keys.
# inactive=10m means items will be evicted if not accessed for 10 minutes.
proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=rag_cache:10m max_size=1g inactive=10m use_temp_path=off;

server {
    listen 80;
    server_name your.api.domain;

    access_log /var/log/nginx/access.log json_combined;
    error_log /var/log/nginx/error.log;

    location /api/query {
        # --- Caching Configuration ---
        proxy_cache rag_cache;
        # GET and HEAD are cached by default; POST must be listed explicitly for this query API.
        proxy_cache_methods POST;
        proxy_cache_valid 200 302 5m;
        proxy_cache_valid 404 1m;
        # Use the request body as part of the cache key. This is essential for a query API.
        proxy_cache_key "$scheme$request_method$host$request_uri$request_body";
        # Add a header to indicate if the response was a cache HIT or MISS. Useful for debugging.
        add_header X-Cache-Status $upstream_cache_status;

        # --- Request Buffering and Timeouts ---
        # Allow larger request bodies containing long queries.
        client_max_body_size 1M;
        proxy_read_timeout 180s; # RAG queries can be slow.
        proxy_connect_timeout 90s;

        # --- Proxy Pass to Azure Function ---
        # Using a variable in proxy_pass forces Nginx to resolve the hostname at request time via
        # the resolver below, rather than once at startup. Note that Nginx does not expand
        # environment variables in its config; the hostname is either hard-coded as shown or
        # injected by the official Docker image's envsubst templating.
        resolver 127.0.0.11 valid=30s; # Docker's embedded DNS
        set $azure_function_host "your-function-app-name.azurewebsites.net";
        proxy_pass https://$azure_function_host/api/QueryFunction; # The route configured in function.json
        
        # We must rewrite the Host header for Azure Functions to route correctly, and enable SNI
        # so the TLS handshake with *.azurewebsites.net succeeds.
        proxy_ssl_server_name on;
        proxy_set_header Host $azure_function_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
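
A quick way to confirm the cache key behaves as intended is to send the same query twice through Nginx and inspect the X-Cache-Status header added above. A small sketch using the requests library (the endpoint URL and query are placeholders):

# check_cache.py -- illustrative smoke test
import requests

URL = "http://your.api.domain/api/query"  # placeholder; matches the server_name above
payload = {"query": "What is our refund policy?"}  # hypothetical query

for attempt in (1, 2):
    resp = requests.post(URL, json=payload, timeout=180)
    # The first request should typically report MISS; an identical second request
    # within the 5-minute TTL should report HIT.
    print(f"attempt {attempt}: status={resp.status_code}, cache={resp.headers.get('X-Cache-Status')}")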

This gave us two distinct sources of structured logs: the Nginx access logs and the Azure Function’s application logs. The final piece of the puzzle was to unify them. This is where Fluentd came in. We deployed a Fluentd agent responsible for collecting logs from both sources, parsing them, and forwarding them to a centralized backend.

The configuration for Fluentd required two main source blocks. One used the in_tail plugin to follow the Nginx JSON log file. The other was more complex. Azure Functions natively integrate with Azure Monitor and can be configured to stream logs to an Azure Event Hub. We used Fluentd’s Event Hub input plugin to consume this stream. This provided a durable, scalable mechanism to get logs out of the Azure ecosystem and into our aggregation layer.

The real power comes from the filter and match directives. We could parse the JSON from both sources, add metadata (like source: nginx or source: azure_function), and then route them. For instance, we could route all logs to a development Elasticsearch cluster while simultaneously routing only logs with level: "ERROR" to a PagerDuty alerting service.

# /etc/fluent/fluent.conf

# --- SOURCE 1: Nginx Access Logs ---
# Tailing a log file is a common pattern.
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /fluentd/log/nginx.access.pos
  tag nginx.access
  <parse>
    @type json # Because we configured Nginx to log in JSON, no regex is needed.
  </parse>
</source>

# --- SOURCE 2: Azure Function Logs (via Event Hub) ---
# This is a more advanced setup for cloud environments.
# Requires the fluent-plugin-azure-event-hubs gem.
<source>
  @type azure_event_hubs
  tag azure.function
  connection_string "#{ENV['AZURE_EVENT_HUB_CONNECTION_STRING']}"
  event_hub_name "#{ENV['AZURE_EVENT_HUB_NAME']}"
  consumer_group "$Default"
  storage_connection_string "#{ENV['AZURE_STORAGE_CONNECTION_STRING_FOR_CHECKPOINT']}"
  storage_container_name "fluentd-checkpoints"
  <parse>
    @type json # Our function logs structured JSON, so parsing is simple.
  </parse>
</source>

# --- FILTER: Enrich logs with source metadata ---
# Stamping each record with its originating tag and the collector hostname lets the backend
# distinguish nginx.access events from azure.function events.
<filter **>
  @type record_transformer
  <record>
    source ${tag}
    hostname "#{Socket.gethostname}"
  </record>
</filter>

# --- OUTPUT: Forward to a backend (e.g., Elasticsearch) ---
# For demonstration, we use stdout, but a production setup would use a persistent backend.
<match **>
  @type copy
  <store>
    @type stdout
  </store>
  # <store>
  #   @type elasticsearch
  #   host elasticsearch.host
  #   port 9200
  #   logstash_format true
  #   logstash_prefix my-rag-app
  #   <buffer>
  #     @type file
  #     path /fluentd/buffer/es
  #     flush_interval 10s
  #   </buffer>
  # </store>
</match>

The entire architecture can be visualized as two distinct flows: the request flow and the logging flow.

graph TD
    subgraph RequestFlow [Request Flow]
        U[User] -->|HTTPS Request| N(Nginx);
        N -->|Cache MISS| AF[Azure Function];
        N -->|Cache HIT| U;
        AF -->|Download Index| BS[Azure Blob Storage];
        AF -->|Query| LLM[LLM Service];
        AF -->|Response| N;
    end

    subgraph LoggingFlow [Logging Flow]
        N -- JSON Log --> FL[Fluentd Agent];
        AF -- Structured Log --> EH[Azure Event Hub];
        EH -- Stream --> FL;
        FL -- Unified Stream --> BE[Log Backend e.g., Elasticsearch];
    end

This architecture, while composed of several moving parts, solves the initial problems systematically. Scaling is handled by Azure Functions, state is managed and versioned in Blob Storage, zero-downtime updates are orchestrated by CircleCI, and the entire system’s behavior is made transparent through the unified logging pipeline managed by Fluentd.

The primary limitation of this design remains the cold start latency associated with loading the index. While the Premium plan for Azure Functions mitigates this by providing pre-warmed instances, a very large index can still take several seconds to load, impacting the first user’s experience. A future iteration could explore using Azure Container Apps with custom container images, allowing us to mount a pre-populated file share (like Azure Files) directly, potentially speeding up the index loading process. Furthermore, the Nginx caching is naive; a more advanced semantic cache could be implemented within the function itself to cache responses for queries that are semantically similar but not textually identical, further reducing costs and latency.
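
As a rough sketch of the semantic cache idea, all that is needed is an embedding function and a similarity threshold. The snippet below is illustrative only: embed_fn and the 0.95 threshold are assumptions, and a production version would need eviction, a size cap, and a store shared across instances.

# Illustrative in-process semantic cache -- not part of the deployed function.
from typing import Callable, List, Optional, Tuple

import numpy as np


class SemanticCache:
    """Caches responses keyed by query embedding rather than exact query text."""

    def __init__(self, embed_fn: Callable[[str], List[float]], threshold: float = 0.95):
        self.embed_fn = embed_fn      # e.g. the same embedding model used to build the index
        self.threshold = threshold    # cosine similarity above which two queries are treated as equivalent
        self._entries: List[Tuple[np.ndarray, str]] = []

    def get(self, query: str) -> Optional[str]:
        vec = np.asarray(self.embed_fn(query), dtype=float)
        for cached_vec, cached_response in self._entries:
            sim = float(np.dot(vec, cached_vec) / (np.linalg.norm(vec) * np.linalg.norm(cached_vec)))
            if sim >= self.threshold:
                return cached_response  # a semantically similar query was already answered
        return None

    def put(self, query: str, response: str) -> None:
        self._entries.append((np.asarray(self.embed_fn(query), dtype=float), response))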

