Implementing Correlated Trace-Log-Metric Observability for Serverless ML Inference with SkyWalking, Vector, and AWS Lambda


The p99 latency alerts for our sentiment analysis API wouldn’t stop. Hosted on AWS Lambda and powered by a Hugging Face model, the service was a black box. Traces from AWS X-Ray showed that some invocations took upwards of five seconds, while most finished in under 400ms. The problem was that the trace ended where the function began; we had no visibility into the model’s execution. CloudWatch Logs were a messy stream of unstructured text, useless for correlating with a specific slow request. We were flying blind, unable to distinguish between a cold start, a problematic payload causing inference loops, or infrastructure throttling. The core pain point was a complete lack of correlation between a high-level trace and the low-level application behavior.

Our initial concept was to create a unified observability stream. For any given request, we needed to see its distributed trace, all associated application logs, and relevant performance metrics, all linked by a single trace_id. This would allow us to select a slow trace in our APM and immediately drill down into the exact, structured logs generated by the model during that specific invocation.

This led to a specific set of technology selection decisions. We chose SkyWalking as our observability backend due to its native support for correlating traces, logs, and metrics and its robust implementation of the OpenTelemetry Protocol (OTLP). AWS Lambda remained the execution environment for its scalability. The real architectural crux was how to get telemetry out of this ephemeral environment efficiently. Shipping data directly from the Python function handler introduces latency and couples the business logic with observability concerns. A common mistake here is to treat telemetry shipping as a synchronous part of the request-response cycle, which is a direct hit to performance and reliability.

This is where Vector entered the picture, deployed as a Lambda Extension. Vector is a high-performance observability data pipeline. Running it as an extension lets it operate inside the same execution environment as the function but in a separate process. By subscribing to the Lambda Telemetry API at startup, the extension has logs pushed to it directly by the runtime, completely out-of-band from the function’s execution. This decouples data collection from our Python code. Furthermore, Vector’s powerful transformation capabilities in the Vector Remap Language (VRL) let us parse, enrich, and route the data to SkyWalking without altering the core application logic. This architecture promised a clean separation of concerns and minimal performance overhead on our inference code.
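
To make the out-of-band flow concrete, here is a minimal sketch of what an extension bootstrap has to do before launching the Vector process: register with the Lambda Extensions API, then subscribe to the Telemetry API with a destination pointing at a local HTTP listener. The extension name, the listener port, and the existence of a separate bootstrap script are illustrative assumptions, not artifacts from our deployment.

import json
import os
import urllib.request

RUNTIME_API = os.environ["AWS_LAMBDA_RUNTIME_API"]  # injected by the Lambda runtime
LISTENER_URI = "http://sandbox.localdomain:8686/"   # assumed Vector http_server listener

def register_extension(name: str = "vector") -> str:
    """Register with the Extensions API and return the extension identifier."""
    req = urllib.request.Request(
        f"http://{RUNTIME_API}/2020-01-01/extension/register",
        data=json.dumps({"events": ["INVOKE", "SHUTDOWN"]}).encode(),
        headers={"Lambda-Extension-Name": name, "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.headers["Lambda-Extension-Identifier"]

def subscribe_telemetry(extension_id: str) -> None:
    """Ask the Telemetry API to push `function` log events to the local listener."""
    subscription = {
        "schemaVersion": "2022-07-01",
        "types": ["function"],  # platform/extension events are not needed here
        "buffering": {"maxItems": 500, "maxBytes": 262144, "timeoutMs": 100},
        "destination": {"protocol": "HTTP", "URI": LISTENER_URI},
    }
    req = urllib.request.Request(
        f"http://{RUNTIME_API}/2022-07-01/telemetry",
        data=json.dumps(subscription).encode(),
        headers={"Lambda-Extension-Identifier": extension_id, "Content-Type": "application/json"},
        method="PUT",
    )
    urllib.request.urlopen(req).close()

if __name__ == "__main__":
    subscribe_telemetry(register_extension())
    # ...then exec the Vector binary and enter the extension event loop.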

graph TD
    subgraph Client
        A[API Gateway Request]
    end

    subgraph AWS Lambda Execution Environment
        F[Python Function Handler]
        subgraph "Lambda Runtime"
            TAPI[Lambda Telemetry API]
        end
        subgraph "Vector Extension"
            V[Vector Process]
        end
    end

    subgraph Observability Backend
        OAP[SkyWalking OAP Server]
        UI[SkyWalking UI]
    end

    A -- "Invokes" --> F
    F -- "Writes logs to stdout/stderr" --> TAPI
    TAPI -- "Streams logs" --> V
    F -- "Generates Spans (OTLP)" --> OAP
    V -- "Transforms & Forwards Logs (OTLP)" --> OAP
    V -- "Extracts & Forwards Metrics (OTLP)" --> OAP
    OAP --> UI

    style F fill:#D5E8D4,stroke:#82B366,stroke-width:2px
    style V fill:#DAE8FC,stroke:#6C8EBF,stroke-width:2px

The first step was establishing a local SkyWalking instance capable of receiving OTLP data. For a real-world project, this would be a clustered deployment, but for development, Docker Compose is sufficient. The key is exposing the OTLP gRPC ports for logs, metrics, and traces.

docker-compose.yml for SkyWalking:

version: '3.8'
services:
  elasticsearch:
    image: elasticsearch:7.17.10
    container_name: elasticsearch
    ports:
      - "9200:9200"
    healthcheck:
      test: ["CMD-SHELL", "curl --silent --fail localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1

  oap:
    image: apache/skywalking-oap-server:9.5.0
    container_name: oap
    depends_on:
      elasticsearch:
        condition: service_healthy
    ports:
      - "11800:11800" # OTLP gRPC receiver port
      - "12800:12800" # SkyWalking native agent port
    environment:
      - SW_STORAGE=elasticsearch
      - SW_STORAGE_ES_CLUSTER_NODES=elasticsearch:9200
      - SW_OTEL_RECEIVER=default # Enable the OpenTelemetry (OTLP gRPC) receiver
      - SW_OTEL_RECEIVER_ENABLED_HANDLERS=otlp-metrics,otlp-logs # Assumes both handlers are available in this OAP build
      - SW_CORE_TELEMETRY=prometheus
      - JAVA_OPTS=-Xms1024m -Xmx1024m
    healthcheck:
      test: ["CMD", "/bin/bash", "-c", "curl http://localhost:12800/graphql -H 'Content-Type: application/json' --data '{\"query\":\"{health}\"}'"]
      interval: 30s
      timeout: 10s
      retries: 3

  ui:
    image: apache/skywalking-ui:9.5.0
    container_name: ui
    depends_on:
      oap:
        condition: service_healthy
    ports:
      - "8080:8080"
    environment:
      - SW_OAP_ADDRESS=http://oap:12800
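
Once the stack is up, the same GraphQL health query used in the OAP healthcheck can be run from Python to confirm readiness before wiring up Lambda. This is a small convenience sketch; the localhost address assumes the port mappings from the Compose file above.

import json
import urllib.request

# Assumes the OAP REST port is published as 12800 by the compose file above.
OAP_GRAPHQL = "http://localhost:12800/graphql"

def oap_is_healthy() -> bool:
    """Run SkyWalking's {health} GraphQL query and report readiness."""
    req = urllib.request.Request(
        OAP_GRAPHQL,
        data=json.dumps({"query": "{health}"}).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            return resp.status == 200
    except OSError:
        return False

if __name__ == "__main__":
    print("OAP ready" if oap_is_healthy() else "OAP not ready yet")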

With the backend ready, the focus shifted to the Lambda function. The core task was to instrument the Python code to do two things: generate traces via the SkyWalking Python agent and emit structured JSON logs containing the trace_id from the active trace context. This trace_id is the lynchpin for the entire correlation strategy.

lambda/src/requirements.txt:

skywalking-python==1.0.1
transformers==4.30.2
torch==2.0.1
python-json-logger==2.0.7
boto3==1.28.57

lambda/src/handler.py:

import json
import logging
import os
import time
from uuid import uuid4

from pythonjsonlogger import jsonlogger

# SkyWalking agent must be initialized before any other imports
# that might be instrumented (like requests, boto3).
from skywalking import agent, config
from skywalking.trace.context import SpanContext, get_context
from skywalking.trace.tags import Tag

from transformers import pipeline

# --- Global Initialization ---
# This part runs during the Lambda cold start.
# A common mistake is to initialize this inside the handler,
# leading to massive performance degradation on every invocation.

SERVICE_NAME = os.environ.get("SERVICE_NAME", "Serverless-ML-Inference")
OAP_SERVER_ADDR = os.environ.get("OAP_SERVER_ADDR", "127.0.0.1:11800")
INSTANCE_ID = str(uuid4())

try:
    # Option names follow the agent_-prefixed configuration scheme of skywalking-python 1.0.x.
    config.init(
        agent_name=SERVICE_NAME,
        agent_instance_name=f"{SERVICE_NAME}@{INSTANCE_ID}",
        agent_collector_backend_services=OAP_SERVER_ADDR,
        agent_protocol='grpc',
        agent_logging_level='INFO',
    )
    agent.start()
    print("SkyWalking agent started successfully.")
except Exception as e:
    # In a production system, failed initialization should be a critical alert.
    # For now, we log it. The application will work but without tracing.
    print(f"Failed to initialize SkyWalking agent: {e}")


# --- Structured Logging Setup ---
# We create a custom JSON formatter that will inject the trace_id.
class SkyWalkingJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(SkyWalkingJsonFormatter, self).add_fields(log_record, record, message_dict)
        context: SpanContext = get_context()
        # The crucial link: inject trace_id into every log record.
        if context and context.trace_id:
            log_record['trace_id'] = context.trace_id
        log_record['service'] = SERVICE_NAME


logHandler = logging.StreamHandler()
formatter = SkyWalkingJsonFormatter('%(asctime)s %(name)s %(levelname)s %(message)s')
logHandler.setFormatter(formatter)

logger = logging.getLogger('InferenceLogger')
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
logger.propagate = False # Avoid duplicate logs in CloudWatch


# --- ML Model Loading ---
# This is loaded once per cold start.
try:
    sentiment_pipeline = pipeline(
        "sentiment-analysis", 
        model="distilbert-base-uncased-finetuned-sst-2-english"
    )
    logger.info("Hugging Face pipeline loaded successfully.")
except Exception as e:
    logger.error({"event": "model_load_failure", "error": str(e)})
    sentiment_pipeline = None


# --- Lambda Handler ---
def inference_handler(event, context):
    """
    Main Lambda handler for performing sentiment analysis.
    """
    if not sentiment_pipeline:
        return {
            "statusCode": 500,
            "body": "Model not loaded. Check cold start logs."
        }

    # The SkyWalking agent automatically creates an entry span for the Lambda invocation.
    # We can get the context and create child spans for more granularity.
    ctx: SpanContext = get_context()
    
    try:
        # Function URL / API Gateway proxy events carry the payload as a JSON string
        # in `body`; direct invocations pass the fields at the top level.
        payload = event
        if isinstance(event.get('body'), str):
            payload = json.loads(event['body'])

        input_text = payload.get('text', '')
        if not input_text:
            logger.warning({"event": "missing_input_text"})
            return {"statusCode": 400, "body": "Missing 'text' field in request."}

        # Create a custom child span for the inference logic. This is where we
        # can add business-specific tags and metrics.
        with ctx.new_local_span(op="HuggingFace/Inference") as span:
            span.layer = 'MachineLearning'
            span.tag(Tag(key='model.name', val='distilbert-sst-2'))
            span.tag(Tag(key='input.text.length', val=len(input_text)))

            logger.info(
                {"event": "inference_start", "text_length": len(input_text)},
                extra={"input_text": input_text[:100]}  # Log truncated payload for debugging
            )

            # The actual ML work, timed so the duration can be re-emitted as a metric downstream.
            start = time.perf_counter()
            result = sentiment_pipeline(input_text)
            duration_ms = round((time.perf_counter() - start) * 1000, 2)

            # Add results as tags to the span for visibility in the trace UI.
            span.tag(Tag(key='model.output.label', val=result[0]['label']))
            span.tag(Tag(key='model.output.score', val=round(result[0]['score'], 4)))

            logger.info(
                {
                    "event": "inference_success",
                    "label": result[0]['label'],
                    "score": result[0]['score'],
                    "duration_ms": duration_ms,  # Consumed by Vector's log-to-metric transform.
                }
            )

            return {
                "statusCode": 200,
                "body": json.dumps({
                    "label": result[0]['label'],
                    "score": float(result[0]['score']),
                })
            }

    except Exception as e:
        logger.error({"event": "inference_failure", "error": str(e)}, exc_info=True)
        # Tag the span with error status
        if 'span' in locals() and span:
            span.error_occurred = True
            span.log(ex=e)
        
        return {
            "statusCode": 500,
            "body": "An internal error occurred during inference."
        }
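
Before packaging, a quick local smoke test of the handler (run from lambda/src/ on a machine with the requirements installed, no Lambda involved) confirms the logging and span logic. The script name is hypothetical, and the direct-invocation event shape below bypasses the Function URL body wrapping.

# local_smoke_test.py - minimal local check of the handler, assuming the
# dependencies from requirements.txt are installed and handler.py is importable.
from handler import inference_handler

if __name__ == "__main__":
    # Direct invocation shape: the 'text' field sits at the top level.
    event = {"text": "Vector and SkyWalking are a powerful combination."}
    response = inference_handler(event, None)
    print(response["statusCode"], response["body"])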

Next, we built the Vector Lambda Extension. Lambda unpacks layer content under /opt and only treats executables placed in /opt/extensions/ as external extensions, so the layer directory has to mirror that layout, with the vector.yaml configuration file shipped alongside the binary. This config is the brain of the operation.

The directory structure for deployment:

.
├── lambda/
│   └── src/
│       ├── handler.py
│       └── requirements.txt
├── layer/
│   └── extensions/
│       ├── vector        # The compiled Vector binary for linux-x86_64-gnu
│       └── vector.yaml   # Vector configuration, shipped next to the binary
└── template.yaml         # AWS SAM template

The vector.yaml is where the real work of routing and transformation happens. It defines a source (a local HTTP listener that the Lambda Telemetry API pushes log batches to), a series of transforms to parse, enrich, and derive metrics from the data, and multiple sinks (OTLP endpoints for SkyWalking).

vector.yaml:

# Data sources - where Vector gets its data from.
sources:
  lambda_logs:
    # The Lambda Telemetry API pushes batches of log events (JSON arrays of
    # objects carrying `time`, `type` and `record`) to an HTTP endpoint that the
    # extension's bootstrap registers at startup (see the subscription sketch above).
    # Vector receives those batches with its http_server source; the port is an
    # assumption and must match the destination URI in the subscription.
    type: "http_server"
    address: "0.0.0.0:8686"
    decoding:
      codec: "json"

# Data transformations - parsing, enriching, and manipulating events.
transforms:
  # Route and parse logs from the function itself. Platform logs (like REPORT) are ignored.
  parse_function_logs:
    type: "remap"
    inputs:
      - "lambda_logs"
    # The pitfall here is not filtering log types. We only care about `function` logs.
    # Platform events carry runtime metrics but are better handled separately in more complex setups.
    source: |
      if .type != "function" {
          # Aborting drops the event (remap drops aborted events by default).
          abort
      }
      # The log event from Lambda is a string. We must parse it as JSON.
      # The `??` fallback prevents crashes on non-string or non-JSON output.
      parsed, err = parse_json(to_string(.record) ?? "")
      if err != null {
          # Keep the raw line for non-JSON output (e.g. prints during cold start).
          .message = .record
          log("Failed to parse log record as JSON", level: "warn")
      } else if is_object(parsed) {
          # Promote the parsed JSON fields to the top level of the event.
          . = merge!(., object!(parsed))
      }
      # The original `record` has been promoted (or copied to `message`), so drop it.
      del(.record)

  # A dedicated pair of transforms to derive metrics from logs.
  # This demonstrates the power of treating logs as a source for metrics.
  only_inference_success:
    type: "filter"
    inputs:
      - "parse_function_logs"
    # Only successful inference logs carry the fields we want to turn into a metric.
    condition: '.event == "inference_success"'

  extract_metrics_from_logs:
    # log_to_metric converts a field on a log event into a metric event;
    # a remap transform alone cannot emit new metric events.
    type: "log_to_metric"
    inputs:
      - "only_inference_success"
    metrics:
      - type: "gauge"
        field: "duration_ms"          # Logged by the handler around the pipeline call.
        name: "inference_duration_ms"
        tags:
          service: "{{ service }}"
          model_name: "distilbert-sst-2"
          result_label: "{{ label }}"

# Data sinks - where Vector sends the processed data.
sinks:
  skywalking_otlp_logs:
    type: "otlp"
    inputs:
      - "parse_function_logs" # Takes input from the log parser.
    # The endpoint is configured via an environment variable in the SAM template.
    endpoint: "${OAP_SERVER_ADDR}"
    # The key to sending OTLP logs is setting the right signal type.
    default_signal_type: "log"
    # SkyWalking requires specific resource attributes for service identification.
    # We pull the service name from the enriched log event.
    resource:
      service.name: "{{ service }}"
      
    encoding:
      # Ship the full structured event so the trace_id survives into the log body.
      codec: "json"

  skywalking_otlp_metrics:
    type: "otlp"
    inputs:
      - "extract_metrics_from_logs" # Takes input from the metrics extractor.
    endpoint: "${OAP_SERVER_ADDR}"
    default_signal_type: "metric"
    resource:
      service.name: "{{ service }}"
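
The VRL above can be exercised without deploying anything: run Vector locally with this config (with OAP_SERVER_ADDR pointing at the Docker Compose stack) and POST a Telemetry-API-shaped batch at the http_server source. The sketch below assumes the port configured in the source definition; the trace_id value is made up for testing.

import json
import urllib.request

# Assumes `vector --config vector.yaml` is running locally and the http_server
# source is listening on port 8686 as configured above.
VECTOR_ENDPOINT = "http://127.0.0.1:8686/"

# A batch shaped like what the Lambda Telemetry API delivers: an array of
# events, each with `time`, `type` and a `record` holding the raw log line.
batch = [
    {
        "time": "2024-01-01T00:00:00.000Z",
        "type": "function",
        "record": json.dumps({
            "event": "inference_success",
            "service": "Serverless-ML-Inference",
            "label": "POSITIVE",
            "score": 0.9987,
            "duration_ms": 142.5,
            "trace_id": "made-up-trace-id-for-testing",
        }),
    },
    {
        "time": "2024-01-01T00:00:00.100Z",
        "type": "platform.report",   # should be dropped by parse_function_logs
        "record": {"metrics": {"durationMs": 150.0}},
    },
]

req = urllib.request.Request(
    VECTOR_ENDPOINT,
    data=json.dumps(batch).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
print(urllib.request.urlopen(req).status)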

Finally, we used AWS SAM to package and deploy everything. The SAM template defines the Lambda function, the layer for the Vector extension, and the necessary permissions and environment variables.

template.yaml:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  Serverless ML Inference with correlated observability using SkyWalking and Vector.

Globals:
  Function:
    Timeout: 30
    MemorySize: 2048 # ML models can be memory-hungry.

Resources:
  VectorExtensionLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: vector-extension-layer
      Description: Vector.dev Lambda Extension
      ContentUri: ./layer/   # Contains extensions/vector and extensions/vector.yaml
      CompatibleArchitectures:
        - x86_64
      LicenseInfo: 'Apache-2.0'
      RetentionPolicy: Retain

  InferenceFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: sentiment-analysis-function
      PackageType: Zip
      CodeUri: lambda/src/
      Handler: handler.inference_handler
      Runtime: python3.9
      Architectures:
        - x86_64
      # Attach the Vector extension layer.
      Layers:
        - !Ref VectorExtensionLayer
      Policies:
        # Standard Lambda execution policy. No other permissions needed for this example.
        - AWSLambdaBasicExecutionRole
      Environment:
        Variables:
          # Configuration for the Python SkyWalking Agent
          SERVICE_NAME: "Serverless-ML-Inference"
          OAP_SERVER_ADDR: "YOUR_PUBLIC_OAP_IP:11800" # IMPORTANT: Use the public IP of your SkyWalking OAP
          # Configuration for the Vector Extension
          VECTOR_CONFIG: "/opt/extensions/vector.yaml" # Path to config inside the layer
      FunctionUrlConfig:
        AuthType: NONE # Creates a public Function URL for easy testing.

Outputs:
  FunctionUrl:
    Description: "URL for the sentiment analysis function"
    Value: !GetAtt InferenceFunctionUrl.FunctionUrl

After deploying, a call to the function URL with a JSON payload like {"text": "Vector and SkyWalking are a powerful combination for observability."} yielded the desired result. In the SkyWalking UI, the trace appeared under the Serverless-ML-Inference service. The trace showed the main Lambda span and the child HuggingFace/Inference span, complete with the custom tags we added. The magic happened when clicking the “Logs” tab for that trace. Because the trace_id was in both the trace context and the JSON logs processed by Vector, SkyWalking automatically filtered and displayed the exact log lines for that invocation: inference_start and inference_success. The logs were no longer a disconnected firehose but a precise, request-scoped narrative of the execution.
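
The call itself is nothing more than an HTTP POST; here is a small sketch, with the URL as a placeholder for the FunctionUrl value printed in the stack outputs.

import json
import urllib.request

# Placeholder: substitute the FunctionUrl output from `sam deploy`.
FUNCTION_URL = "https://<url-id>.lambda-url.<region>.on.aws/"

payload = {"text": "Vector and SkyWalking are a powerful combination for observability."}
req = urllib.request.Request(
    FUNCTION_URL,
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read()))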

This solution is not without its trade-offs. The Vector extension adds to the package size and contributes to the cold start duration. In a production environment with high traffic, sending 100% of traces and logs to the backend is prohibitively expensive. The next logical iteration would be to implement a sampling strategy. This could be configured in the SkyWalking Python agent (head-based sampling) or, more powerfully, in a separate Vector aggregator instance that receives data from all Lambda functions and performs intelligent tail-based sampling before forwarding to SkyWalking. The current architecture also places the Vector configuration inside the deployment package; a more flexible approach would involve fetching the configuration from a central source like S3 or a config service on startup, allowing for dynamic updates without redeploying the function.
