Implementing Correlated Logging Across a Hybrid MLOps Stack with Nuxt.js Azure Functions and Google PubSub


The initial architecture for our MLOps control plane seemed straightforward. A Nuxt.js front-end provided the user interface for data scientists to trigger model training jobs. This UI called a serverless backend on Azure Functions, which acted as a lightweight API gateway. To handle long-running training processes without blocking the API, the Azure Function would publish a job message to a Google Cloud Pub/Sub topic. A fleet of GPU-enabled workers would then subscribe to this topic, execute the job, and log their output. The problem became apparent not during development, but three weeks into production.

A data scientist reported a training job, initiated from the Nuxt UI, had failed silently. There was no error on the front-end. The Azure Function logs showed a successful “202 Accepted” response. The Pub/Sub topic acknowledged message delivery. Somewhere in the distributed fleet of ML workers, the job had vanished or errored out without a trace. We spent six hours manually cross-referencing timestamps between Azure Monitor logs, GCP logs, and the raw stdout logs from the worker pods. The lack of a single, unified request identifier—a trace_id—made debugging a nightmare. Our observability was siloed and ineffective. The pain point was clear: we needed a unified logging pipeline capable of correlating a single user action across a multi-cloud, hybrid-technology stack.

The goal was to centralize all relevant logs into our existing ELK (Elasticsearch, Logstash, Kibana) stack and, most importantly, to be able to filter for a single transaction across all systems. The core concept was manual trace context propagation. A unique trace_id would be generated on the client-side (in the Nuxt.js application) for every user-initiated action. This ID would then be passed through every component in the chain: as an HTTP header to the Azure Function, as a message attribute in Google Cloud Pub/Sub, and finally consumed and used by the ML worker’s logger.

sequenceDiagram
    participant User
    participant Nuxt.js UI
    participant Azure Function API
    participant Google Cloud Pub/Sub
    participant ML Worker
    participant ELK Stack

    User->>+Nuxt.js UI: Clicks "Run Training Job"
    Note over Nuxt.js UI: Generates `trace_id: xyz-123`
    Nuxt.js UI->>+Azure Function API: POST /jobs (Header: X-Trace-ID: xyz-123)
    Azure Function API->>Nuxt.js UI: 202 Accepted
    Note over Azure Function API: Extracts `trace_id`, logs "Job received"
    Azure Function API->>+Google Cloud Pub/Sub: Publish message (Attribute: trace_id: xyz-123)
    Note over Google Cloud Pub/Sub: Message enqueued
    Google Cloud Pub/Sub-->>-ML Worker: Delivers message
    Note over ML Worker: Extracts `trace_id`, starts processing
    ML Worker-->>ELK Stack: Logs "Processing job xyz-123"
    ML Worker-->>ELK Stack: Logs "Job xyz-123 failed: OOM Error"
    Azure Function API-->>ELK Stack: Forwards Azure Monitor logs
    Nuxt.js UI-->>ELK Stack: (Optional) Forwards client-side logs

This manual approach was chosen over a full-blown OpenTelemetry implementation for pragmatic reasons. We needed a solution within days, not weeks. Instrumenting every component with OpenTelemetry SDKs, configuring collectors, and setting up an APM backend like Jaeger was a larger project. A correlated logging solution using a simple trace_id would solve 80% of our immediate debugging pain.

Part 1: Client-Side Trace ID Generation in Nuxt.js

The chain of custody for the trace_id begins in the browser. A common pitfall is to let the first backend service generate the ID, but this leaves client-side interactions and potential network errors before the first hop completely untracked. We needed the ID generated the moment the user action occurred.

We implemented this using a Nuxt 3 plugin that injects an Axios interceptor. This ensures that every outgoing API request from our application automatically gets a new trace_id or propagates an existing one if it’s part of a larger client-side workflow.

First, the plugin definition in plugins/axios.ts:

// plugins/axios.ts
import axios from 'axios';
import { v4 as uuidv4 } from 'uuid';

export default defineNuxtPlugin((nuxtApp) => {
  const api = axios.create({
    baseURL: nuxtApp.$config.public.apiBaseUrl,
    headers: {
      'Content-Type': 'application/json',
    },
  });

  // The core of our client-side tracing.
  // This interceptor adds a unique trace ID to every outgoing request.
  api.interceptors.request.use(
    (config) => {
      // In a real-world project, you might want to propagate an existing trace ID
      // if the request is part of a larger client-side transaction.
      // For simplicity here, we generate a new one for each top-level API call.
      const traceId = uuidv4();
      
      console.log(`[Tracing] Generated Trace ID: ${traceId} for URL: ${config.url}`);

      if (!config.headers) {
        config.headers = {};
      }
      // The header name is critical. It must be consistent across the entire stack.
      // We chose 'X-Trace-ID' as a conventional, non-standard header.
      config.headers['X-Trace-ID'] = traceId;

      return config;
    },
    (error) => {
      // Also log the error with context before it gets sent.
      console.error('[Axios Request Error]', error);
      return Promise.reject(error);
    }
  );

  // We also want to log response errors for better client-side observability.
  api.interceptors.response.use(
    (response) => response,
    (error) => {
      const traceId = error.config.headers['X-Trace-ID'];
      console.error(`[Axios Response Error] Trace ID: ${traceId}`, error.response?.data || error.message);
      return Promise.reject(error);
    }
  );

  return {
    provide: {
      api,
    },
  };
});

This code uses uuid to generate a unique identifier and attaches it to the X-Trace-ID header. Any component in our Nuxt app that uses the provided $api service now automatically participates in our tracing scheme.

Part 2: Propagating Context in Azure Functions

The next link in the chain is the Azure Function written in Python. Its responsibility is twofold:

  1. Extract the X-Trace-ID from the incoming HTTP request.
  2. Inject this trace_id into both its own logs and as an attribute on the message it publishes to Google Cloud Pub/Sub.

A common mistake here is to simply pass the trace_id as a parameter to every function. This clutters business logic. A better, more maintainable approach is to use a logging context or filter. We used Python’s logging.LoggerAdapter to inject the trace_id into every log record automatically.

Here is the Azure Function code, including environment variable handling for configurations and robust error handling.

function_app.py:

import azure.functions as func
import logging
import os
import json
from uuid import uuid4
from google.cloud import pubsub_v1
from google.api_core import exceptions

# --- Configuration ---
# In a real-world project, these come from Application Settings.
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
GCP_PUBSUB_TOPIC_ID = os.environ.get("GCP_PUBSUB_TOPIC_ID")
# Service account JSON needs to be available. For Functions, this can be managed securely.
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/your/credentials.json"

# --- Custom Logging Adapter for Trace Context ---
class TraceContextAdapter(logging.LoggerAdapter):
    """
    This adapter injects trace_id into the log record's extras dictionary.
    This is the key to structured, correlated logging.
    """
    def process(self, msg, kwargs):
        # If a trace_id is present in the adapter's extra context,
        # ensure it's included in the log record.
        if 'trace_id' in self.extra:
            kwargs['extra'] = self.extra
        return msg, kwargs

# --- Initialize Azure Function App and Pub/Sub Client ---
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(GCP_PROJECT_ID, GCP_PUBSUB_TOPIC_ID)

# Get the root logger and configure it for structured JSON output
# This makes parsing in Logstash much more reliable.
logger = logging.getLogger(__name__)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s", "trace_id": "%(trace_id)s"}'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)


@app.route(route="jobs", methods=["POST"])
def create_training_job(req: func.HttpRequest) -> func.HttpResponse:
    # 1. Extract Trace ID from headers. If not present, generate a new one.
    # This ensures every request path is traceable, even from clients that don't send the header.
    trace_id = req.headers.get("X-Trace-ID", str(uuid4()))

    # Create a logger adapter with the current request's trace_id
    log_adapter = TraceContextAdapter(logger, {'trace_id': trace_id})

    log_adapter.info(f"Received request to create training job.")

    try:
        req_body = req.get_json()
        model_name = req_body.get("model_name")
        dataset_id = req_body.get("dataset_id")

        if not all([model_name, dataset_id]):
            log_adapter.warning("Invalid request body. Missing model_name or dataset_id.")
            return func.HttpResponse(
                json.dumps({"error": "Missing required fields: model_name, dataset_id"}),
                status_code=400,
                mimetype="application/json"
            )

    except ValueError:
        log_adapter.error("Failed to decode JSON body.")
        return func.HttpResponse(
             json.dumps({"error": "Invalid JSON format"}),
             status_code=400,
             mimetype="application/json"
        )

    # 2. Prepare and publish the message to Google Cloud Pub/Sub
    message_data = json.dumps({
        "model_name": model_name,
        "dataset_id": dataset_id,
        "request_time": req.headers.get('Date', '') # Pass along other relevant info
    }).encode("utf-8")

    try:
        log_adapter.info(f"Publishing job for model '{model_name}' to Pub/Sub topic '{GCP_PUBSUB_TOPIC_ID}'.")
        
        # This is the critical step for cross-service tracing.
        # The trace_id is embedded as a message attribute.
        future = publisher.publish(
            topic_path, 
            data=message_data, 
            trace_id=trace_id # Pub/Sub attributes must be strings
        )
        
        # Block for acknowledgement to ensure it was sent successfully
        message_id = future.result(timeout=10)
        log_adapter.info(f"Successfully published message with ID: {message_id}.")

        return func.HttpResponse(
            json.dumps({"status": "Job accepted", "trace_id": trace_id, "message_id": message_id}),
            status_code=202,
            mimetype="application/json"
        )

    except exceptions.GoogleAPICallError as e:
        log_adapter.critical(f"Pub/Sub publish failed: {e}", exc_info=True)
        return func.HttpResponse(
            json.dumps({"error": "Internal server error: Failed to queue job."}),
            status_code=500,
            mimetype="application/json"
        )
    except TimeoutError:
        log_adapter.error("Publishing to Pub/Sub timed out.")
        return func.HttpResponse(
            json.dumps({"error": "Internal server error: Job queueing timed out."}),
            status_code=504, # Gateway Timeout
            mimetype="application/json"
        )

The custom JSON formatter is a crucial detail. Relying on plain text logs makes parsing fragile. By forcing the output to be JSON, we create a contract with our Logstash pipeline, making ingestion reliable.

Part 3: Consuming Trace Context in the ML Worker

The ML worker, a Python process running in a container, subscribes to the Pub/Sub topic. Its task is to receive the message, extract the job parameters and the trace_id from the attributes, and then perform the work, logging its progress using that same trace_id.

worker.py:

import os
import json
import time
import logging
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# --- Configuration ---
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID")
GCP_PUBSUB_SUBSCRIPTION_ID = os.environ.get("GCP_PUBSUB_SUBSCRIPTION_ID")
WORKER_ID = os.environ.get("HOSTNAME", "local-worker") # Get worker ID from environment

# --- Structured Logging Setup ---
# Similar to the Azure function, we enforce structured JSON logging.
class TraceContextAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        if 'trace_id' in self.extra:
            # We also add worker_id for better context in a distributed environment
            self.extra['worker_id'] = WORKER_ID
            kwargs['extra'] = self.extra
        return msg, kwargs

# Configure root logger
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
# Notice the slightly different format, which Logstash will need to handle.
formatter = logging.Formatter(
    '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "worker_id": "%(worker_id)s", "trace_id": "%(trace_id)s", "message": "%(message)s"}'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# --- Pub/Sub Subscriber Logic ---
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GCP_PROJECT_ID, GCP_PUBSUB_SUBSCRIPTION_ID)

def process_message(message: pubsub_v1.subscriber.message.Message) -> None:
    """Callback function to handle incoming messages."""
    
    # 1. Extract trace_id from message attributes. This is the critical link.
    trace_id = message.attributes.get("trace_id", "NO_TRACE_ID")
    log_adapter = TraceContextAdapter(logger, {'trace_id': trace_id})

    try:
        log_adapter.info(f"Received message with ID: {message.message_id}")
        data = json.loads(message.data.decode("utf-8"))
        model_name = data.get("model_name")
        
        log_adapter.info(f"Starting processing for model: {model_name}")

        # Simulate a long-running ML task
        time.sleep(15)

        # Simulate a potential failure
        if model_name == "fail-me":
            raise ValueError("Simulated model training failure: Invalid dataset.")

        log_adapter.info(f"Successfully processed job for model: {model_name}")
        message.ack() # Acknowledge the message so it's not redelivered

    except json.JSONDecodeError:
        log_adapter.error("Failed to decode message data. Discarding message.")
        message.nack() # Negative acknowledgement might send it to a dead-letter queue
    except Exception as e:
        log_adapter.error(f"An unexpected error occurred during processing: {e}", exc_info=True)
        message.nack()

def main():
    log_adapter = TraceContextAdapter(logger, {'trace_id': 'worker-startup'})
    log_adapter.info(f"Worker '{WORKER_ID}' starting up. Listening for messages on '{subscription_path}'...")
    
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=process_message)

    try:
        # The worker will block here indefinitely, waiting for messages.
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
    except Exception as e:
        log_adapter.critical(f"Subscriber encountered a fatal error: {e}", exc_info=True)
        streaming_pull_future.cancel()
        
    log_adapter.info("Worker shutting down.")

if __name__ == "__main__":
    main()

The worker now logs with the same trace_id it received from the Azure Function, completing the context propagation.

Part 4: The ELK Ingestion Pipeline

With all applications emitting structured JSON logs containing a trace_id, the final piece is configuring the ELK stack to ingest and parse them correctly. This involves Filebeat to ship logs and Logstash to process them before storing them in Elasticsearch.

First, a sample filebeat.yml configuration on the ML worker nodes. This assumes logs are being written to stdout/stderr of the container, which is a standard practice.

filebeat.yml:

filebeat.inputs:
- type: container
  paths:
    - '/var/lib/docker/containers/*/*.log'
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: log

processors:
  - add_docker_metadata: ~

# Send logs to our Logstash instance for processing
output.logstash:
  hosts: ["logstash.internal.corp:5044"]

The real intelligence lies in the Logstash pipeline configuration. It must be able to handle logs from different sources (e.g., Azure Functions forwarded via an Event Hub, and Filebeat from our workers), parse the JSON, and normalize the fields.

logstash.conf:

```groovy

logstash.conf

input {

Input for logs shipped via Filebeat from our ML workers

beats {
port => 5044
}

A separate input for Azure Functions logs.

In a production setup, this would be an azure_event_hubs input

configured to pull from the diagnostic settings export.

For this example, we’ll simulate it with a tcp input.

tcp {
port => 5000
codec => json_lines
type => “azure_function_log”
}
}

filter {

The core logic resides here. We need to parse the JSON from the ‘message’ field.

The ‘if’ conditions handle logs from different sources.

if [type] != “azure_function_log” {
# This block handles logs from our Python ML workers via Filebeat
json {
source => “message”
target => “parsed_json” # Place the parsed data into a temporary field
}

# Promote the nested fields to the top level
mutate {
  rename => { "[parsed_json][timestamp]" => "@timestamp" }
  rename => { "[parsed_json][level]" => "log.level" }
  rename => { "[parsed_json][message]" => "message" }
  rename => { "[parsed_json][trace_id]" => "trace.id" }
  rename => { "[parsed_json][worker_id]" => "service.name" }
  remove_field => ["parsed_json", "log"]
}

} else {
# This block handles the logs from our Azure Function.
# The structure might be different, so we handle it separately.
# We assume the custom JSON formatter in the function code produces a similar structure.
json {
source => “message”
target => “parsed_json”
}

mutate {
  rename => { "[parsed_json][timestamp]" => "@timestamp" }
  rename => { "[parsed_json][level]" => "log.level" }
  rename => { "[parsed_json][message]" => "message" }
  # The key to correlation: normalize the trace ID field name.
  rename => { "[parsed_json][trace_id]" => "trace.id" }
  add_field => { "service.name" => "azure-function-api" }
  remove_field => ["parsed_json"]
}

}

Convert the timestamp to a proper date object for Elasticsearch

date {
match => [ “@timestamp”, “ISO8601”, “yyyy-MM-dd HH:mm:ss,SSS” ]
target => “@timestamp”
}

If a trace_id is missing after all parsing, mark it.

if


  TOC