Engineering a Fault-Tolerant Elixir WebSocket Gateway for Azure Service Bus


Our initial load test was a complete failure. The mandate was to surface real-time operational events from a legacy enterprise system onto a modern web dashboard. The source system communicated exclusively via Azure Service Bus topics, firing thousands of messages per minute during peak load. Our new Elixir application, built with the Phoenix Framework, was designed to consume these messages and push them over WebSockets to connected clients. The theory was sound: Elixir’s concurrency model should handle this with ease.

The reality was a system that fell over under a fraction of the expected load. The BEAM’s scheduler, usually a paragon of resilience, became unresponsive. Memory usage climbed relentlessly until the container was terminated by the orchestrator. Every single client WebSocket connection was dropped. The root cause was a classic impedance mismatch: a high-volume, push-based message source feeding a set of potentially slow, pull-based network clients. Our application was acting as a buffer, but its buffer, the process mailboxes, has no built-in limit: mailboxes simply grow until the node runs out of memory. We weren’t consuming messages; we were drowning in them. This wasn’t a problem that could be solved by adding more memory or CPUs. It was a fundamental architectural flaw that required us to rethink how we managed data flow under pressure.

The Flawed First Approach: A Naive GenServer Consumer

The initial implementation was deceptively simple. We used a GenServer to manage the connection and subscription to the Azure Service Bus topic. A library provided the raw client, and our GenServer would periodically pull a batch of messages and then iterate through them, broadcasting each one to the relevant Phoenix PubSub topic.

# lib/realtime_gateway/bus_consumer.ex
# WARNING: This is the initial, flawed implementation. Do not use in production.

defmodule RealtimeGateway.BusConsumer do
  use GenServer
  require Logger

  alias Azure.ServiceBus.Topic
  alias RealtimeGatewayWeb.Endpoint

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # Configuration is pulled from config files
    config = Application.get_env(:realtime_gateway, :azure_service_bus)
    {:ok, topic_client} = Topic.Client.new(config)

    # The subscription name should be durable for a real service
    subscription_name = "dashboard_subscription"
    state = %{
      topic_client: topic_client,
      subscription_name: subscription_name
    }

    # Start polling for messages immediately
    send(self(), :consume_messages)
    {:ok, state}
  end

  @impl true
  def handle_info(:consume_messages, state) do
    Logger.info("Fetching messages from Azure Service Bus...")
    # This is the core of the problem: we pull messages unconditionally.
    case Topic.Subscription.receive_and_delete_messages(state.topic_client, state.subscription_name, max_messages: 100) do
      {:ok, messages} ->
        for message <- messages do
          # Broadcast to all connected clients via Phoenix PubSub
          Endpoint.broadcast!("notifications:all", "new_event", %{data: message.body})
        end

      {:error, reason} ->
        Logger.error("Failed to receive messages: #{inspect(reason)}")
        # Simple backoff before retrying
        Process.sleep(5000)
    end

    # Immediately schedule the next fetch. No back-pressure.
    Process.send_after(self(), :consume_messages, 100) # Poll every 100ms
    {:noreply, state}
  end
end

The problem lies in the handle_info(:consume_messages, state) block. It fetches a batch of up to 100 messages and immediately schedules the next fetch. The Endpoint.broadcast!/3 call is asynchronous. It fires off messages to the Phoenix.PubSub system, which in turn sends messages to the individual Phoenix.Channel processes. If the downstream channel processes or the underlying WebSocket connections are slow, their process mailboxes fill up. Since our BusConsumer GenServer has no awareness of this downstream congestion, it continues to pull messages from Azure Service Bus at full speed, relentlessly flooding the system.
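
The effect is easy to reproduce without Phoenix or Azure. The following is a minimal sketch in plain Elixir, with a hypothetical MailboxDemo module standing in for the gateway: a sender that never waits on its receiver leaves almost every message sitting in the receiver's mailbox.

```elixir
# Plain-Elixir illustration: a sender that never waits for its receiver
# makes the receiver's mailbox grow. MailboxDemo is a hypothetical name.
defmodule MailboxDemo do
  # Slow consumer: takes `delay_ms` to handle each message.
  def slow_consumer(delay_ms) do
    receive do
      {:event, _payload} ->
        Process.sleep(delay_ms)
        slow_consumer(delay_ms)
    end
  end

  # Sends `count` messages as fast as possible, then reports how many are
  # still waiting in the consumer's mailbox.
  def flood(count, delay_ms \\ 50) do
    consumer = spawn(fn -> slow_consumer(delay_ms) end)
    Enum.each(1..count, fn i -> send(consumer, {:event, i}) end)
    {:message_queue_len, backlog} = Process.info(consumer, :message_queue_len)
    Process.exit(consumer, :kill)
    backlog
  end
end

backlog = MailboxDemo.flood(10_000)
IO.puts("Messages still queued after sending 10000: #{backlog}")
```

Nothing in send/2 slows the sender down, so the backlog is limited only by available memory. That is the behaviour our BusConsumer exhibited at a much larger scale.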

A Path to Resilience: Back-Pressure with GenStage

The solution to this firehose problem is back-pressure. We need a mechanism where the consumers (the WebSocket channel processes) can signal to the producer (the Azure Service Bus client) that they are ready for more data. This is precisely the problem Elixir’s GenStage was designed to solve.

We redesigned the architecture as a multi-stage pipeline:

  1. Producer: A GenStage producer that wraps the Azure Service Bus client. It will only fetch messages from the bus when it receives demand from downstream stages.
  2. Dispatcher: A GenStage producer_consumer that subscribes to the Producer. Its role is to take events and fan them out to multiple consumers. This provides a single point of subscription for all our dynamic WebSocket clients. It uses GenStage.BroadcastDispatcher as its dispatcher, so every subscriber receives every event.
  3. Consumers: A dynamic set of GenStage consumers. Each client connecting via a WebSocket will spawn its own consumer process, which then subscribes to the Dispatcher.

This architecture inverts the flow of control. Data no longer pushes its way through the system; it is pulled by the final consumers.
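
Before bringing in GenStage, the inversion can be sketched with two plain processes. This is only an illustration of the idea, not GenStage's protocol; the PullDemo module and its {:ask, n, from} message are hypothetical.

```elixir
# Demand-driven flow in plain OTP: the producer emits only what was asked for.
defmodule PullDemo do
  # Producer: holds a counter and emits events only in response to :ask.
  # Assumes n >= 1.
  def producer(next) do
    receive do
      {:ask, n, from} ->
        events = Enum.to_list(next..(next + n - 1))
        send(from, {:events, events})
        producer(next + n)
    end
  end

  # Consumer: request a batch, receive it, and only then ask again.
  def consume(producer, batch_size, batches) do
    Enum.flat_map(1..batches, fn _ ->
      send(producer, {:ask, batch_size, self()})

      receive do
        {:events, events} -> events
      after
        1_000 -> raise "producer did not answer"
      end
    end)
  end
end

producer = spawn(fn -> PullDemo.producer(1) end)
IO.inspect(PullDemo.consume(producer, 3, 2))
# prints [1, 2, 3, 4, 5, 6]
```

The producer cannot get ahead of the consumer, because it has nothing to do until the next :ask arrives. GenStage generalises this handshake with batching, multiple subscribers, and supervision-friendly subscriptions.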

graph TD
    subgraph Elixir Application
        ASB[Azure Service Bus Topic] -->|pulls when demand exists| P(ASBProducer - GenStage)
        P -->|events| D(BroadcastDispatcher - GenStage)
        D -->|events| C1(WebSocketConsumer 1)
        D -->|events| C2(WebSocketConsumer 2)
        D -->|events| C3(WebSocketConsumer ...)
    end

    subgraph Phoenix Channels
        C1 -->|push| WS1[Client 1]
        C2 -->|push| WS2[Client 2]
        C3 -->|push| WS3[Client ...]
    end

Step 1: Implementing the ASBProducer

The producer is the heart of the back-pressure mechanism. It maintains the Azure Service Bus client state and keeps track of the outstanding demand from its downstream consumer (the dispatcher).

# lib/realtime_gateway/gen_stage/asb_producer.ex
defmodule RealtimeGateway.GenStage.ASBProducer do
  use GenStage
  require Logger

  alias Azure.ServiceBus.Topic

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Delay before retrying when a fetch fails or returns fewer messages than demanded.
  @retry_ms 1_000

  @impl true
  def init(opts) do
    config = Application.get_env(:realtime_gateway, :azure_service_bus)
    {:ok, topic_client} = Topic.Client.new(config)

    subscription_name = Keyword.fetch!(opts, :subscription_name)

    state = %{
      topic_client: topic_client,
      subscription_name: subscription_name,
      buffer: [],        # Messages fetched but not yet demanded
      pending_demand: 0  # Demand we have accepted but not yet satisfied
    }

    # This is a producer stage.
    {:producer, state}
  end

  @impl true
  def handle_demand(demand, state) do
    Logger.debug("Received demand for #{demand} events.")
    # GenStage calls handle_demand/2 once per request and never repeats it,
    # so unmet demand has to be remembered in our own state.
    dispatch(%{state | pending_demand: state.pending_demand + demand})
  end

  # Retry timer: try again to satisfy demand left over from an earlier fetch.
  @impl true
  def handle_info(:retry_fetch, state), do: dispatch(state)

  # The buffer alone satisfies the pending demand (or there is none): no fetch.
  defp dispatch(%{pending_demand: pending, buffer: buffer} = state)
       when length(buffer) >= pending do
    {to_dispatch, remaining_buffer} = Enum.split(buffer, pending)
    {:noreply, to_dispatch, %{state | buffer: remaining_buffer, pending_demand: 0}}
  end

  defp dispatch(%{pending_demand: pending, buffer: buffer} = state) do
    # Fetch a bit more than immediately needed to reduce I/O, but not so much
    # that we buffer forever. Fetching exactly the demand can be inefficient.
    fetch_size = max(pending - length(buffer), 50)

    Logger.debug("Fetching up to #{fetch_size} messages from ASB.")
    case Topic.Subscription.receive_and_delete_messages(state.topic_client, state.subscription_name, max_messages: fetch_size) do
      {:ok, new_messages} ->
        {to_dispatch, remaining_buffer} = Enum.split(buffer ++ new_messages, pending)
        unmet = pending - length(to_dispatch)
        # The topic had fewer messages than demanded: poll again later.
        if unmet > 0, do: Process.send_after(self(), :retry_fetch, @retry_ms)
        {:noreply, to_dispatch, %{state | buffer: remaining_buffer, pending_demand: unmet}}

      {:error, reason} ->
        Logger.error("Failed to fetch from ASB: #{inspect(reason)}")
        # Dispatch what we have, keep the unmet demand, and retry after a delay.
        Process.send_after(self(), :retry_fetch, @retry_ms)
        {:noreply, buffer, %{state | buffer: [], pending_demand: pending - length(buffer)}}
    end
  end
end

The critical logic is in handle_demand/2. It only calls the Azure Service Bus SDK when there’s demand and its internal buffer is insufficient. This prevents the application from pulling messages it cannot process.
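
One subtlety deserves a closer look. GenStage invokes handle_demand/2 once per demand request and does not repeat the call, so a producer that emits fewer events than were asked for has to remember the shortfall itself. The accounting can be sketched as a pure function; DemandBuffer and its fetch_fun argument are hypothetical and stand in for the producer state and the Service Bus call.

```elixir
# Pure model of the producer's bookkeeping. Not part of GenStage or any SDK.
defmodule DemandBuffer do
  # Returns {events_to_emit, new_buffer, unmet_demand}.
  # `fetch_fun` receives the requested batch size and returns a list of
  # messages, possibly shorter than requested.
  def take(buffer, demand, fetch_fun, min_fetch \\ 50) do
    if length(buffer) >= demand do
      # The buffer alone covers the demand: no fetch needed.
      {emit, rest} = Enum.split(buffer, demand)
      {emit, rest, 0}
    else
      fetch_size = max(demand - length(buffer), min_fetch)
      fetched = fetch_fun.(fetch_size)
      {emit, rest} = Enum.split(buffer ++ fetched, demand)
      {emit, rest, demand - length(emit)}
    end
  end
end

# A fake source that only ever has four messages available.
short_source = fn _size -> [:a, :b, :c, :d] end

IO.inspect(DemandBuffer.take([:x], 10, short_source))
# prints {[:x, :a, :b, :c, :d], [], 5}
IO.inspect(DemandBuffer.take([:x, :y, :z], 2, short_source))
# prints {[:x, :y], [:z], 0}
```

In the first call, five events of demand remain unmet. A producer has to carry that number forward and fetch again later, otherwise the pipeline stalls until a consumer happens to ask again.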

Step 2: The BroadcastDispatcher

The dispatcher is a simple but necessary piece of plumbing. It subscribes to the producer and re-broadcasts everything it receives. Using a separate dispatcher decouples the producer from the many dynamic WebSocket consumers. Without it, every consumer would have to subscribe directly to the producer, which would complicate the producer’s logic.

# lib/realtime_gateway/gen_stage/broadcast_dispatcher.ex
defmodule RealtimeGateway.GenStage.BroadcastDispatcher do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # Subscribe to the producer. :max_demand caps how many events this stage
    # requests from the producer at a time; :min_demand is the level at which
    # it asks for more.
    subscriptions = [
      {RealtimeGateway.GenStage.ASBProducer, max_demand: 1000, min_demand: 500}
    ]

    # This stage is both a consumer and a producer. GenStage.BroadcastDispatcher
    # sends every event to every subscriber. Note that :subscribe_to takes the
    # subscription list directly, not a nested keyword list.
    {:producer_consumer, :ok,
     dispatcher: GenStage.BroadcastDispatcher, subscribe_to: subscriptions}
  end

  # We just pass the events through. GenStage handles the broadcasting.
  @impl true
  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end
end

Step 3: Integrating with Phoenix Channels

Now we need to wire this pipeline into our Phoenix Channel processes. When a client joins the "notifications:all" channel, we will dynamically start a GenStage consumer that subscribes to our BroadcastDispatcher. This consumer’s entire lifecycle is tied to the channel process.

First, the channel itself:

# lib/realtime_gateway_web/channels/notification_channel.ex
defmodule RealtimeGatewayWeb.NotificationChannel do
  use RealtimeGatewayWeb, :channel
  require Logger

  alias RealtimeGateway.GenStage.WebSocketConsumer

  @impl true
  def join("notifications:all", _payload, socket) do
    # assigns[:user_id] avoids a KeyError if no user was assigned to the socket.
    Logger.info("Client #{inspect(socket.assigns[:user_id])} joined notifications channel.")

    # Start a GenStage consumer for this specific channel process.
    # We pass `self()` so the consumer knows which process to send messages to.
    # start_link/2 also links the two processes, so unless the channel traps
    # exits, an abnormal consumer exit takes the channel down with it.
    {:ok, consumer_pid} = GenStage.start_link(WebSocketConsumer, channel_pid: self())

    # We also monitor the consumer so that a normal stop, which does not
    # propagate over the link, still reaches us as a :DOWN message.
    Process.monitor(consumer_pid)

    new_socket = assign(socket, :consumer_pid, consumer_pid)
    {:ok, "Joined successfully", new_socket}
  end

  # This callback is triggered when the consumer sends us an event.
  @impl true
  def handle_info({:event, payload}, socket) do
    # In a real-world project, you'd deserialize the ASB message body here.
    # For now, we just wrap it.
    push(socket, "new_event", %{data: payload.body})
    {:noreply, socket}
  end

  # Handle the consumer process crashing
  @impl true
  def handle_info({:DOWN, _, :process, pid, reason}, socket) do
    if pid == socket.assigns.consumer_pid do
      Logger.error("WebSocketConsumer process died with reason: #{inspect(reason)}. Terminating channel.")
      {:stop, :normal, socket}
    else
      {:noreply, socket}
    end
  end

  @impl true
  def terminate(reason, socket) do
    Logger.info("Client channel terminating. Reason: #{inspect(reason)}")
    # Stop the consumer if it is still running. GenStage.stop/1 exits the
    # caller when the target is already dead, so check first.
    pid = socket.assigns[:consumer_pid]
    if is_pid(pid) and Process.alive?(pid), do: GenStage.stop(pid)
    :ok
  end
end

And the WebSocketConsumer that the channel starts:

# lib/realtime_gateway/gen_stage/web_socket_consumer.ex
defmodule RealtimeGateway.GenStage.WebSocketConsumer do
  use GenStage
  require Logger

  @impl true
  def init(opts) do
    channel_pid = Keyword.fetch!(opts, :channel_pid)

    # Subscribe to the dispatcher.
    # The max_demand here is critical. It defines how many events this single
    # WebSocket client is willing to buffer. A small number is good for
    # responsiveness and preventing memory bloat per-client.
    subscription_opts = [
      subscribe_to: [{RealtimeGateway.GenStage.BroadcastDispatcher, max_demand: 10, min_demand: 5}]
    ]
    {:consumer, %{channel_pid: channel_pid}, subscription_opts}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Forward the events to the parent channel process for pushing to the client.
    # This is better than pushing directly from here, as it keeps the channel
    # as the single source of truth for WebSocket communication.
    for event <- events do
      send(state.channel_pid, {:event, event})
    end
    
    # We do not change state. We are a stateless forwarder.
    {:noreply, [], state}
  end
end

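With this setup, the demand flows backward: The WebSocketConsumer has a max_demand of 10. Once it has processed 5 events (min_demand), it asks the BroadcastDispatcher for more. GenStage.BroadcastDispatcher forwards demand upstream only once every subscriber has asked for events, so the stream advances at the pace of the slowest consumer, and the dispatcher stage in turn asks the ASBProducer for more events. The ASBProducer then, and only then, pulls from Azure Service Bus. One caveat: send/2 is asynchronous, so a consumer’s demand reflects how quickly it forwards events to its channel process, not how quickly the client’s socket drains them.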
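
How a broadcast dispatcher combines demand from several subscribers can be modelled in a few lines. This is a simplified model of the accounting under the assumption that every event must reach every subscriber; it is not GenStage's implementation, and BroadcastDemand is a hypothetical name.

```elixir
# Simplified model of broadcast demand accounting.
defmodule BroadcastDemand do
  # `demands` maps a consumer name to its outstanding demand.
  # An event can only be broadcast if every consumer has room for it,
  # so the number of dispatchable events is the smallest outstanding demand.
  def dispatchable(demands) when map_size(demands) == 0, do: 0
  def dispatchable(demands), do: demands |> Map.values() |> Enum.min()

  # Broadcasting `count` events lowers every consumer's outstanding demand.
  def broadcast(demands, count) do
    Map.new(demands, fn {consumer, demand} -> {consumer, demand - count} end)
  end
end

demands = %{fast_client: 10, slow_client: 2}
n = BroadcastDemand.dispatchable(demands)
IO.inspect(n)
# prints 2
IO.inspect(BroadcastDemand.broadcast(demands, n))
# prints %{fast_client: 8, slow_client: 0}
```

After two events the slow client has no demand left, so nothing more is broadcast until it asks again, even though the fast client still has room for eight.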

Ensuring Robustness: Supervision and Configuration

The GenStage processes must be supervised. We add them to our application’s supervision tree to ensure they are started when the application boots and are restarted if they crash.

# lib/realtime_gateway/application.ex
defmodule RealtimeGateway.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Start the GenStage producer before the dispatcher that subscribes to it,
      # and both before the endpoint, so the pipeline is ready when the first
      # client joins a channel.
      {RealtimeGateway.GenStage.ASBProducer, subscription_name: "dashboard_subscription"},
      RealtimeGateway.GenStage.BroadcastDispatcher,
      RealtimeGatewayWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: RealtimeGateway.Supervisor]
    Supervisor.start_link(children, opts)
  end
  # ...
end

Configuration should be handled cleanly, especially for secrets like connection strings. We use config/runtime.exs for this.

# config/runtime.exs
import Config

# runtime.exs is evaluated in every environment and overrides config.exs,
# so only require the secret in production.
if config_env() == :prod do
  config :realtime_gateway, :azure_service_bus,
    connection_string: System.fetch_env!("AZURE_SERVICE_BUS_CONNECTION_STRING"),
    topic_name: "operational-events"
end

# And in config/config.exs, provide defaults for dev/test
import Config

config :realtime_gateway, :azure_service_bus,
  connection_string: "Endpoint=sb://...",
  topic_name: "dev-events"

Automating Confidence: The CircleCI Pipeline

A system this complex must have an automated test and integration pipeline. We use CircleCI to lint, test, and build our application on every commit.

A common pitfall is not testing the concurrent parts of the system correctly. For GenStage, ExUnit provides the tools we need to write reliable tests.

Here is a test for our ASBProducer. We stub the Azure SDK and pull events through GenStage.stream/1, which subscribes to the producer and issues real demand, so the assertion does not depend on timing.

# test/realtime_gateway/gen_stage/asb_producer_test.exs
defmodule RealtimeGateway.GenStage.ASBProducerTest do
  use ExUnit.Case, async: false # GenStage tests with names are tricky with async

  alias RealtimeGateway.GenStage.ASBProducer

  # A stub for the Azure SDK client. Mox.stub_with/2 requires it to implement
  # the same behaviour as the mock it backs.
  defmodule MockASBClient do
    def receive_and_delete_messages(_client, _sub, opts) do
      count = Keyword.get(opts, :max_messages, 10)
      messages = Enum.map(1..count, fn i -> %{body: "event_#{i}"} end)
      {:ok, messages}
    end
  end

  setup do
    # We replace the real client with our stub for the duration of the test.
    # This requires `mox`, a mock defined with Mox.defmock/2, and a producer
    # that calls the configured client module instead of Topic.Subscription
    # directly. Global mode is needed because the producer is another process.
    Mox.set_mox_global()
    Mox.stub_with(RealtimeGateway.Mocks.AzureTopic, MockASBClient)
    :ok
  end

  test "produces events when demand is received" do
    {:ok, producer} = GenStage.start_link(ASBProducer, subscription_name: "test-sub")

    # The stream subscribes to the producer and asks for up to 3 events.
    events =
      [{producer, max_demand: 3}]
      |> GenStage.stream()
      |> Enum.take(3)

    # Assert we receive exactly the first 3 events
    assert events == [%{body: "event_1"}, %{body: "event_2"}, %{body: "event_3"}]
  end
end

The full .circleci/config.yml provides the structure for continuous integration.

# .circleci/config.yml
version: 2.1

orbs:
  elixir: circleci/elixir@x.y.z # replace x.y.z with the orb version you pin

jobs:
  build_and_test:
    docker:
      - image: cimg/elixir:1.14-otp-25
    steps:
      - checkout
      - elixir/install_hex_rebar
      - elixir/restore_deps_cache
      - run:
          name: "Install Dependencies"
          command: mix deps.get
      - elixir/save_deps_cache
      - run:
          name: "Run Credo (Linter)"
          command: mix credo --strict
      - run:
          name: "Check Formatting"
          command: mix format --check-formatted
      - run:
          name: "Run Tests"
          command: mix test

workflows:
  main:
    jobs:
      - build_and_test

This configuration ensures that every change is validated against our quality gates, preventing regressions and giving us confidence in our complex, concurrent system.

The re-engineered system is now resilient. During a load spike, messages queue up safely and cost-effectively in Azure Service Bus, not in our application’s memory. As clients process messages and signal demand, the pipeline pulls data through at a sustainable pace. The system degrades gracefully: because GenStage.BroadcastDispatcher only asks upstream for more once every subscriber has signalled demand, a slow consumer throttles the shared stream rather than exhausting memory and bringing down the gateway for everyone.

This architecture, while more complex than the initial naive consumer, is not over-engineering. It’s a necessary response to the reality of building systems that bridge two different communication patterns. The current design is focused on a broadcast scenario, where all connected clients receive the same stream of events. For a more sophisticated system, the BroadcastDispatcher could be replaced with a more intelligent router that manages subscriptions to specific event types or entity IDs, ensuring clients only receive the data they have requested. Furthermore, for extreme scale, the single ASBProducer could become a bottleneck; a future iteration might involve a pool of producers, each handling a different partition of the Service Bus topic, allowing for horizontal scaling of the ingestion layer itself.
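
As a starting point for that kind of routing, GenStage.BroadcastDispatcher accepts a :selector function in a consumer's subscription options and delivers only the events for which it returns true. The sketch below builds such a predicate. The EventSelector module and the :type field are assumptions for illustration; the messages in this article only expose :body.

```elixir
# Builds a predicate suitable for use as a subscription :selector.
defmodule EventSelector do
  # Accepts only events whose :type is one of `types`.
  # Events without a :type field are rejected.
  def for_types(types) do
    allowed = MapSet.new(types)
    fn event -> MapSet.member?(allowed, Map.get(event, :type)) end
  end
end

selector = EventSelector.for_types(["alarm", "outage"])

events = [
  %{type: "alarm", body: "pump 7 pressure high"},
  %{type: "heartbeat", body: "ok"},
  %{type: "outage", body: "line 2 down"}
]

# Keeps the alarm and outage events and drops the heartbeat.
IO.inspect(Enum.filter(events, selector))
```

A consumer would pass the function alongside max_demand and min_demand when it subscribes, so filtering happens in the dispatcher rather than in every channel process.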

