Propagating Distributed Trace Context and OIDC Claims Across a Redux SPA, Sanic Gateway, and Elixir Service


The system had become an opaque box. A user would report intermittent slowness, and the ticket would land on our desk with a vague description: “The dashboard takes forever to load.” The request path was a journey through a polyglot landscape: a React single-page application using Redux for state management, an asynchronous Sanic API gateway handling ingress and authentication, and a set of Elixir microservices doing the heavy lifting with their legendary concurrency. Finding the bottleneck was an exercise in manual log correlation across three different technology stacks—a process that was both time-consuming and prone to error. The core pain point was twofold: we lacked a unified view of a single request’s lifecycle, and we had no consistent way to attribute actions within our backend services to the specific authenticated user who initiated them.

Our initial concept was to implement a distributed tracing system and piggyback our identity propagation on top of it. The goal was to achieve end-to-end visibility. For any given user interaction, we wanted to see a single, coherent trace that started in the browser, traversed the Python gateway, and finished in the Elixir service, with each step annotated with performance metrics. Furthermore, we needed to pass critical user claims from the OpenID Connect (OIDC) JWT, like the user’s sub (subject identifier), from the gateway down to the services that needed it.

The technology selection was dictated by our existing stack, with one addition:

  1. Observability Backend: Apache SkyWalking. We chose SkyWalking primarily for its robust polyglot support and its relatively straightforward agent-based instrumentation for languages like Python. Its compatibility with OpenTelemetry also provided a path for future-proofing.
  2. API Gateway: Sanic. Our existing Python gateway was built on Sanic for its async performance. The challenge was to integrate tracing without compromising its non-blocking nature.
  3. Core Service: Elixir. The backend service, written in Elixir for its fault tolerance, needed a way to participate in the trace initiated by upstream components.
  4. Frontend: React with Redux. Redux was already managing our application state, including the OIDC tokens. The task was to make our API calls from the SPA the starting point of our distributed traces.
  5. Authentication: OIDC. Our corporate identity provider (IdP) uses OIDC. Upon login, the SPA receives an id_token and an access_token, both issued by our IdP as JWTs.

Our first step was to stand up the SkyWalking backend. For a local development and testing environment, a Docker Compose setup is sufficient. It defines the OAP (Observability Analysis Platform) server, the UI, and an underlying storage engine, in this case, Elasticsearch.

# docker-compose.yml
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.9
    container_name: skywalking-elasticsearch
    ports:
      - "9200:9200"
    healthcheck:
      test: ["CMD-SHELL", "curl -s --user elastic:changeme http://localhost:9200/_cluster/health | grep -qE '\"status\":\"(green|yellow)\"'"]
      interval: 10s
      timeout: 5s
      retries: 5
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - ELASTIC_PASSWORD=changeme
      - xpack.security.enabled=true
    ulimits:
      memlock:
        soft: -1
        hard: -1

  oap:
    image: apache/skywalking-oap-server:9.4.0
    container_name: skywalking-oap
    depends_on:
      elasticsearch:
        condition: service_healthy
    links:
      - elasticsearch
    ports:
      - "11800:11800"
      - "12800:12800"
    healthcheck:
      test: ["CMD-SHELL", "curl --fail --silent -X POST -H 'Content-Type: application/json' --data '{\"query\":\"{health}\"}' http://localhost:12800/graphql | grep -q '\"status\":\"healthy\"'"]
      interval: 10s
      timeout: 5s
      retries: 5
    environment:
      - SW_STORAGE=elasticsearch
      - SW_STORAGE_ES_CLUSTER_NODES=elasticsearch:9200
      - SW_STORAGE_ES_USER=elastic
      - SW_STORAGE_ES_PASSWORD=changeme
      - SW_HEALTH_CHECKER=default
      - SW_TELEMETRY=prometheus

  ui:
    image: apache/skywalking-ui:9.4.0
    container_name: skywalking-ui
    depends_on:
      oap:
        condition: service_healthy
    links:
      - oap
    ports:
      - "8080:8080"
    environment:
      - SW_OAP_ADDRESS=http://oap:12800

With the observability backend running, the next task was instrumenting our Sanic gateway. The skywalking-python agent is the official tool for this. The pitfall here is assuming the agent will handle everything automatically. While it does a remarkable job of instrumenting known frameworks and libraries, propagating custom application-level context, like OIDC claims, requires manual intervention.

In a real-world project, the gateway is the perfect place to validate the incoming JWT and extract the necessary claims. These claims then need to be passed downstream. Injecting them into the trace context itself is possible but can be complex. A more pragmatic approach is to pass them in a separate, dedicated HTTP header.
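
To make that trade-off concrete, it helps to look at what the sw8 header actually carries. The sketch below decodes a header laid out according to the SkyWalking cross-process propagation protocol (v3): eight dash-separated fields, with the string fields Base64-encoded. The names `Sw8Context` and `parse_sw8` are ours, chosen for illustration; they are not part of the agent's API, which performs this parsing internally.

```python
import base64
from dataclasses import dataclass


@dataclass
class Sw8Context:
    """Decoded view of an sw8 header (illustrative type, not an agent class)."""
    sampled: bool
    trace_id: str
    parent_segment_id: str
    parent_span_id: int
    parent_service: str
    parent_instance: str
    parent_endpoint: str
    target_address: str


def _b64(value: str) -> str:
    # The string fields of sw8 are Base64-encoded UTF-8 text.
    return base64.b64decode(value).decode("utf-8")


def parse_sw8(header: str) -> Sw8Context:
    # The standard Base64 alphabet never contains "-", so a plain split is safe.
    parts = header.split("-")
    if len(parts) != 8:
        raise ValueError(f"expected 8 sw8 fields, got {len(parts)}")
    return Sw8Context(
        sampled=parts[0] == "1",
        trace_id=_b64(parts[1]),
        parent_segment_id=_b64(parts[2]),
        parent_span_id=int(parts[3]),
        parent_service=_b64(parts[4]),
        parent_instance=_b64(parts[5]),
        parent_endpoint=_b64(parts[6]),
        target_address=_b64(parts[7]),
    )
```

Every field is positional and describes the calling span, so the header has no natural slot for a user claim. That is one reason we kept identity in its own header instead of bending the trace context to carry it.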

Here is the complete Sanic gateway implementation. It includes agent initialization, a decorator that validates the JWT, the identity propagation logic, and a proxy endpoint.

# gateway/main.py
import os
import logging
from functools import wraps

import httpx
from sanic import Sanic, response
from sanic.exceptions import Unauthorized
from jose import jwt, JWTError

# --- SkyWalking Agent Initialization ---
# Start the agent as early as possible, before the Sanic app is created and begins
# serving requests, so that its plugins can instrument Sanic and httpx.
# In a production environment, these settings would come from environment variables.
def initialize_skywalking():
    try:
        from skywalking import config, agent
        
        # Service name identifies this application in the SkyWalking UI
        config.init(
            service_name='sanic-api-gateway', 
            collector_address='localhost:11800'
        )
        agent.start()
        logging.info("SkyWalking Python agent started successfully.")
    except ImportError:
        logging.warning("SkyWalking agent not found. Tracing will be disabled.")
    except Exception as e:
        logging.error(f"Failed to start SkyWalking agent: {e}")

initialize_skywalking()
# --- End SkyWalking Initialization ---

app = Sanic("APIGateway")

# Configuration - In a real app, use environment variables.
ELIXIR_SERVICE_URL = os.getenv("ELIXIR_SERVICE_URL", "http://localhost:4000")
OIDC_JWKS_URL = os.getenv("OIDC_JWKS_URL", "http://localhost:8081/realms/master/protocol/openid-connect/certs")
OIDC_AUDIENCE = os.getenv("OIDC_AUDIENCE", "account")

# A simple in-memory cache for the JWKS to avoid fetching it on every request.
jwks_cache = None

async def get_jwks():
    """Fetches and caches the OIDC JSON Web Key Set."""
    global jwks_cache
    if jwks_cache:
        return jwks_cache
    
    async with httpx.AsyncClient() as client:
        try:
            logging.info(f"Fetching JWKS from {OIDC_JWKS_URL}")
            resp = await client.get(OIDC_JWKS_URL)
            resp.raise_for_status()
            jwks_cache = resp.json()
            return jwks_cache
        except httpx.HTTPError as e:
            logging.error(f"Failed to fetch JWKS: {e}")
            # In a production system, you might have a fallback or fail-open/closed strategy
            raise Unauthorized("Could not verify token; identity provider unreachable.")

def authorized():
    """Decorator to protect routes by validating the OIDC JWT."""
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            auth_header = request.headers.get("Authorization")
            if not auth_header or not auth_header.startswith("Bearer "):
                raise Unauthorized("Missing or malformed Authorization header.")

            token = auth_header.split(" ")[1]
            
            try:
                jwks = await get_jwks()
                unverified_header = jwt.get_unverified_header(token)
                rsa_key = {}
                for key in jwks["keys"]:
                    if key["kid"] == unverified_header["kid"]:
                        rsa_key = {
                            "kty": key["kty"],
                            "kid": key["kid"],
                            "use": key["use"],
                            "n": key["n"],
                            "e": key["e"],
                        }
                if rsa_key:
                    payload = jwt.decode(
                        token,
                        rsa_key,
                        algorithms=["RS256"],
                        audience=OIDC_AUDIENCE,
                    )
                    # This is the critical step: attach the claims to the request context
                    # so the middleware can access them.
                    request.ctx.claims = {
                        "sub": payload.get("sub"),
                        "email": payload.get("email"),
                        "preferred_username": payload.get("preferred_username")
                    }
                else:
                    raise Unauthorized("Unable to find corresponding key to verify token.")

            except JWTError as e:
                logging.warning(f"JWT validation failed: {e}")
                raise Unauthorized(f"Token is invalid: {e}")
            
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator


def identity_headers(request):
    """
    Builds the outbound identity headers from the verified claims.

    Sanic runs request middleware before the route handler, and therefore before
    the `authorized` decorator has populated `request.ctx.claims`. An `on_request`
    middleware would never see the claims, so the header is built here and called
    from inside the handler instead.
    The SkyWalking agent automatically handles the sw8 trace header.
    Our job is to add our custom application-level context.
    """
    # We've chosen a simple, non-standard header. In a more formal
    # architecture, you might use something like a signed JWT or a
    # standardized claim propagation format.
    # A common mistake is to simply forward the entire original JWT.
    # This is often a security risk and inefficient. Forward only what is needed.
    claims = getattr(request.ctx, "claims", None) or {}
    subject = claims.get("sub")
    # Never copy X-User-Subject from the incoming request: a client could spoof it.
    return {"X-User-Subject": subject} if subject else {}


@app.route("/api/data", methods=["GET"])
@authorized()
async def proxy_to_elixir_service(request):
    """
    A protected endpoint that proxies the request to the backend Elixir service.
    """
    async with httpx.AsyncClient() as client:
        try:
            # The SkyWalking agent instruments httpx: it creates an exit span for this
            # call and injects a fresh `sw8` header into the outbound request, so the
            # incoming `sw8` value should not be copied across by hand.
            # We only add our custom identity header, derived from the verified claims.
            headers_to_forward = {
                'Accept': 'application/json',
                **identity_headers(request),
            }

            logging.info(f"Forwarding request to Elixir service with headers: {list(headers_to_forward)}")

            target_url = f"{ELIXIR_SERVICE_URL}/internal/data"
            resp = await client.get(target_url, headers=headers_to_forward, timeout=5.0)
            
            resp.raise_for_status() # Raise an exception for 4xx/5xx responses
            
            return response.json(resp.json(), status=resp.status_code)

        except httpx.TimeoutException:
            logging.error("Request to Elixir service timed out.")
            return response.json({"error": "Upstream service timeout"}, status=504)
        except httpx.HTTPStatusError as e:
            logging.error(f"Upstream service returned error: {e.response.status_code}")
            return response.json(
                {"error": "Upstream service error", "details": e.response.text},
                status=e.response.status_code
            )
        except Exception as e:
            logging.exception("An unexpected error occurred during proxying.")
            return response.json({"error": "Internal Server Error"}, status=500)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True, auto_reload=True)
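
One weakness of the gateway above is that `jwks_cache` never expires, so a key rotation at the IdP would go unnoticed until the process restarts. A minimal sketch of a time-bounded cache follows. The class name `TTLJwksCache`, the five-minute default, and the injectable `fetch` and `clock` parameters are assumptions made for illustration, not part of our original gateway.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Optional


class TTLJwksCache:
    """Caches a JWKS document and refetches it once the TTL has elapsed."""

    def __init__(
        self,
        fetch: Callable[[], Awaitable[Dict[str, Any]]],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[Dict[str, Any]] = None
        self._expires_at = 0.0
        # Created lazily so the lock is bound to the running event loop.
        self._lock: Optional[asyncio.Lock] = None

    async def get(self, force_refresh: bool = False) -> Dict[str, Any]:
        if self._lock is None:
            self._lock = asyncio.Lock()
        # The lock keeps concurrent requests from all refetching at expiry.
        async with self._lock:
            now = self._clock()
            if force_refresh or self._value is None or now >= self._expires_at:
                self._value = await self._fetch()
                self._expires_at = now + self._ttl
            return self._value
```

In the decorator, one reasonable policy is to call `get(force_refresh=True)` once when a token's `kid` is not found in the cached set, and only reject the token if the key is still missing after the refresh.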

With the gateway handled, we turned to the Elixir service. The OpenTelemetry ecosystem in Elixir is the recommended path. We needed a Plug that would inspect incoming requests for the sw8 (trace context) and X-User-Subject (our custom identity) headers.

This Plug needs to correctly initialize the OpenTelemetry context for the current process, ensuring that any subsequent operations within that request’s lifecycle are associated with the correct parent span. A common mistake here is mishandling the context in Elixir’s process-based concurrency model, leading to broken traces.

# elixir_service/lib/elixir_service_web/plugs/trace_context_plug.ex
defmodule ElixirServiceWeb.Plugs.TraceContextPlug do
  @moduledoc """
  A plug to extract trace context and application-specific headers from incoming requests.
  """
  import Plug.Conn

  # `start_span` is a macro, so the Tracer module has to be required.
  require OpenTelemetry.Tracer, as: Tracer

  def init(opts), do: opts

  def call(conn, _opts) do
    # 1. Extract the trace context (e.g., sw8 header) using the globally configured
    # propagator. This also attaches the extracted context to the current process.
    # Plug exposes request headers as a list of {name, value} tuples, which the
    # default getter understands.
    :otel_propagator_text_map.extract(conn.req_headers)

    # 2. Start a new span as a child of the incoming context and make it current.
    # This represents the work done within this service. A `with_span` block would
    # end the span when this plug returns, before the controller runs, so the span
    # is started here and ended in a before_send callback instead.
    span_name = "#{conn.method} #{conn.request_path}"

    # The `:server` kind is important for semantic conventions.
    span_ctx = Tracer.start_span(span_name, %{kind: :server})
    Tracer.set_current_span(span_ctx)

    # 3. Extract our custom identity header and add it as an attribute to the new span.
    # This makes the user's identity visible directly within the trace data in SkyWalking.
    conn =
      case get_req_header(conn, "x-user-subject") do
        [subject | _] ->
          Tracer.set_attribute("app.user.subject", subject)
          # Also, put it in the connection's private storage for business logic access.
          put_private(conn, :user_subject, subject)

        _ ->
          Tracer.set_attribute("app.user.subject", "anonymous")
          conn
      end

    # 4. End the span just before the response is sent, so that it covers the
    # controller's work as well.
    register_before_send(conn, fn conn ->
      Tracer.end_span()
      conn
    end)
  end
end

# elixir_service/lib/elixir_service_web/router.ex
defmodule ElixirServiceWeb.Router do
  use ElixirServiceWeb, :router
  
  pipeline :api do
    plug :accepts, ["json"]
    # CRITICAL: This plug must run early in the pipeline.
    plug ElixirServiceWeb.Plugs.TraceContextPlug
  end

  scope "/", ElixirServiceWeb do
    pipe_through :api
    
    get "/internal/data", DataController, :index
  end
end

# elixir_service/lib/elixir_service_web/controllers/data_controller.ex
defmodule ElixirServiceWeb.DataController do
  use ElixirServiceWeb, :controller
  # Tracer functions operate on the current span of the calling process.
  require OpenTelemetry.Tracer, as: Tracer
  
  def index(conn, _params) do
    # Access the user subject that the plug stored for us.
    user_subject = conn.private[:user_subject] || "unknown"

    # Add an event to the current span to mark the beginning of business logic.
    Tracer.add_event("Starting data processing for user", %{"user.subject" => user_subject})

    # Simulate some work
    :timer.sleep(Enum.random(50..150))
    
    # In a real app, you would make database calls or call other services here.
    # OpenTelemetry libraries for Ecto or HTTP clients would automatically create
    # child spans, continuing the trace.
    
    Tracer.set_attribute("app.response.generated", "true")

    json(conn, %{
      message: "This is sensitive data from the Elixir service.",
      processed_for_user: user_subject,
      timestamp: DateTime.utc_now()
    })
  end
end

# elixir_service/config/config.exs
import Config

# Basic OpenTelemetry configuration
config :opentelemetry,
  span_processor: :batch,
  traces_exporter: :otlp,
  # IMPORTANT: Configure the propagator to understand SkyWalking's `sw8` header.
  # The official OpenTelemetry packages do not ship an sw8 propagator, so this
  # module name is hypothetical: it assumes a library (or your own module
  # implementing the text map propagator behaviour) has been added as a dependency.
  text_map_propagators: [OpenTelemetry.Propagation.Skywalking]

config :opentelemetry_exporter,
  # For SkyWalking OTLP receiver. Port 11800 is the OAP's gRPC port, so the
  # gRPC protocol is assumed here instead of HTTP/protobuf.
  otlp_protocol: :grpc,
  otlp_endpoint: "http://localhost:11800"

The final piece was the frontend. We needed to initiate the trace when making an API call. The skywalking-client-js SDK provides this functionality. The integration point is an API client interceptor (e.g., for Axios or a custom fetch wrapper). This interceptor is responsible for two things: attaching the Authorization header with the OIDC token from the Redux store and letting the SkyWalking SDK inject the sw8 trace header.

// src/services/api.ts
import axios from 'axios';
import { store } from '../store'; // Your Redux store instance
import ClientMonitor from 'skywalking-client-js';

// Register the SkyWalking client.
// In a real app, these values would come from a configuration file.
ClientMonitor.register({
  service: 'frontend-spa-app',
  serviceVersion: '1.0.0', // Placeholder version label
  pagePath: window.location.pathname, // Path of the current page
  collector: 'http://localhost:8080', // NOTE: This points to the SkyWalking UI/forwarder, not the OAP directly
  jsErrors: true, // Report JS errors
  apiErrors: true, // Report API errors
  // ... other configurations
});

const apiClient = axios.create({
  baseURL: '/api', // Proxied by the dev server to the Sanic gateway
});

// Axios request interceptor: The heart of the frontend integration.
apiClient.interceptors.request.use(
  (config) => {
    // 1. Get the current authentication state from Redux.
    const state = store.getState();
    const accessToken = state.auth.accessToken;

    if (accessToken) {
      // 2. Attach the OIDC access token for authentication at the gateway.
      config.headers.Authorization = `Bearer ${accessToken}`;
    }

    // 3. The skywalking-client-js SDK automatically patches `fetch` and `XMLHttpRequest`,
    // which Axios uses under the hood. When this request is made, the SDK will
    // generate a new `sw8` header and add it. No manual header injection is needed
    // for tracing, which is a key advantage. The span for the HTTP request will be
    // a child of the current "page load" or "user action" span.

    return config;
  },
  (error) => {
    // A common mistake is not logging interceptor errors.
    console.error('Error in Axios request interceptor:', error);
    return Promise.reject(error);
  }
);

export default apiClient;

// src/store/authSlice.ts - A simplified Redux Toolkit slice
import { createSlice, PayloadAction } from '@reduxjs/toolkit';

interface AuthState {
  accessToken: string | null;
  user: { sub: string, name: string } | null;
  isAuthenticated: boolean;
}

const initialState: AuthState = {
  accessToken: null,
  user: null,
  isAuthenticated: false,
};

const authSlice = createSlice({
  name: 'auth',
  initialState,
  reducers: {
    setLoginSuccess: (state, action: PayloadAction<{ accessToken: string; user: NonNullable<AuthState['user']> }>) => {
      state.accessToken = action.payload.accessToken;
      state.user = action.payload.user;
      state.isAuthenticated = true;
    },
    setLogout: (state) => {
      state.accessToken = null;
      state.user = null;
      state.isAuthenticated = false;
    },
  },
});

export const { setLoginSuccess, setLogout } = authSlice.actions;
export default authSlice.reducer;

With all pieces in place, the complete request flow could be visualized.

sequenceDiagram
    participant B as Browser (Redux)
    participant G as Sanic Gateway
    participant E as Elixir Service
    participant O as SkyWalking OAP

    B->>+G: GET /api/data<br/>Headers:<br/>Authorization: Bearer [JWT]<br/>sw8: [trace-context]
    Note right of B: skywalking-client-js creates root span and `sw8` header.<br/>Axios interceptor adds `Authorization` header from Redux store.
    G->>G: 1. SkyWalking agent creates entry span.
    G->>G: 2. `authorized` decorator validates JWT.
    G->>G: 3. Gateway extracts `sub` claim from the verified token.
    G->>+E: GET /internal/data<br/>Headers:<br/>sw8: [updated-context]<br/>X-User-Subject: [user-sub]
    Note right of G: SkyWalking agent creates exit span.<br/>Propagates updated `sw8` and adds custom identity header.
    E->>E: 1. `TraceContextPlug` reads headers.
    E->>E: 2. OTEL SDK creates entry span as child of gateway's span.
    E->>E: 3. Business logic runs.
    E-->>-G: 200 OK - Response Data
    G-->>-B: 200 OK - Response Data
    Note over B,O: All services asynchronously report their span data to the SkyWalking OAP.
    B->>O: Report frontend span
    G->>O: Report gateway spans
    E->>O: Report service span

The final result was transformative. That vague “dashboard is slow” ticket could now be analyzed with precision. In the SkyWalking UI, we could filter traces by the /api/data endpoint, find a slow one, and see the entire lifecycle. We might discover that the browser-to-gateway leg was fast (ruling out network issues), the gateway processing was negligible, but the Elixir service span was taking 2 seconds. We could then drill into that span, see its attributes (including the app.user.subject), and correlate its traceId with the Elixir service’s structured logs to find the exact log entries for that specific, slow request, initiated by that specific user.

This solution is not without its limitations. The propagation of user identity via a custom X-User-Subject header is pragmatic but lacks a formal security posture. A downstream service implicitly trusts the gateway. In a zero-trust environment, services might need to be given the original JWT to perform their own validation, which complicates key management. Additionally, our front-end tracing is coarse; we’re tracing the API call but not the preceding user interactions or React component rendering time that contributed to the perceived latency. Future iterations could involve integrating SkyWalking’s user action monitoring to create spans for button clicks and other UI events. Finally, this entire setup relies on application-level instrumentation. An alternative path would be to leverage a service mesh like Istio, which can automate trace context propagation at the infrastructure layer, but this introduces a significant increase in operational complexity and may not be suitable for all teams.
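
A middle ground between blind trust and forwarding the original JWT is to have the gateway sign the identity header with a secret shared with the downstream service. The sketch below shows the idea with an HMAC over the subject and an issue timestamp. The function names, the `subject|timestamp|signature` layout, and the 60-second freshness window are our assumptions for illustration. We show it in Python for brevity; the Elixir service would need an equivalent verifier.

```python
import hashlib
import hmac
import time
from typing import Optional


def _signature(subject: str, issued_at: int, secret: bytes) -> str:
    message = f"{subject}|{issued_at}".encode("utf-8")
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def sign_subject(subject: str, secret: bytes, now: Optional[float] = None) -> str:
    """Gateway side: produce the value for the identity header."""
    issued_at = int(time.time() if now is None else now)
    return f"{subject}|{issued_at}|{_signature(subject, issued_at, secret)}"


def verify_subject(
    header_value: str,
    secret: bytes,
    max_age_seconds: int = 60,
    now: Optional[float] = None,
) -> Optional[str]:
    """Service side: return the subject if the header is authentic and fresh."""
    try:
        # Split from the right so a subject containing "|" still parses.
        subject, issued_at_text, signature = header_value.rsplit("|", 2)
        issued_at = int(issued_at_text)
    except ValueError:
        return None
    expected = _signature(subject, issued_at, secret)
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
        return None
    current = time.time() if now is None else now
    if current - issued_at > max_age_seconds:
        return None
    return subject
```

This still requires distributing a shared secret, so it narrows the trust problem without removing it. A service that cannot hold the secret would still need to validate the original token itself.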

