Constructing a Horizontally Scalable WebRTC SFU in Go with etcd for Node Discovery and JWT-based Session Control


A single-node WebRTC Selective Forwarding Unit (SFU) is a straightforward engineering task, but it presents a hard ceiling on scalability and is an unacceptable single point of failure in any serious production environment. The moment traffic exceeds the capacity of one machine, the service degrades for everyone. The technical pain point isn’t building an SFU; it’s building a resilient SFU service that can scale horizontally, handle node failures gracefully, and securely authenticate clients into partitioned sessions. This immediately transforms the problem from one of media stream handling to one of distributed systems coordination.

Our initial concept was a cluster of interchangeable SFU nodes behind a stateless control plane. A client’s journey should be: authenticate once, get routed to the most suitable SFU node, and establish a media session. If that node fails, the client should be able to reconnect and be routed to a healthy node with minimal disruption. This architecture requires a robust mechanism for service discovery, real-time health checking, and a secure, stateless method for authorizing a client’s access to a specific media session on any node in the cluster.

The technology selection process was driven by pragmatism for a high-concurrency, real-time environment.

  • Go: Its concurrency model, with goroutines and channels, is exceptionally well-suited for handling thousands of concurrent network connections (WebSockets for signaling, UDP for media) without the complexity of thread management or the overhead of heavier runtimes. The performance is more than adequate, and the ecosystem for networking and distributed systems is mature.
  • WebRTC SFU Architecture: We chose an SFU over a mesh or an MCU. A mesh doesn’t scale beyond a handful of peers: the number of connections grows quadratically, and every client must upload its media to every other peer. An MCU (Multipoint Control Unit) decodes, mixes, and re-encodes streams, offering features like composite video, but at a significant CPU cost that makes horizontal scaling expensive. An SFU simply forwards received media packets to the other subscribers in the same session, striking a balance between server load and client-side complexity, which makes it ideal for scalable broadcasting and large group calls. We leveraged the Pion library, a comprehensive WebRTC implementation in Go.
  • etcd for Service Discovery: For coordinating the cluster, we needed a consistent and reliable key-value store. While options like Consul or Zookeeper exist, etcd was a natural fit. Its foundation on the Raft consensus algorithm guarantees strong consistency. More importantly, its native support for leases and watch streams is precisely what’s required for our use case. A node can register its availability with a lease; if the node crashes, the lease expires, and the key is automatically removed. Other services can watch for changes to the node list in real-time. This is far more robust than simple heartbeating.
  • GitHub OAuth2 and JWT for Auth: Authentication and authorization must be decoupled from the media nodes. Using an external OAuth provider like GitHub allows us to offload identity management. Our service validates this identity and then mints a JSON Web Token (JWT). This token is stateless. It contains all the necessary claims (e.g., user_id, room_id, exp) and is signed by our authentication service. Any SFU node in the cluster can independently verify this token’s signature without calling back to a central service, making the authorization check fast and horizontally scalable.

Architectural Overview

The system is composed of three primary components:

  1. Auth & Signaling Service: A Go HTTP service that handles the GitHub OAuth2 callback, issues JWTs, and acts as the initial signaling endpoint. It queries etcd to find the optimal SFU node for a client’s requested session.
  2. SFU Nodes: A cluster of identical Go applications. Each node runs a WebRTC SFU. Upon startup, it registers itself in etcd with a lease and reports its current load.
  3. etcd Cluster: The source of truth for SFU node availability and load information.

The client interaction flow is as follows:

sequenceDiagram
    participant C as Client
    participant A as Auth & Signaling Service
    participant G as GitHub
    participant E as etcd
    participant S as SFU Node

    C->>+A: Login request
    A-->>-C: Redirect to GitHub OAuth
    C->>+G: User authenticates and authorizes app
    G-->>-C: Redirect back with authorization code
    C->>+A: Callback with authorization code
    A->>+G: Exchange code for access token
    G-->>-A: Access token
    A-->>-C: Set session, client now authenticated

    C->>+A: Request to join Room X
    A->>+E: Look up healthy nodes under /sfu/nodes/
    E-->>-A: Active SFU nodes and their current loads
    A->>A: Select least-loaded SFU Node (e.g., sfu-node-2)
    A->>A: Generate JWT with claims (user_id, room_id=X)
    A-->>-C: Respond with { sfu_address: '...', jwt: '...' }

    C->>+S: Connect (WebSocket) to sfu-node-2
    C->>S: Send Auth message with JWT
    S->>S: Verify JWT signature and claims
    S-->>-C: Auth success, begin WebRTC negotiation
    C->>S: SDP offer / ICE candidates
    S-->>C: SDP answer / ICE candidates
    Note over C,S: PeerConnection established, media flows

Implementation: The SFU Node and etcd Registration

First, let’s define the structure for the SFU node’s metadata that will be stored in etcd.

// internal/discovery/node.go
package discovery

import (
	"encoding/json"
	"fmt"
	"time"
)

// NodeMeta holds the metadata for an SFU node.
// This struct will be JSON-encoded and stored as the value of the etcd key.
type NodeMeta struct {
	NodeID     string    `json:"node_id"`
	RPCAddress string    `json:"rpc_address"` // Address for client signaling
	LastUpdate time.Time `json:"last_update"`
	Load       int       `json:"load"` // e.g., number of active streams or peers
}

func (m NodeMeta) ToJSON() (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", fmt.Errorf("failed to marshal node meta: %w", err)
	}
	return string(b), nil
}

func FromJSON(data []byte) (NodeMeta, error) {
	var meta NodeMeta
	if err := json.Unmarshal(data, &meta); err != nil {
		return NodeMeta{}, fmt.Errorf("failed to unmarshal node meta: %w", err)
	}
	return meta, nil
}
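
Both the SFU nodes and the signaling service need a connected etcd client to pass into the components that follow. Constructing one is straightforward with go.etcd.io/etcd/client/v3; in this sketch the endpoint list and timeout are illustrative and would normally come from configuration:

// internal/discovery/client.go (helper sketch; endpoints and timeout are illustrative)
package discovery

import (
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// NewEtcdClient dials the etcd cluster used for registration and discovery.
func NewEtcdClient(endpoints []string) (*clientv3.Client, error) {
	return clientv3.New(clientv3.Config{
		Endpoints:   endpoints,       // e.g. []string{"etcd-0:2379", "etcd-1:2379"}
		DialTimeout: 5 * time.Second, // fail fast if the cluster is unreachable
	})
}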

The core of the discovery mechanism is the Registry component within each SFU node. It’s responsible for creating a lease with etcd and periodically updating its key with the latest node metadata.

// internal/discovery/registry.go
package discovery

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	// ServicePrefix is the key prefix in etcd for all SFU nodes.
	ServicePrefix   = "/sfu/nodes/"
	LeaseTTLSeconds = 15
	UpdateInterval  = 10 * time.Second
)

// Registry handles the registration of a single SFU node in etcd.
type Registry struct {
	client   *clientv3.Client
	leaseID  clientv3.LeaseID
	meta     NodeMeta
	key      string
	stopChan chan struct{}
	logger   *slog.Logger
}

// NewRegistry creates a new service registry.
func NewRegistry(client *clientv3.Client, meta NodeMeta, logger *slog.Logger) *Registry {
	return &Registry{
		client:   client,
		meta:     meta,
		key:      ServicePrefix + meta.NodeID,
		stopChan: make(chan struct{}),
		logger:   logger,
	}
}

// RegisterAndKeepAlive starts the registration process and keeps the lease alive.
// This should be run in a goroutine.
func (r *Registry) RegisterAndKeepAlive() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 1. Grant a new lease with a specific TTL.
	leaseResp, err := r.client.Grant(ctx, LeaseTTLSeconds)
	if err != nil {
		return fmt.Errorf("failed to grant etcd lease: %w", err)
	}
	r.leaseID = leaseResp.ID
	r.logger.Info("etcd lease granted", "lease_id", r.leaseID)

	// 2. Associate the key with the lease and put the initial metadata.
	if err := r.updateNodeMeta(); err != nil {
		return fmt.Errorf("initial registration failed: %w", err)
	}

	// 3. Start a keep-alive stream for the lease.
	// etcd client handles sending heartbeats automatically.
	keepAliveChan, err := r.client.KeepAlive(context.Background(), r.leaseID)
	if err != nil {
		// If keep-alive fails, we must revoke the lease to clean up.
		r.revokeLease()
		return fmt.Errorf("failed to start etcd keep-alive: %w", err)
	}

	ticker := time.NewTicker(UpdateInterval)
	defer ticker.Stop()

	r.logger.Info("SFU node registered in etcd", "key", r.key)

	// Main loop for updating metadata and handling lease status.
	for {
		select {
		case <-r.stopChan:
			r.logger.Info("stopping registry and revoking lease")
			r.revokeLease()
			return nil
		case ka := <-keepAliveChan:
			if ka == nil {
				// This indicates the lease has expired or been revoked.
				// A robust implementation would attempt to re-register.
				r.logger.Error("etcd keep-alive channel closed, lease may be lost")
				return fmt.Errorf("etcd lease lost")
			}
			// Optional: log keep-alive responses for debugging.
			// r.logger.Debug("etcd lease keep-alive ack received")
		case <-ticker.C:
			// Periodically update the node's metadata (e.g., load).
			// In a real SFU, you would fetch this from the media engine.
			r.meta.Load++ // Placeholder for actual load metric.
			if err := r.updateNodeMeta(); err != nil {
				r.logger.Error("failed to update node meta in etcd", "error", err)
			}
		}
	}
}

// updateNodeMeta pushes the current node metadata to etcd.
func (r *Registry) updateNodeMeta() error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	r.meta.LastUpdate = time.Now().UTC()
	val, err := r.meta.ToJSON()
	if err != nil {
		return err
	}

	// Put the key with the lease option. If the lease expires, the key is deleted.
	_, err = r.client.Put(ctx, r.key, val, clientv3.WithLease(r.leaseID))
	return err
}

// Stop gracefully stops the registry.
func (r *Registry) Stop() {
	close(r.stopChan)
}

// revokeLease explicitly revokes the etcd lease.
func (r *Registry) revokeLease() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	if _, err := r.client.Revoke(ctx, r.leaseID); err != nil {
		r.logger.Error("failed to revoke etcd lease", "lease_id", r.leaseID, "error", err)
	}
}

A common pitfall here is failing to handle the keepAliveChan returning nil. This signals that the lease is gone. Production code must have a re-registration loop to handle transient network issues with the etcd cluster. The use of a lease is critical; it ensures that crashed nodes are automatically deregistered, preventing the signaler from routing clients to dead endpoints.
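
One way to handle this, given the RegisterAndKeepAlive semantics shown above (nil on a graceful Stop, an error when the lease is lost), is a small supervising loop with exponential backoff. This is only a sketch: the module path, function name, and backoff values are placeholders, and a real node would also plumb Stop() through for shutdown.

// cmd/sfu/register.go (sketch of a re-registration supervisor)
package main

import (
	"context"
	"log/slog"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"

	"example.com/sfu/internal/discovery" // module path is a placeholder
)

// runRegistry keeps the node registered, re-registering with backoff if the lease is lost.
func runRegistry(ctx context.Context, client *clientv3.Client, meta discovery.NodeMeta, logger *slog.Logger) {
	backoff := time.Second
	for {
		reg := discovery.NewRegistry(client, meta, logger)
		// Blocks until Stop() is called (returns nil) or the lease is lost (returns an error).
		err := reg.RegisterAndKeepAlive()
		if err == nil {
			return // graceful shutdown via Stop()
		}
		logger.Error("registration lost, retrying", "error", err, "backoff", backoff)

		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		if backoff < 30*time.Second {
			backoff *= 2 // exponential backoff, capped around 30s
		}
	}
}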

Implementation: Auth & Signaling Service

This service is the client’s entry point. It has two main responsibilities: handling authentication and routing clients.

1. Authentication with GitHub and JWT Minting

We use the standard golang.org/x/oauth2 package for the GitHub flow. The critical part is what happens after a successful authentication: we create a JWT containing claims that will be consumed by the SFU.
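
The OAuth wiring itself is routine. A minimal sketch using golang.org/x/oauth2 follows; the environment variable names, scopes, redirect URL, and callback route are placeholders, and state-parameter validation is omitted for brevity:

// internal/auth/github.go (sketch; env var names, scopes, and redirect URL are placeholders)
package auth

import (
	"os"

	"golang.org/x/oauth2"
	"golang.org/x/oauth2/github"
)

// NewGitHubOAuthConfig builds the oauth2.Config used for the login redirect
// and for exchanging the authorization code in the callback handler.
func NewGitHubOAuthConfig() *oauth2.Config {
	return &oauth2.Config{
		ClientID:     os.Getenv("GITHUB_CLIENT_ID"),
		ClientSecret: os.Getenv("GITHUB_CLIENT_SECRET"),
		Scopes:       []string{"read:user"},
		Endpoint:     github.Endpoint,
		RedirectURL:  "https://example.com/auth/github/callback",
	}
}

// Login handler:    http.Redirect(w, r, cfg.AuthCodeURL(state), http.StatusFound)
// Callback handler: token, err := cfg.Exchange(r.Context(), r.URL.Query().Get("code"))

With a verified GitHub identity in hand, the JWTManager below mints the session token that any SFU node can later verify on its own: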

// internal/auth/jwt.go
package auth

import (
	"fmt"
	"time"

	"github.com/golang-jwt/jwt/v5"
)

// Claims defines the structure of our JWT claims.
type Claims struct {
	UserID   string `json:"user_id"`
	UserName string `json:"user_name"`
	RoomID   string `json:"room_id"`
	jwt.RegisteredClaims
}

// JWTManager handles creating and validating JWTs.
type JWTManager struct {
	secretKey     []byte
	tokenDuration time.Duration
}

func NewJWTManager(secretKey string, tokenDuration time.Duration) *JWTManager {
	return &JWTManager{
		secretKey:     []byte(secretKey),
		tokenDuration: tokenDuration,
	}
}

// Generate creates a new JWT for a given user and room.
func (m *JWTManager) Generate(userID, userName, roomID string) (string, error) {
	claims := Claims{
		UserID:   userID,
		UserName: userName,
		RoomID:   roomID,
		RegisteredClaims: jwt.RegisteredClaims{
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(m.tokenDuration)),
			IssuedAt:  jwt.NewNumericDate(time.Now()),
		},
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return token.SignedString(m.secretKey)
}

// Verify checks the validity of a token string and returns the claims.
func (m *JWTManager) Verify(tokenString string) (*Claims, error) {
	token, err := jwt.ParseWithClaims(
		tokenString,
		&Claims{},
		func(token *jwt.Token) (interface{}, error) {
			// Check the signing algorithm
			if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
				return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
			}
			return m.secretKey, nil
		},
	)

	if err != nil {
		return nil, fmt.Errorf("failed to parse token: %w", err)
	}

	claims, ok := token.Claims.(*Claims)
	if !ok || !token.Valid {
		return nil, fmt.Errorf("invalid token or claims")
	}

	return claims, nil
}

The HTTP handler for joining a room would look something like this:

// cmd/signaler/main.go (simplified handler)

// ... (setup for JWTManager, etcd client, etc.)

type JoinRoomResponse struct {
	SFUAddress string `json:"sfu_address"`
	JWT        string `json:"jwt"`
}

func (s *Server) handleJoinRoom(w http.ResponseWriter, r *http.Request) {
    // 1. Assume user is already authenticated via session/cookie from OAuth flow.
    // In a real app, you'd fetch user info (userID, userName) from the session.
    userID := "user-from-session-123"
    userName := "testuser"
    
    roomID := r.URL.Query().Get("room")
    if roomID == "" {
        http.Error(w, "room parameter is required", http.StatusBadRequest)
        return
    }

    // 2. Discover the best SFU node.
    node, err := s.discovery.FindBestNode() // This method implements the load balancing logic
    if err != nil {
        s.logger.Error("failed to find SFU node", "error", err)
        http.Error(w, "no available SFU nodes", http.StatusInternalServerError)
        return
    }

    // 3. Generate a JWT specific to this user and room.
    token, err := s.jwtManager.Generate(userID, userName, roomID)
    if err != nil {
        s.logger.Error("failed to generate JWT", "error", err)
        http.Error(w, "could not generate session token", http.StatusInternalServerError)
        return
    }

    // 4. Respond to the client.
    resp := JoinRoomResponse{
        SFUAddress: node.RPCAddress,
        JWT:        token,
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(resp)
}

2. Node Discovery and Load Balancing

The FindBestNode method in the signaling service is responsible for querying etcd. It shouldn’t just get a list of nodes once; for resilience, it should actively watch for changes.

// internal/discovery/watcher.go
package discovery

import (
	"context"
	"fmt"
	"log/slog"
	"sync"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// Watcher monitors etcd for changes in SFU node availability.
type Watcher struct {
	client *clientv3.Client
	logger *slog.Logger
	nodes  map[string]NodeMeta
	mu     sync.RWMutex
}

func NewWatcher(client *clientv3.Client, logger *slog.Logger) *Watcher {
	return &Watcher{
		client: client,
		logger: logger,
		nodes:  make(map[string]NodeMeta),
	}
}

// Start watching for node changes in a separate goroutine.
func (w *Watcher) Start(ctx context.Context) error {
	// Initial fetch of all nodes
	if err := w.syncNodes(ctx); err != nil {
		return err
	}

	go w.watchLoop(ctx)
	return nil
}

func (w *Watcher) watchLoop(ctx context.Context) {
	watchChan := w.client.Watch(ctx, ServicePrefix, clientv3.WithPrefix())
	w.logger.Info("started watching etcd for SFU node changes", "prefix", ServicePrefix)
	for {
		select {
		case <-ctx.Done():
			w.logger.Info("stopping etcd watcher")
			return
		case resp, ok := <-watchChan:
			if !ok {
				// The watch channel was closed; a production watcher should
				// re-establish the watch rather than spin on a closed channel.
				w.logger.Warn("etcd watch channel closed")
				return
			}
			w.handleWatchResponse(resp)
		}
	}
}

func (w *Watcher) handleWatchResponse(resp clientv3.WatchResponse) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if err := resp.Err(); err != nil {
		w.logger.Error("etcd watch error", "error", err)
		return
	}

	for _, event := range resp.Events {
		key := string(event.Kv.Key)
		switch event.Type {
		case clientv3.EventTypePut:
			meta, err := FromJSON(event.Kv.Value)
			if err != nil {
				w.logger.Error("failed to unmarshal node meta from watch event", "key", key, "error", err)
				continue
			}
			w.nodes[key] = meta
			w.logger.Info("SFU node updated/added", "node_id", meta.NodeID, "address", meta.RPCAddress)
		case clientv3.EventTypeDelete:
			delete(w.nodes, key)
			w.logger.Warn("SFU node removed", "key", key)
		}
	}
}

func (w *Watcher) syncNodes(ctx context.Context) error {
	resp, err := w.client.Get(ctx, ServicePrefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	w.mu.Lock()
	defer w.mu.Unlock()

	w.nodes = make(map[string]NodeMeta)
	for _, kv := range resp.Kvs {
		meta, err := FromJSON(kv.Value)
		if err != nil {
			w.logger.Error("failed to unmarshal node meta during sync", "key", string(kv.Key), "error", err)
			continue
		}
		w.nodes[string(kv.Key)] = meta
	}
	w.logger.Info("successfully synchronized SFU nodes from etcd", "count", len(w.nodes))
	return nil
}

// FindBestNode implements a simple load balancing strategy: return the node with the lowest load.
func (w *Watcher) FindBestNode() (NodeMeta, error) {
	w.mu.RLock()
	defer w.mu.RUnlock()

	if len(w.nodes) == 0 {
		return NodeMeta{}, fmt.Errorf("no available SFU nodes")
	}

	var bestNode NodeMeta
	minLoad := -1

	for _, node := range w.nodes {
		if minLoad == -1 || node.Load < minLoad {
			minLoad = node.Load
			bestNode = node
		}
	}
	return bestNode, nil
}

This watcher maintains an in-memory cache of available nodes, which is far more efficient than querying etcd on every single client request. The logic in FindBestNode is a trivial “least connections” strategy, but it can be extended to consider geographic location, resource utilization, or session affinity.
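
As one example of extending it, the sketch below folds metadata staleness into the score alongside the stream count, on the theory that a node that has not refreshed its entry recently may be struggling. The file name and the 0.5 weighting are arbitrary:

// internal/discovery/score.go (hypothetical extension; the weighting is arbitrary)
package discovery

import (
	"fmt"
	"time"
)

// score prefers lightly loaded nodes and penalizes stale metadata.
func score(n NodeMeta) float64 {
	staleness := time.Since(n.LastUpdate).Seconds()
	return float64(n.Load) + 0.5*staleness
}

// FindBestNodeScored is a drop-in alternative to FindBestNode.
func (w *Watcher) FindBestNodeScored() (NodeMeta, error) {
	w.mu.RLock()
	defer w.mu.RUnlock()

	if len(w.nodes) == 0 {
		return NodeMeta{}, fmt.Errorf("no available SFU nodes")
	}

	best, bestScore, first := NodeMeta{}, 0.0, true
	for _, n := range w.nodes {
		if s := score(n); first || s < bestScore {
			best, bestScore, first = n, s, false
		}
	}
	return best, nil
}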

Bringing It Together: The Client-SFU Handshake

Once the client receives the SFU address and the JWT, it initiates a WebSocket connection to the assigned SFU node. The very first message on this connection must be for authorization.

// client-side pseudo-code
async function joinRoom(roomId) {
    // 1. Fetch routing info from our auth/signaling server
    const response = await fetch(`/api/join?room=${roomId}`);
    const { sfu_address, jwt } = await response.json();

    // 2. Connect to the assigned SFU node
    const ws = new WebSocket(sfu_address);
    const peerConnection = new RTCPeerConnection();

    ws.onopen = () => {
        // 3. First message MUST be the auth token
        const authMessage = {
            type: 'auth',
            payload: { jwt: jwt }
        };
        ws.send(JSON.stringify(authMessage));
    };

    ws.onmessage = async (event) => {
        const message = JSON.parse(event.data);
        switch (message.type) {
            case 'auth_success':
                // Now we can start the WebRTC SDP exchange
                console.log('Authenticated with SFU, starting WebRTC handshake...');
                // ... create offer, send it, handle answer, etc.
                break;
            case 'auth_failure':
                console.error('SFU authentication failed:', message.payload.error);
                ws.close();
                break;
            // ... handle sdp_offer, sdp_answer, ice_candidate messages
        }
    };
    
    // ... setup peerConnection event handlers
}

On the SFU node, the WebSocket handler must enforce this flow. No WebRTC signaling should occur before successful JWT validation.

// cmd/sfu/main.go (simplified websocket handler)

// signalMessage is the envelope for every message on the signaling WebSocket.
type signalMessage struct {
    Type    string          `json:"type"`
    Payload json.RawMessage `json:"payload"`
}

func (s *SFUServer) handleWebSocket(conn *websocket.Conn) {
    // A simple state machine for the connection
    type ConnState int
    const (
        StateWaitingForAuth ConnState = iota
        StateAuthenticated
    )

    state := StateWaitingForAuth
    var claims *auth.Claims

    for {
        _, msg, err := conn.ReadMessage()
        if err != nil {
            // ... handle connection close
            return
        }

        var message signalMessage
        if err := json.Unmarshal(msg, &message); err != nil {
            conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error", "payload": {"error":"malformed message"}}`))
            continue
        }

        switch state {
        case StateWaitingForAuth:
            if message.Type != "auth" {
                // Protocol violation: close connection
                conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_failure", "payload": {"error":"First message must be auth"}}`))
                conn.Close()
                return
            }

            var payload struct {
                JWT string `json:"jwt"`
            }
            if err := json.Unmarshal(message.Payload, &payload); err != nil || payload.JWT == "" {
                conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_failure", "payload": {"error":"Missing or malformed token"}}`))
                conn.Close()
                return
            }

            claims, err = s.jwtManager.Verify(payload.JWT)
            if err != nil {
                // Invalid JWT
                conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_failure", "payload": {"error":"Invalid token"}}`))
                conn.Close()
                return
            }

            // Authorization successful
            state = StateAuthenticated
            conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_success"}`))
            s.logger.Info("client authenticated for room", "room_id", claims.RoomID, "user_id", claims.UserID)
            // Now, create the PeerConnection and associate it with this WebSocket.
            // ...

        case StateAuthenticated:
            // Handle WebRTC signaling messages (SDP, ICE)
            // Use claims.RoomID to route media to the correct session
            // ...
        }
    }
}

Testing Strategy

In a distributed system like this, unit tests are necessary but not sufficient.

  • Unit Tests: The JWTManager is a perfect candidate for unit testing. One can test token generation and verification, including edge cases like expired tokens or tokens signed with the wrong key, without any external dependencies (a sketch follows this list). The etcd registry logic can be unit-tested by mocking the etcd/clientv3 interface.
  • Integration Tests: A more valuable test involves spinning up a lightweight etcd instance (e.g., in a Docker container), a mock SFU node that registers itself, and a test client that queries the signaling service. This validates the entire discovery and routing flow.
  • End-to-End Tests: Using a headless browser framework, one can automate a client joining a room, publishing a media stream, and a second client verifying it receives the stream. Running this against a multi-node SFU cluster under load is the ultimate confidence check.
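
The unit test sketch referenced above might look like the following; it exercises the happy path, a wrong signing key, and an expired token using only the JWTManager defined earlier:

// internal/auth/jwt_test.go (sketch)
package auth

import (
	"testing"
	"time"
)

func TestJWTRoundTripAndExpiry(t *testing.T) {
	m := NewJWTManager("test-secret", time.Minute)

	token, err := m.Generate("user-1", "alice", "room-42")
	if err != nil {
		t.Fatalf("Generate: %v", err)
	}

	claims, err := m.Verify(token)
	if err != nil {
		t.Fatalf("Verify: %v", err)
	}
	if claims.UserID != "user-1" || claims.RoomID != "room-42" {
		t.Fatalf("unexpected claims: %+v", claims)
	}

	// A token signed with a different key must be rejected.
	other := NewJWTManager("wrong-secret", time.Minute)
	if _, err := other.Verify(token); err == nil {
		t.Fatal("expected verification to fail with the wrong key")
	}

	// An already-expired token must be rejected.
	expired := NewJWTManager("test-secret", -time.Minute)
	tok, _ := expired.Generate("user-1", "alice", "room-42")
	if _, err := m.Verify(tok); err == nil {
		t.Fatal("expected verification to fail for an expired token")
	}
}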

The current implementation provides a solid, scalable foundation. However, several aspects require further consideration in a full-scale production deployment. The load-balancing strategy is naive; a more sophisticated approach would incorporate CPU and network I/O metrics from each SFU node, not just a stream count. Session persistence is another challenge; if a client disconnects momentarily, it would be ideal to route them back to the same SFU node where their session state might still exist. This requires a distributed session mapping, which could also be managed in etcd or a faster cache like Redis. Finally, for a global deployment, this architecture would need to be extended with geo-DNS routing to direct users to the nearest regional cluster, and cross-region coordination mechanisms would be required if users from different regions need to join the same session.
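
As one concrete sketch of that session-affinity idea, a room could be pinned in etcd to the node that first hosts it, tied to that node’s lease so the mapping disappears when the node dies. The /sfu/rooms/ key layout and function below are hypothetical:

// internal/discovery/affinity.go (hypothetical sketch; key layout is illustrative)
package discovery

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// PinRoom records which node hosts a room, but only if no mapping exists yet.
// Binding the key to the node's lease means the pin vanishes if the node dies.
func PinRoom(ctx context.Context, cli *clientv3.Client, roomID, nodeID string, leaseID clientv3.LeaseID) error {
	key := "/sfu/rooms/" + roomID
	_, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)). // key does not exist yet
		Then(clientv3.OpPut(key, nodeID, clientv3.WithLease(leaseID))).
		Commit()
	return err
}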

