Unifying GraphQL Subscriptions and NATS Streaming via an Envoy Proxy Gateway


The initial problem presented itself as a classic frontend state management dilemma. Our team was building a real-time inventory and pricing dashboard using Material-UI. The core requirement was a data-rich view that needed both on-demand, complex data fetching and instantaneous updates pushed from the backend. The initial GraphQL implementation for queries worked beautifully, allowing the frontend to fetch precisely the nested data it needed for initial renders. The trouble started with the real-time updates.

Our first attempt involved opening a separate WebSocket connection directly to a NATS-backed service. This immediately created two sources of truth on the client. A user would load the dashboard via a GraphQL query, and a split-second later, a NATS message would arrive with an update, forcing a messy, manual reconciliation of the Apollo Client cache with the incoming WebSocket data. This approach was rife with race conditions and significantly complicated the client-side logic. Furthermore, it meant managing two separate authentication and authorization lifecycles, doubling the security surface area and operational burden.

The goal became clear: unify these two data patterns into a single, coherent stream consumed by the client. The frontend should not be aware of the underlying transport dichotomy. It should make a GraphQL query to fetch initial state and issue a GraphQL subscription to receive subsequent updates, all over a single, secure connection. This led us down a path of architecting a gateway layer that could intelligently bridge these two worlds, which is where Envoy, NATS JetStream, and a federated GraphQL architecture became indispensable.

The Architectural Foundation: A Post-mortem on Simpler Designs

Before settling on the final architecture, we discarded two simpler patterns.

  1. In-Memory GraphQL PubSub: Using the standard graphql-subscriptions library with its PubSub engine was the path of least resistance. It failed our resilience requirements. A service restart or crash would wipe all subscription context, and there was no mechanism for message delivery guarantees. In a real-world project, “fire-and-forget” is rarely acceptable for critical business data.
  2. Client-Side Dual Connection: As described, this offloaded an immense amount of complexity onto the client. Each developer working on the MUI frontend would need to be an expert in cache reconciliation, connection management, and the potential failure modes of two distinct protocols. The pitfall here is that such complexity is almost never handled consistently across a team or over the lifetime of a project.

The chosen path had to provide a single entry point, robust message delivery, and a clean abstraction for the frontend. We decided on a GraphQL Federation model using Apollo, as our inventory and pricing domains were managed by separate teams. NATS JetStream was selected over vanilla NATS for its persistence and at-least-once delivery semantics. Envoy was the lynchpin, serving as the unified ingress point to orchestrate traffic to these disparate backend components.

Here is the high-level data flow we engineered:

sequenceDiagram
    participant Client as Client (MUI App)
    participant Envoy
    participant AuthZ as AuthZ Service
    participant Gateway as Apollo Gateway
    participant Bridge as GQL-NATS Bridge
    participant Inventory as Inventory Service (GQL)
    participant Pricing as Pricing Service (NATS Publisher)
    participant JetStream as NATS JetStream

    Client->>+Envoy: Upgrade to WebSocket (GraphQL Subscription)
    Envoy->>+AuthZ: Check Request
    AuthZ-->>-Envoy: OK
    Envoy->>+Bridge: Forward WebSocket Connection
    Bridge->>+JetStream: Create Durable Consumer
    JetStream-->>-Bridge: Acknowledge Consumer
    Bridge-->>-Envoy: Connection Established
    Envoy-->>-Client: WebSocket Upgraded

    loop Real-time Price Updates
        Pricing->>JetStream: Publish Price Update
        JetStream->>Bridge: Push Message
        Bridge->>Bridge: Format as GraphQL Payload
        Bridge->>Envoy: Send Payload over WebSocket
        Envoy->>Client: Forward Payload
        Client->>Client: Update Apollo Cache / MUI View
        Bridge-->>JetStream: Ack Message
    end

    Client->>+Envoy: Standard HTTP (GraphQL Query)
    Envoy->>+AuthZ: Check Request
    AuthZ-->>-Envoy: OK
    Envoy->>+Gateway: Forward GraphQL Query
    Gateway->>+Inventory: Sub-query for inventory
    Inventory-->>-Gateway: Inventory Data
    Gateway-->>-Envoy: Stitched Response
    Envoy-->>-Client: Final GraphQL Response

Core Implementation: The Docker Compose Stack

To manage this multi-service environment during development, we relied on Docker Compose. This provides a complete, runnable stack. A common mistake is to develop these services in isolation, only to find integration issues later.

# docker-compose.yml
version: '3.8'
services:
  nats:
    image: nats:2.9-alpine
    ports:
      - "4222:4222"
      - "8222:8222"
    command: "-js" # Enable JetStream

  envoy:
    image: envoyproxy/envoy:v1.24.0
    ports:
      - "8080:8080"
    volumes:
      - ./envoy/envoy.yaml:/etc/envoy/envoy.yaml:ro
    depends_on:
      - apollo-gateway
      - graphql-nats-bridge

  # Represents a standard GraphQL service for queries
  inventory-service:
    build: ./services/inventory-service
    ports:
      - "4001:4000"

  # The Apollo Gateway that federates our GraphQL services
  apollo-gateway:
    build: ./services/apollo-gateway
    ports:
      - "4000:4000"
    depends_on:
      - inventory-service
      - graphql-nats-bridge

  # The critical bridge between GraphQL Subscriptions and NATS
  graphql-nats-bridge:
    build: ./services/graphql-nats-bridge
    ports:
      - "4002:4000"
    environment:
      - NATS_URL=nats://nats:4222
    depends_on:
      - nats

  # A simple service that publishes updates to NATS
  pricing-publisher:
    build: ./services/pricing-publisher
    environment:
      - NATS_URL=nats://nats:4222
    depends_on:
      - nats
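
Note the pricing-publisher at the bottom of the stack: it drives the entire real-time flow but is not shown anywhere else in this article. The sketch below is a minimal, illustrative version, assuming the `prices.<productId>` subject convention that the bridge's `prices.*` consumer expects; the file path, product ID, and publish interval are stand-ins.

// services/pricing-publisher/index.js (illustrative sketch)
import { connect, JSONCodec } from 'nats';

const NATS_URL = process.env.NATS_URL || 'nats://localhost:4222';
const jsonCodec = JSONCodec();

async function main() {
    const nc = await connect({ servers: NATS_URL });
    const js = nc.jetstream();
    console.log('Pricing publisher connected to NATS JetStream.');

    // Emit a synthetic price update every few seconds for a single hypothetical product.
    setInterval(async () => {
        const update = {
            productId: 'PRODUCT-123',
            price: Number((Math.random() * 100).toFixed(2)),
            timestamp: new Date().toISOString(),
        };
        try {
            // The subject encodes the product id, so the message lands on the PRICES stream
            // ('prices.*') and is delivered to the bridge's durable consumer.
            await js.publish(`prices.${update.productId}`, jsonCodec.encode(update));
        } catch (err) {
            console.error('Publish failed (is the PRICES stream created yet?):', err.message);
        }
    }, 5000);
}

main().catch((err) => {
    console.error('Pricing publisher failed:', err);
    process.exit(1);
});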

The GraphQL-NATS Bridge: The Heart of the System

This is the most critical custom component. It exposes a GraphQL schema with a Subscription field and translates NATS messages into a GraphQL-compliant stream. We used Node.js with graphql-yoga and the nats.js library.

The key challenge is correctly handling the AsyncIterator that GraphQL subscriptions expect while managing the lifecycle of a NATS JetStream consumer. One caveat worth flagging: graphql-yoga serves subscriptions over Server-Sent Events by default, so to speak the graphql-ws WebSocket protocol used by the client later in this article, the underlying Node HTTP server also needs to be wired up to graphql-ws (via its useServer helper); that wiring is omitted below for brevity.

// services/graphql-nats-bridge/index.js
// Uses the graphql-yoga v2 API (package '@graphql-yoga/node'); later versions rename createServer to createYoga.
import { createServer, createPubSub } from '@graphql-yoga/node';
import { connect, JSONCodec, consumerOpts, createInbox } from 'nats';

// In a real-world project, NATS connection details come from config.
const NATS_URL = process.env.NATS_URL || 'nats://localhost:4222';

const typeDefs = `
  type PriceUpdate {
    productId: ID!
    price: Float!
    timestamp: String!
  }

  type Query {
    _dummy: String # Subscriptions-only service needs a Query type
  }

  type Subscription {
    priceUpdates(productId: ID!): PriceUpdate!
  }
`;

// This pubsub is NOT for transport, but to map NATS messages to active GraphQL iterators.
const pubSub = createPubSub();
const jsonCodec = JSONCodec();

// Centralized NATS connection management
async function initializeNats() {
    try {
        const nc = await connect({ servers: NATS_URL });
        const js = nc.jetstream();
        console.log('Connected to NATS JetStream.');

        // Ensure the stream exists. Idempotent operation.
        const jsm = await nc.jetstreamManager();
        await jsm.streams.add({ name: 'PRICES', subjects: ['prices.*'] });
        console.log('Stream "PRICES" is ready.');

        // Create a durable consumer to listen to all price updates
        const opts = consumerOpts();
        opts.durable('graphql-bridge-consumer');
        opts.manualAck(); // We must acknowledge messages
        opts.ackWait(30000); // 30 seconds to ack
        opts.deliverTo(createInbox());

        const sub = await js.subscribe('prices.*', opts);
        console.log('Subscribed to "prices.*" subject.');

        (async () => {
            for await (const m of sub) {
                try {
                    const data = jsonCodec.decode(m.data);
                    // The topic name is crucial for routing the update
                    // Topic format: `priceUpdates:${productId}`
                    const topic = `priceUpdates:${data.productId}`;
                    pubSub.publish(topic, { priceUpdates: data });
                    m.ack(); // Acknowledge message after successful publish
                } catch (err) {
                    console.error('Error processing NATS message:', err);
                    // Decide on error strategy: m.nak(), m.term(), or just log.
                    // For now, we log and let it timeout, causing redelivery.
                }
            }
        })().then(() => console.log("NATS subscription closed."));

    } catch (err) {
        console.error('Failed to connect or subscribe to NATS:', err);
        process.exit(1);
    }
}

initializeNats();

const resolvers = {
    Subscription: {
        priceUpdates: {
            subscribe: (_, { productId }) => {
                if (!productId) {
                    throw new Error('productId is required for priceUpdates subscription.');
                }
                // The topic name must exactly match what the NATS listener publishes to.
                const topic = `priceUpdates:${productId}`;
                return pubSub.subscribe(topic);
            },
            // The resolver just passes the payload through.
            resolve: (payload) => payload.priceUpdates,
        },
    },
};

const server = createServer({
    schema: {
        typeDefs,
        resolvers,
    },
});

server.start().catch(err => {
    console.error('GraphQL server failed to start:', err);
});

Key design choices in the bridge:

  1. Centralized NATS Consumer: We create a single, durable consumer in the bridge service when it starts. This is far more efficient than creating a new NATS consumer for every single GraphQL subscription request. A common mistake is to tightly couple the GraphQL subscription lifecycle with the NATS consumer lifecycle, which leads to performance bottlenecks.
  2. In-Memory PubSub as a Router: We use graphql-yoga’s createPubSub instance not as a transport mechanism, but as an in-memory event bus. Its job is to map incoming messages from the single NATS consumer to the many active GraphQL subscribers based on a topic key (e.g., priceUpdates:PRODUCT-123).
  3. Manual Acknowledgements: Using m.ack() is critical. It signals to NATS JetStream that we have successfully processed the message. If our bridge service crashes before acknowledging, JetStream will redeliver the message once the service recovers, preventing lost UI updates. This provides the at-least-once delivery guarantee.

Envoy Configuration: The Unified Gateway

Envoy’s configuration is where the magic happens. It needs to inspect the incoming request and route it correctly. Standard HTTP POST requests to /graphql go to the Apollo Gateway. Requests that include Upgrade: websocket headers for the same path must be routed to our graphql-nats-bridge.

This envoy.yaml is non-trivial and demonstrates a production-grade setup with distinct clusters and routing rules.

# envoy/envoy.yaml
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 8080
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              # WebSocket upgrades on /graphql go to the bridge.
              # Envoy evaluates routes in order, so the more specific header match comes first.
              - match:
                  prefix: "/graphql"
                  headers:
                  - name: "upgrade"
                    string_match:
                      exact: "websocket"
                      ignore_case: true
                route:
                  cluster: graphql_nats_bridge_service
                  # Allow the HTTP/1.1 Upgrade to WebSocket to pass through on this route.
                  upgrade_configs:
                  - upgrade_type: "websocket"
              # Standard HTTP GraphQL traffic (queries and mutations) falls through to the gateway.
              - match:
                  prefix: "/graphql"
                route:
                  cluster: apollo_gateway_service

          http_filters:
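          # The external AuthZ check shown in the sequence diagram would be an ext_authz
          # HTTP filter placed here, ahead of the router; it is omitted from this minimal config for brevity.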
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
  - name: apollo_gateway_service
    connect_timeout: 5s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: apollo_gateway_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: apollo-gateway
                port_value: 4000
  - name: graphql_nats_bridge_service
    connect_timeout: 5s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: graphql_nats_bridge_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: graphql-nats-bridge
                port_value: 4000

This configuration relies on a simple but effective Envoy capability: ordered route matching with header matchers. Instead of a single catch-all route for the /graphql path, we define two routes that Envoy evaluates top to bottom, inspecting the Upgrade header on the first.

  • If Upgrade: websocket is present, the first route matches; its upgrade_configs permit the protocol switch, and the connection is proxied to the graphql_nats_bridge_service cluster.
  • If that header is missing (i.e., it’s a standard HTTP request), the first route does not match and Envoy falls through to the second route, which forwards the request to the apollo_gateway_service cluster. This is a clean and maintainable way to split traffic for the same endpoint based on protocol.
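
Before wiring up the frontend, it is worth sanity-checking the split end to end. The throwaway Node script below (a hypothetical scripts/ws-smoke-test.js, assuming the Docker Compose stack is up and the ws package is installed) opens a raw graphql-ws subscription through Envoy; if the header-based route works, price updates from the bridge should start printing.

// scripts/ws-smoke-test.js (throwaway verification script, not part of the stack)
import { createClient } from 'graphql-ws';
import WebSocket from 'ws';

const client = createClient({
    // Envoy's listener; the Upgrade: websocket header routes this to the bridge.
    url: 'ws://localhost:8080/graphql',
    webSocketImpl: WebSocket,
});

// Subscribe to price updates for one product and print whatever arrives.
const dispose = client.subscribe(
    {
        query: `subscription {
          priceUpdates(productId: "PRODUCT-123") { productId price timestamp }
        }`,
    },
    {
        next: (msg) => console.log('price update:', msg.data),
        error: (err) => console.error('subscription error:', err),
        complete: () => console.log('subscription complete'),
    },
);

// Tear the subscription down after 30 seconds.
setTimeout(() => dispose(), 30000);

A plain HTTP POST to http://localhost:8080/graphql should, by contrast, land on the Apollo Gateway.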

Frontend Integration: A Custom MUI/React Hook

On the client, the complexity is now contained within a reusable React hook. This hook uses Apollo Client’s capabilities to handle both the initial query and the subsequent subscription data, ensuring the cache is always up-to-date.

First, the Apollo Client setup needs to be aware of the WebSocket link.

// client/src/apolloClient.js
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { getMainDefinition } from '@apollo/client/utilities';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';

const httpLink = new HttpLink({
  uri: 'http://localhost:8080/graphql', // Envoy's address
});

const wsLink = new GraphQLWsLink(createClient({
  url: 'ws://localhost:8080/graphql', // Envoy's address for WebSockets
}));

// The split function takes three parameters:
//
// * A function that's called for each operation to execute
// * The Link to use for an operation if the function returns a "truthy" value
// * The Link to use for an operation if the function returns a "falsy" value
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    );
  },
  wsLink,
  httpLink,
);

export const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});

Now, the custom hook that combines the query and subscription logic.

// client/src/hooks/useRealtimeProductData.js
import { gql, useQuery, useSubscription } from '@apollo/client';

const GET_PRODUCT_QUERY = gql`
  query GetProduct($productId: ID!) {
    product(id: $productId) { # Assuming this comes from inventory-service
      id
      stock
      # Price data will be null/empty initially, filled by subscription
      latestPrice {
        price
        timestamp
      }
    }
  }
`;

const PRICE_UPDATE_SUBSCRIPTION = gql`
  subscription OnPriceUpdate($productId: ID!) {
    priceUpdates(productId: $productId) {
      productId
      price
      timestamp
    }
  }
`;

export function useRealtimeProductData(productId) {
  const { data, loading, error } = useQuery(GET_PRODUCT_QUERY, {
    variables: { productId },
  });

  // We ignore the subscription hook's return value; instead, the onData
  // callback writes each update into the Apollo cache as it arrives.
  useSubscription(PRICE_UPDATE_SUBSCRIPTION, {
    variables: { productId },
    onData: ({ client, data: subData }) => {
      if (subData.data?.priceUpdates) {
        const { priceUpdates } = subData.data;
        const cache = client.cache;

        // Manually update the cache with the new price info.
        // This is the core of client-side state reconciliation.
        cache.modify({
          id: cache.identify({ __typename: 'Product', id: productId }),
          fields: {
            latestPrice() {
              // Create a new object for the price update
              const newPriceRef = cache.writeFragment({
                  data: {
                      __typename: 'PriceUpdate',
                      price: priceUpdates.price,
                      timestamp: priceUpdates.timestamp
                  },
                  fragment: gql`
                      fragment NewPrice on PriceUpdate {
                          price
                          timestamp
                      }
                  `
              });
              return newPriceRef;
            },
          },
        });
      }
    },
  });

  return { product: data?.product, loading, error };
}

This hook encapsulates the entire data-fetching lifecycle. An MUI component can now simply call const { product, loading } = useRealtimeProductData('PRODUCT-123'); and be guaranteed to receive both the initial data and all subsequent real-time price updates without any further logic. The cache.modify function is Apollo’s sanctioned way of performing these surgical updates, ensuring the UI re-renders reactively.
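
As an illustration of how little is left for the view layer, a hypothetical MUI component consuming the hook might look like this (component and file names are placeholders):

// client/src/components/ProductPriceCard.jsx (illustrative usage)
import { Card, CardContent, CircularProgress, Typography } from '@mui/material';
import { useRealtimeProductData } from '../hooks/useRealtimeProductData';

export function ProductPriceCard({ productId }) {
  const { product, loading, error } = useRealtimeProductData(productId);

  if (loading) return <CircularProgress />;
  if (error || !product) return <Typography color="error">Failed to load product.</Typography>;

  return (
    <Card>
      <CardContent>
        <Typography variant="h6">Product {product.id}</Typography>
        <Typography>In stock: {product.stock}</Typography>
        {/* Re-renders automatically whenever the subscription writes a new latestPrice to the cache. */}
        <Typography variant="h4">
          {product.latestPrice ? `$${product.latestPrice.price.toFixed(2)}` : 'Awaiting price...'}
        </Typography>
      </CardContent>
    </Card>
  );
}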

Lingering Issues and Future Optimizations

This architecture, while robust, is not without trade-offs and areas for improvement. The graphql-nats-bridge is a custom stateful service and a potential single point of failure; it must be deployed with multiple replicas for high availability, which introduces complexity in managing NATS durable consumer groups. Backpressure is another concern: if a client stays connected but processes messages slowly, the bridge can accumulate unsent payloads in memory. A more sophisticated implementation would monitor WebSocket buffer sizes and throttle the NATS consumer accordingly.

A significant future optimization would be to eliminate the standalone graphql-nats-bridge service entirely. This could be achieved by developing a custom Envoy filter using WebAssembly (WASM). This filter could terminate the GraphQL WebSocket connection, interact directly with the NATS cluster, and stream responses back to the client from within the Envoy data plane itself. This would reduce network hops, simplify the deployment architecture, and likely yield substantial performance improvements, though it represents a significant increase in development complexity.
