A single-node WebRTC Selective Forwarding Unit (SFU) is a straightforward engineering task, but it presents a hard ceiling on scalability and is an unacceptable single point of failure in any serious production environment. The moment traffic exceeds the capacity of one machine, the service degrades for everyone. The technical pain point isn’t building an SFU; it’s building a resilient SFU service that can scale horizontally, handle node failures gracefully, and securely authenticate clients into partitioned sessions. This immediately transforms the problem from one of media stream handling to one of distributed systems coordination.
Our initial concept was to build a cluster of stateless SFU nodes. A client’s journey should be: authenticate once, get routed to the most appropriate SFU node, and establish a media session. If that node fails, the client should be able to reconnect and be seamlessly routed to a healthy node. This architecture requires a robust mechanism for service discovery, real-time health checking, and a secure, stateless method for authorizing a client’s access to a specific media session on any node in the cluster.
The technology selection process was driven by pragmatism for a high-concurrency, real-time environment.
- Go: Its concurrency model, with goroutines and channels, is exceptionally well-suited for handling thousands of concurrent network connections (WebSockets for signaling, UDP for media) without the complexity of thread management or the overhead of heavier runtimes. The performance is more than adequate, and the ecosystem for networking and distributed systems is mature.
- WebRTC SFU Architecture: We chose an SFU over a mesh or MCU. A mesh architecture doesn’t scale beyond a handful of peers, since connection counts grow quadratically and every client must upload its stream to every other participant. An MCU (Multipoint Control Unit) decodes and re-encodes streams, offering features like composite video, but at a significant CPU cost, making horizontal scaling expensive. An SFU simply forwards received media packets to the other subscribers in the same session, striking a balance between server load and client-side complexity that makes it ideal for scalable broadcasting and large group calls. We leveraged the Pion library, a comprehensive WebRTC implementation in Go.
- etcd for Service Discovery: For coordinating the cluster, we needed a consistent and reliable key-value store. While options like Consul or ZooKeeper exist, `etcd` was a natural fit. Its foundation on the Raft consensus algorithm guarantees strong consistency. More importantly, its native support for leases and watch streams is precisely what’s required for our use case: a node can register its availability with a lease; if the node crashes, the lease expires and the key is automatically removed, and other services can watch the node list for changes in real time. This is far more robust than simple heartbeating.
- GitHub OAuth2 and JWT for Auth: Authentication and authorization must be decoupled from the media nodes. Using an external OAuth provider like GitHub allows us to offload identity management. Our service validates this identity and then mints a JSON Web Token (JWT). The token is stateless: it contains all the necessary claims (e.g., `user_id`, `room_id`, `exp`) and is signed by our authentication service. Any SFU node in the cluster can independently verify the token’s signature without calling back to a central service, making the authorization check fast and horizontally scalable.
Architectural Overview
The system is composed of three primary components:
- Auth & Signaling Service: A Go HTTP service that handles the GitHub OAuth2 callback, issues JWTs, and acts as the initial signaling endpoint. It queries `etcd` to find the optimal SFU node for a client’s requested session.
- SFU Nodes: A cluster of identical Go applications, each running a WebRTC SFU. Upon startup, a node registers itself in `etcd` with a lease and reports its current load.
- etcd Cluster: The source of truth for SFU node availability and load information.
The client interaction flow is as follows:
```mermaid
sequenceDiagram
    participant C as Client
    participant A as Auth & Signaling Service
    participant G as GitHub
    participant E as etcd
    participant S as SFU Node

    C->>A: Login request
    A-->>C: Redirect to GitHub OAuth
    C->>G: User authenticates with GitHub
    G-->>C: Redirect back with authorization code
    C->>A: OAuth callback with authorization code
    A->>G: Exchange code for access token
    G-->>A: Access token
    A-->>C: Set session, client now authenticated
    C->>A: Request to join Room X
    A->>E: Get healthy nodes under /sfu/nodes/
    E-->>A: List of active SFU nodes and their loads
    A->>A: Select least-loaded SFU node (e.g., sfu-node-2)
    A->>A: Generate JWT with claims (user_id, room_id=X)
    A-->>C: Respond with { sfu_address: '...', jwt: '...' }
    C->>S: Connect (WebSocket) to sfu-node-2
    C->>S: Send auth message with JWT
    S->>S: Verify JWT signature and claims
    S-->>C: Auth success, begin WebRTC negotiation
    C->>S: Exchange SDP and ICE candidates
    S-->>C: Exchange SDP and ICE candidates
    C-->>S: PeerConnection established (media flows)
```
Implementation: The SFU Node and etcd Registration
First, let’s define the structure for the SFU node’s metadata that will be stored in `etcd`.
// internal/discovery/node.go
package discovery
import (
"encoding/json"
"fmt"
"time"
)
// NodeMeta holds the metadata for an SFU node.
// This struct will be JSON-encoded and stored as the value of the etcd key.
type NodeMeta struct {
NodeID string `json:"node_id"`
RPCAddress string `json:"rpc_address"` // Address for client signaling
LastUpdate time.Time `json:"last_update"`
Load int `json:"load"` // e.g., number of active streams or peers
}
func (m NodeMeta) ToJSON() (string, error) {
b, err := json.Marshal(m)
if err != nil {
return "", fmt.Errorf("failed to marshal node meta: %w", err)
}
return string(b), nil
}
func FromJSON(data []byte) (NodeMeta, error) {
var meta NodeMeta
if err := json.Unmarshal(data, &meta); err != nil {
return NodeMeta{}, fmt.Errorf("failed to unmarshal node meta: %w", err)
}
return meta, nil
}
The core of the discovery mechanism is the `Registry` component within each SFU node. It’s responsible for creating a lease with `etcd` and periodically updating its key with the latest node metadata.
// internal/discovery/registry.go
package discovery
import (
"context"
"fmt"
"log/slog"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
const (
// ServicePrefix is the key prefix in etcd for all SFU nodes.
ServicePrefix = "/sfu/nodes/"
LeaseTTLSeconds = 15
UpdateInterval = 10 * time.Second
)
// Registry handles the registration of a single SFU node in etcd.
type Registry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
meta NodeMeta
key string
stopChan chan struct{}
logger *slog.Logger
}
// NewRegistry creates a new service registry.
func NewRegistry(client *clientv3.Client, meta NodeMeta, logger *slog.Logger) *Registry {
return &Registry{
client: client,
meta: meta,
key: ServicePrefix + meta.NodeID,
stopChan: make(chan struct{}),
logger: logger,
}
}
// RegisterAndKeepAlive starts the registration process and keeps the lease alive.
// This should be run in a goroutine.
func (r *Registry) RegisterAndKeepAlive() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 1. Grant a new lease with a specific TTL.
leaseResp, err := r.client.Grant(ctx, LeaseTTLSeconds)
if err != nil {
return fmt.Errorf("failed to grant etcd lease: %w", err)
}
r.leaseID = leaseResp.ID
r.logger.Info("etcd lease granted", "lease_id", r.leaseID)
// 2. Associate the key with the lease and put the initial metadata.
if err := r.updateNodeMeta(); err != nil {
return fmt.Errorf("initial registration failed: %w", err)
}
// 3. Start a keep-alive stream for the lease.
// etcd client handles sending heartbeats automatically.
keepAliveChan, err := r.client.KeepAlive(context.Background(), r.leaseID)
if err != nil {
// If keep-alive fails, we must revoke the lease to clean up.
r.revokeLease()
return fmt.Errorf("failed to start etcd keep-alive: %w", err)
}
ticker := time.NewTicker(UpdateInterval)
defer ticker.Stop()
r.logger.Info("SFU node registered in etcd", "key", r.key)
// Main loop for updating metadata and handling lease status.
for {
select {
case <-r.stopChan:
r.logger.Info("stopping registry and revoking lease")
r.revokeLease()
return nil
case ka := <-keepAliveChan:
if ka == nil {
// This indicates the lease has expired or been revoked.
// A robust implementation would attempt to re-register.
r.logger.Error("etcd keep-alive channel closed, lease may be lost")
return fmt.Errorf("etcd lease lost")
}
// Optional: log keep-alive responses for debugging.
// r.logger.Debug("etcd lease keep-alive ack received")
case <-ticker.C:
// Periodically update the node's metadata (e.g., load).
// In a real SFU, you would fetch this from the media engine.
r.meta.Load++ // Placeholder for actual load metric.
if err := r.updateNodeMeta(); err != nil {
r.logger.Error("failed to update node meta in etcd", "error", err)
}
}
}
}
// updateNodeMeta pushes the current node metadata to etcd.
func (r *Registry) updateNodeMeta() error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
r.meta.LastUpdate = time.Now().UTC()
val, err := r.meta.ToJSON()
if err != nil {
return err
}
// Put the key with the lease option. If the lease expires, the key is deleted.
_, err = r.client.Put(ctx, r.key, val, clientv3.WithLease(r.leaseID))
return err
}
// Stop gracefully stops the registry.
func (r *Registry) Stop() {
close(r.stopChan)
}
// revokeLease explicitly revokes the etcd lease.
func (r *Registry) revokeLease() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if _, err := r.client.Revoke(ctx, r.leaseID); err != nil {
r.logger.Error("failed to revoke etcd lease", "lease_id", r.leaseID, "error", err)
}
}
A common pitfall here is failing to handle the `keepAliveChan` returning `nil`, which signals that the lease is gone. Production code must have a re-registration loop to handle transient network issues with the `etcd` cluster. The use of a lease is critical; it ensures that crashed nodes are automatically deregistered, preventing the signaler from routing clients to dead endpoints.
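One way to close that gap is to wrap `RegisterAndKeepAlive` in a supervising loop with backoff. The sketch below assumes that re-running the method performs a full re-registration (new lease, fresh `Put`), which is how the `Registry` above behaves; the file name and the `runWithReregistration` helper are illustrative, not part of the code shown earlier.

```go
// internal/discovery/rerun.go (sketch)
package discovery

import (
	"context"
	"log/slog"
	"time"
)

// runWithReregistration keeps an SFU node registered across lease loss. Each
// call to RegisterAndKeepAlive grants a fresh lease and re-puts the node key,
// so retrying it after a failure amounts to a full re-registration.
// Exponential backoff avoids hammering a flapping etcd cluster.
func runWithReregistration(ctx context.Context, r *Registry, logger *slog.Logger) {
	backoff := time.Second
	const maxBackoff = 30 * time.Second
	for {
		err := r.RegisterAndKeepAlive()
		if err == nil {
			// Stop() was called; exit cleanly.
			return
		}
		logger.Error("registration lost, retrying", "error", err, "backoff", backoff)
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
		}
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```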
Implementation: Auth & Signaling Service
This service is the client’s entry point. It has two main responsibilities: handling authentication and routing clients.
1. Authentication with GitHub and JWT Minting
We use the standard `golang.org/x/oauth2` package for the GitHub flow. The critical part is what happens after a successful authentication: we create a JWT containing claims that will be consumed by the SFU.
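For context, here is a rough sketch of the GitHub side of that flow using `golang.org/x/oauth2` and its `github` endpoint package. The file name, handler names, redirect URL, and the `githubUser` struct are illustrative assumptions, and a real service also needs CSRF-safe state handling and session creation. Once the identity is known, the `JWTManager` below takes over.

```go
// cmd/signaler/oauth.go (sketch)
package main

import (
	"context"
	"encoding/json"
	"net/http"

	"golang.org/x/oauth2"
	"golang.org/x/oauth2/github"
)

var oauthConf = &oauth2.Config{
	ClientID:     "GITHUB_CLIENT_ID",     // loaded from the environment in practice
	ClientSecret: "GITHUB_CLIENT_SECRET", // loaded from the environment in practice
	Scopes:       []string{"read:user"},
	Endpoint:     github.Endpoint,
	RedirectURL:  "https://example.com/auth/callback",
}

// githubUser captures the fields we need from GitHub's /user API.
type githubUser struct {
	ID    int64  `json:"id"`
	Login string `json:"login"`
}

func handleLogin(w http.ResponseWriter, r *http.Request) {
	// In production the state parameter must be random and verified in the callback.
	http.Redirect(w, r, oauthConf.AuthCodeURL("state"), http.StatusFound)
}

func handleOAuthCallback(w http.ResponseWriter, r *http.Request) {
	code := r.URL.Query().Get("code")
	token, err := oauthConf.Exchange(context.Background(), code)
	if err != nil {
		http.Error(w, "oauth exchange failed", http.StatusUnauthorized)
		return
	}
	// Use the access token to fetch the user's identity from GitHub.
	resp, err := oauthConf.Client(context.Background(), token).Get("https://api.github.com/user")
	if err != nil {
		http.Error(w, "failed to fetch user", http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()
	var user githubUser
	if err := json.NewDecoder(resp.Body).Decode(&user); err != nil {
		http.Error(w, "failed to decode user", http.StatusBadGateway)
		return
	}
	// At this point we would create a session for user.ID / user.Login;
	// the room-scoped JWT is minted later, in handleJoinRoom.
}
```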
// internal/auth/jwt.go
package auth
import (
"fmt"
"time"
"github.com/golang-jwt/jwt/v5"
)
// Claims defines the structure of our JWT claims.
type Claims struct {
UserID string `json:"user_id"`
UserName string `json:"user_name"`
RoomID string `json:"room_id"`
jwt.RegisteredClaims
}
// JWTManager handles creating and validating JWTs.
type JWTManager struct {
secretKey []byte
tokenDuration time.Duration
}
func NewJWTManager(secretKey string, tokenDuration time.Duration) *JWTManager {
return &JWTManager{
secretKey: []byte(secretKey),
tokenDuration: tokenDuration,
}
}
// Generate creates a new JWT for a given user and room.
func (m *JWTManager) Generate(userID, userName, roomID string) (string, error) {
claims := Claims{
UserID: userID,
UserName: userName,
RoomID: roomID,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(m.tokenDuration)),
IssuedAt: jwt.NewNumericDate(time.Now()),
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(m.secretKey)
}
// Verify checks the validity of a token string and returns the claims.
func (m *JWTManager) Verify(tokenString string) (*Claims, error) {
token, err := jwt.ParseWithClaims(
tokenString,
&Claims{},
func(token *jwt.Token) (interface{}, error) {
// Check the signing algorithm
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return m.secretKey, nil
},
)
if err != nil {
return nil, fmt.Errorf("failed to parse token: %w", err)
}
claims, ok := token.Claims.(*Claims)
if !ok || !token.Valid {
return nil, fmt.Errorf("invalid token or claims")
}
return claims, nil
}
The HTTP handler for joining a room would look something like this:
// cmd/signaler/main.go (simplified handler)
// ... (setup for JWTManager, etcd client, etc.)
type JoinRoomResponse struct {
SFUAddress string `json:"sfu_address"`
JWT string `json:"jwt"`
}
func (s *Server) handleJoinRoom(w http.ResponseWriter, r *http.Request) {
// 1. Assume user is already authenticated via session/cookie from OAuth flow.
// In a real app, you'd fetch user info (userID, userName) from the session.
userID := "user-from-session-123"
userName := "testuser"
roomID := r.URL.Query().Get("room")
if roomID == "" {
http.Error(w, "room parameter is required", http.StatusBadRequest)
return
}
// 2. Discover the best SFU node.
node, err := s.discovery.FindBestNode() // This method implements the load balancing logic
if err != nil {
s.logger.Error("failed to find SFU node", "error", err)
http.Error(w, "no available SFU nodes", http.StatusInternalServerError)
return
}
// 3. Generate a JWT specific to this user and room.
token, err := s.jwtManager.Generate(userID, userName, roomID)
if err != nil {
s.logger.Error("failed to generate JWT", "error", err)
http.Error(w, "could not generate session token", http.StatusInternalServerError)
return
}
// 4. Respond to the client.
resp := JoinRoomResponse{
SFUAddress: node.RPCAddress,
JWT: token,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
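The setup elided above (“setup for JWTManager, etcd client, etc.”) could be wired together roughly as follows. The module path, listen address, `etcd` endpoint, and environment variable name are assumptions for the sketch; the `Server` fields simply match what the handler uses.

```go
// cmd/signaler/main.go (sketch of wiring, error handling abbreviated)
package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"

	"example.com/sfu/internal/auth"      // module path is illustrative
	"example.com/sfu/internal/discovery" // module path is illustrative
)

type Server struct {
	jwtManager *auth.JWTManager
	discovery  *discovery.Watcher
	logger     *slog.Logger
}

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		logger.Error("failed to connect to etcd", "error", err)
		os.Exit(1)
	}
	defer etcdClient.Close()

	watcher := discovery.NewWatcher(etcdClient, logger)
	if err := watcher.Start(context.Background()); err != nil {
		logger.Error("failed to start node watcher", "error", err)
		os.Exit(1)
	}

	s := &Server{
		jwtManager: auth.NewJWTManager(os.Getenv("JWT_SECRET"), 15*time.Minute),
		discovery:  watcher,
		logger:     logger,
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/api/join", s.handleJoinRoom)
	// OAuth login/callback routes would be registered here as well.
	logger.Info("signaler listening", "addr", ":8080")
	if err := http.ListenAndServe(":8080", mux); err != nil {
		logger.Error("http server stopped", "error", err)
	}
}
```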
2. Node Discovery and Load Balancing
The `FindBestNode` method in the signaling service is responsible for querying `etcd`. It shouldn’t just fetch the list of nodes once; for resilience, it should actively watch for changes.
// internal/discovery/watcher.go
package discovery
import (
"context"
"fmt"
"log/slog"
"sync"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Watcher monitors etcd for changes in SFU node availability.
type Watcher struct {
client *clientv3.Client
logger *slog.Logger
nodes map[string]NodeMeta
mu sync.RWMutex
}
func NewWatcher(client *clientv3.Client, logger *slog.Logger) *Watcher {
return &Watcher{
client: client,
logger: logger,
nodes: make(map[string]NodeMeta),
}
}
// Start watching for node changes in a separate goroutine.
func (w *Watcher) Start(ctx context.Context) error {
// Initial fetch of all nodes
if err := w.syncNodes(ctx); err != nil {
return err
}
go w.watchLoop(ctx)
return nil
}
func (w *Watcher) watchLoop(ctx context.Context) {
watchChan := w.client.Watch(ctx, ServicePrefix, clientv3.WithPrefix())
w.logger.Info("started watching etcd for SFU node changes", "prefix", ServicePrefix)
for {
select {
case <-ctx.Done():
w.logger.Info("stopping etcd watcher")
return
case resp := <-watchChan:
w.handleWatchResponse(resp)
}
}
}
func (w *Watcher) handleWatchResponse(resp clientv3.WatchResponse) {
w.mu.Lock()
defer w.mu.Unlock()
if err := resp.Err(); err != nil {
w.logger.Error("etcd watch error", "error", err)
return
}
for _, event := range resp.Events {
key := string(event.Kv.Key)
switch event.Type {
case clientv3.EventTypePut:
meta, err := FromJSON(event.Kv.Value)
if err != nil {
w.logger.Error("failed to unmarshal node meta from watch event", "key", key, "error", err)
continue
}
w.nodes[key] = meta
w.logger.Info("SFU node updated/added", "node_id", meta.NodeID, "address", meta.RPCAddress)
case clientv3.EventTypeDelete:
delete(w.nodes, key)
w.logger.Warn("SFU node removed", "key", key)
}
}
}
func (w *Watcher) syncNodes(ctx context.Context) error {
resp, err := w.client.Get(ctx, ServicePrefix, clientv3.WithPrefix())
if err != nil {
return err
}
w.mu.Lock()
defer w.mu.Unlock()
w.nodes = make(map[string]NodeMeta)
for _, kv := range resp.Kvs {
meta, err := FromJSON(kv.Value)
if err != nil {
w.logger.Error("failed to unmarshal node meta during sync", "key", string(kv.Key), "error", err)
continue
}
w.nodes[string(kv.Key)] = meta
}
w.logger.Info("successfully synchronized SFU nodes from etcd", "count", len(w.nodes))
return nil
}
// FindBestNode implements a simple load balancing strategy: return the node with the lowest load.
func (w *Watcher) FindBestNode() (NodeMeta, error) {
w.mu.RLock()
defer w.mu.RUnlock()
if len(w.nodes) == 0 {
return NodeMeta{}, fmt.Errorf("no available SFU nodes")
}
var bestNode NodeMeta
minLoad := -1
for _, node := range w.nodes {
if minLoad == -1 || node.Load < minLoad {
minLoad = node.Load
bestNode = node
}
}
return bestNode, nil
}
This watcher maintains an in-memory cache of available nodes, which is far more efficient than querying `etcd` on every single client request. The logic in `FindBestNode` is a trivial “least connections” strategy, but it can be extended to consider geographic location, resource utilization, or session affinity.
Bringing It Together: The Client-SFU Handshake
Once the client receives the SFU address and the JWT, it initiates a WebSocket connection to the assigned SFU node. The very first message on this connection must be for authorization.
// client-side pseudo-code
async function joinRoom(roomId) {
// 1. Fetch routing info from our auth/signaling server
const response = await fetch(`/api/join?room=${roomId}`);
const { sfu_address, jwt } = await response.json();
// 2. Connect to the assigned SFU node
const ws = new WebSocket(sfu_address);
const peerConnection = new RTCPeerConnection();
ws.onopen = () => {
// 3. First message MUST be the auth token
const authMessage = {
type: 'auth',
payload: { jwt: jwt }
};
ws.send(JSON.stringify(authMessage));
};
ws.onmessage = async (event) => {
const message = JSON.parse(event.data);
switch (message.type) {
case 'auth_success':
// Now we can start the WebRTC SDP exchange
console.log('Authenticated with SFU, starting WebRTC handshake...');
// ... create offer, send it, handle answer, etc.
break;
case 'auth_failure':
console.error('SFU authentication failed:', message.payload.error);
ws.close();
break;
// ... handle sdp_offer, sdp_answer, ice_candidate messages
}
};
// ... setup peerConnection event handlers
}
On the SFU node, the WebSocket handler must enforce this flow. No WebRTC signaling should occur before successful JWT validation.
// cmd/sfu/main.go (simplified websocket handler)
func (s *SFUServer) handleWebSocket(conn *websocket.Conn) {
// A simple state machine for the connection
type ConnState int
const (
StateWaitingForAuth ConnState = iota
StateAuthenticated
)
state := StateWaitingForAuth
var claims *auth.Claims
for {
_, msg, err := conn.ReadMessage()
if err != nil {
// ... handle connection close
return
}
// Simplified message parsing
var message map[string]interface{}
if err := json.Unmarshal(msg, &message); err != nil {
// Malformed JSON: drop the message rather than acting on garbage.
continue
}
switch state {
case StateWaitingForAuth:
if message["type"] != "auth" {
// Protocol violation: close connection
conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_failure", "payload": {"error":"First message must be auth"}}`))
conn.Close()
return
}
// Guard the type assertions so a malformed payload cannot panic the handler.
payload, _ := message["payload"].(map[string]interface{})
tokenStr, ok := payload["jwt"].(string)
if !ok {
conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_failure", "payload": {"error":"Missing jwt"}}`))
conn.Close()
return
}
claims, err = s.jwtManager.Verify(tokenStr)
if err != nil {
// Invalid JWT
conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_failure", "payload": {"error":"Invalid token"}}`))
conn.Close()
return
}
// Authorization successful
state = StateAuthenticated
conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"auth_success"}`))
s.logger.Info("client authenticated for room", "room_id", claims.RoomID, "user_id", claims.UserID)
// Now, create the PeerConnection and associate it with this WebSocket.
// ...
case StateAuthenticated:
// Handle WebRTC signaling messages (SDP, ICE)
// Use claims.RoomID to route media to the correct session
// ...
}
}
}
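The authenticated branch is where Pion enters the picture. Below is a compressed sketch of answering an incoming SDP offer, assuming the gorilla/websocket connection type implied by `websocket.TextMessage` above and the `github.com/pion/webrtc/v3` import path; the file name and `handleOffer` helper are illustrative, and room-level track forwarding plus trickle ICE are omitted because they depend on how the SFU organizes its sessions.

```go
// cmd/sfu/signal.go (sketch)
package main

import (
	"encoding/json"
	"fmt"

	"github.com/gorilla/websocket"
	"github.com/pion/webrtc/v3"
)

// handleOffer answers an incoming SDP offer once the connection is
// authenticated. roomID comes from the verified JWT claims.
func (s *SFUServer) handleOffer(conn *websocket.Conn, roomID, offerSDP string) error {
	pc, err := webrtc.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		return fmt.Errorf("failed to create PeerConnection: %w", err)
	}

	// Incoming media from this client; a real SFU would fan these packets
	// out to the other subscribers in roomID.
	pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		s.logger.Info("received track", "room_id", roomID, "kind", track.Kind().String())
	})

	if err := pc.SetRemoteDescription(webrtc.SessionDescription{
		Type: webrtc.SDPTypeOffer,
		SDP:  offerSDP,
	}); err != nil {
		return fmt.Errorf("failed to set remote description: %w", err)
	}

	answer, err := pc.CreateAnswer(nil)
	if err != nil {
		return fmt.Errorf("failed to create answer: %w", err)
	}
	if err := pc.SetLocalDescription(answer); err != nil {
		return fmt.Errorf("failed to set local description: %w", err)
	}

	// Reply on the same signaling WebSocket, mirroring the client's message shape.
	reply, err := json.Marshal(map[string]interface{}{
		"type":    "sdp_answer",
		"payload": map[string]string{"sdp": answer.SDP},
	})
	if err != nil {
		return err
	}
	return conn.WriteMessage(websocket.TextMessage, reply)
}
```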
Testing Strategy
In a distributed system like this, unit tests are necessary but not sufficient.
- Unit Tests: The `JWTManager` is a perfect candidate for unit testing. One can test token generation and verification, including edge cases like expired tokens or tokens signed with the wrong key, without any external dependencies (a sketch of such a test follows this list). The `etcd` registry logic can be unit-tested by mocking the `etcd/clientv3` interface.
- Integration Tests: A more valuable test involves spinning up a lightweight `etcd` instance (e.g., in a Docker container), a mock SFU node that registers itself, and a test client that queries the signaling service. This validates the entire discovery and routing flow.
- End-to-End Tests: Using a headless browser framework, one can automate a client joining a room, publishing a media stream, and a second client verifying that it receives the stream. Running this against a multi-node SFU cluster under load is the ultimate confidence check.
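As a concrete starting point for the unit-test bullet, a table-driven test for the `JWTManager` might look like the following sketch. The file name and test fixtures are illustrative; it relies only on the `Generate` and `Verify` methods shown earlier, and on the default expiry validation performed by `golang-jwt/v5`.

```go
// internal/auth/jwt_test.go (sketch)
package auth

import (
	"testing"
	"time"
)

func TestJWTManagerVerify(t *testing.T) {
	const secret = "test-secret"
	mgr := NewJWTManager(secret, time.Minute)

	validToken, err := mgr.Generate("user-1", "alice", "room-42")
	if err != nil {
		t.Fatalf("Generate returned error: %v", err)
	}

	// Token signed with a different key must be rejected.
	otherMgr := NewJWTManager("wrong-secret", time.Minute)
	wrongKeyToken, _ := otherMgr.Generate("user-1", "alice", "room-42")

	// Token that is already expired must be rejected.
	expiredMgr := NewJWTManager(secret, -time.Minute)
	expiredToken, _ := expiredMgr.Generate("user-1", "alice", "room-42")

	tests := []struct {
		name    string
		token   string
		wantErr bool
	}{
		{"valid token", validToken, false},
		{"wrong signing key", wrongKeyToken, true},
		{"expired token", expiredToken, true},
		{"garbage token", "not-a-jwt", true},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			claims, err := mgr.Verify(tc.token)
			if tc.wantErr {
				if err == nil {
					t.Fatalf("expected error, got claims: %+v", claims)
				}
				return
			}
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if claims.UserID != "user-1" || claims.RoomID != "room-42" {
				t.Errorf("unexpected claims: %+v", claims)
			}
		})
	}
}
```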
The current implementation provides a solid, scalable foundation. However, several aspects require further consideration in a full-scale production deployment. The load-balancing strategy is naive; a more sophisticated approach would incorporate CPU and network I/O metrics from each SFU node, not just a stream count. Session persistence is another challenge: if a client disconnects momentarily, it would be ideal to route them back to the same SFU node where their session state might still exist. This requires a distributed session mapping, which could be managed in `etcd` or a faster cache like Redis. Finally, for a global deployment, this architecture would need to be extended with geo-DNS routing to direct users to the nearest regional cluster, and cross-region coordination mechanisms would be required if users from different regions need to join the same session.
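To make the session-persistence idea concrete, the room-to-node mapping could itself live in `etcd`. A minimal sketch using a create-if-absent transaction, pinned to the owning node’s lease so the mapping disappears when the node does; the `/sfu/sessions/` prefix, file name, and `PinRoom` helper are illustrative, not part of the code above.

```go
// internal/discovery/affinity.go (sketch)
package discovery

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// SessionPrefix is a hypothetical key prefix mapping rooms to SFU nodes.
const SessionPrefix = "/sfu/sessions/"

// PinRoom records that roomID is served by nodeID, but only if no mapping
// exists yet, and returns the node that actually owns the room. Attaching the
// owning node's leaseID means the mapping is cleaned up automatically if that
// node dies.
func PinRoom(ctx context.Context, cli *clientv3.Client, roomID, nodeID string, leaseID clientv3.LeaseID) (string, error) {
	key := SessionPrefix + roomID
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, nodeID, clientv3.WithLease(leaseID))).
		Else(clientv3.OpGet(key)).
		Commit()
	if err != nil {
		return "", fmt.Errorf("failed to pin room: %w", err)
	}
	if resp.Succeeded {
		// We created the mapping: this node owns the room.
		return nodeID, nil
	}
	// Another node already owns the room; return its ID so the signaler can
	// route the client there instead.
	kvs := resp.Responses[0].GetResponseRange().Kvs
	if len(kvs) == 0 {
		return "", fmt.Errorf("room mapping vanished mid-transaction")
	}
	return string(kvs[0].Value), nil
}
```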