Architecting a Real-Time Audio Feature Extraction Service with WebRTC, SciPy, and Cassandra


The core technical challenge was to build a server-authoritative analysis pipeline for thousands of concurrent WebRTC audio streams. Client-side metrics are unreliable due to network variance and device heterogeneity. We required a system capable of ingesting raw audio server-side, performing complex numerical analysis in near real-time, and persisting the resulting high-volume time-series data for long-term diagnostics and quality of service (QoS) monitoring. A monolithic approach was immediately discarded due to scaling and fault-isolation concerns. The architecture demanded a careful composition of specialized components, each containerized for deployment consistency.

The final design couples a custom Python-based Selective Forwarding Unit (SFU) for media interception, a SciPy worker pool for numerical processing, and a Cassandra cluster for its high-throughput write capabilities, all orchestrated via Docker. This is not a “hello world” tutorial; it’s a breakdown of a production-grade implementation, including the necessary configurations, data models, and the rationale behind critical architectural trade-offs.

The Dockerized Service Mesh

In any real-world project involving multiple interacting services, starting with the deployment topology is paramount. It forces clarity on networking, dependencies, and configuration management from day one. We defined our stack using docker-compose, which serves as both our development environment and the blueprint for production container orchestration.

# docker-compose.yml
version: "3.8"

services:
  cassandra:
    image: cassandra:4.0
    container_name: cassandra-node1
    ports:
      - "9042:9042"
    volumes:
      - cassandra_data:/var/lib/cassandra
    environment:
      - CASSANDRA_CLUSTER_NAME=WebRTC_Metrics_Cluster
      - CASSANDRA_DC=dc1
      - CASSANDRA_RACK=rack1
      - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
    healthcheck:
      test: ["CMD", "cqlsh", "-e", "describe keyspaces"]
      interval: 15s
      timeout: 10s
      retries: 10

  signaling:
    build:
      context: ./signaling
    container_name: webrtc-signaling
    ports:
      - "8080:8080"
    command: python /app/server.py

  media_processor:
    build:
      context: ./media_processor
    container_name: media-processor
    depends_on:
      cassandra:
        condition: service_healthy
    command: python /app/server.py
    environment:
      - CASSANDRA_HOST=cassandra
      - CASSANDRA_PORT=9042
      - CASSANDRA_KEYSPACE=webrtc_metrics
    # 8081/tcp is the HTTP endpoint that receives the client's SDP offer.
    # WebRTC requires a range of UDP ports for media transport (RTP/RTCP)
    # In production, this would be a carefully managed range.
    ports:
      - "8081:8081"
      - "50000-50100:50000-50100/udp"

volumes:
  cassandra_data:

This configuration defines three core services:

  1. cassandra: A single-node Cassandra cluster. The healthcheck is a critical detail for a pragmatic setup; it ensures that dependent services only start after the database is fully operational, preventing cascading connection failures on startup.
  2. signaling: A lightweight WebSocket server responsible for the WebRTC session negotiation (SDP offer/answer exchange). Its logic is straightforward and kept separate from the media plane.
  3. media_processor: This is the heart of the system. It runs our custom Python SFU, which receives media, processes it, and connects to Cassandra. The explicit UDP port range is non-negotiable for WebRTC’s media transport protocol (SRTP).
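
The healthcheck gates startup on the Compose side. An application-side complement is a small wait-and-retry loop before the first connection attempt. The following is a minimal sketch, not part of the original services: `wait_for_port` is a hypothetical helper, and an open TCP port is only a weaker signal than the `cqlsh` check above, since Cassandra may accept connections slightly before it serves queries.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # Hypothetical usage: block until the database port answers, then connect.
    ready = wait_for_port("cassandra", 9042, timeout=5.0)
    print("cassandra reachable:", ready)
```

Called before `get_cassandra_session()`, a helper like this would let the media processor tolerate being started outside Compose, where no `depends_on` ordering exists.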

Signaling Plane: The Necessary Handshake

While not the focus of the analysis problem, a stable signaling server is a prerequisite. We implemented a minimal one using Python’s websockets library. Its sole job is to broker messages between peers to establish the RTCPeerConnection.

# signaling/server.py
import asyncio
import json
import logging
import websockets

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

USERS = set()

async def handler(websocket):
    """
    Manages WebSocket connections and relays WebRTC signaling messages.
    Recent websockets releases (10.1+) pass only the connection to the handler.
    """
    USERS.add(websocket)
    logging.info(f"Client connected. Total clients: {len(USERS)}")
    try:
        async for message in websocket:
            # The signaling logic here is a simple broadcast.
            # In production, you'd have rooms and targeted messaging.
            data = json.loads(message)
            # Not every message carries a "type" key (e.g. {"answer": ...}).
            logging.info(f"Received message: {data.get('type', 'unknown')}")

            other_peers = [user for user in USERS if user != websocket]
            if other_peers:
                # asyncio.wait() rejects bare coroutines on Python 3.11+;
                # gather() schedules them and collects per-peer send failures.
                await asyncio.gather(
                    *(user.send(message) for user in other_peers),
                    return_exceptions=True,
                )

    except websockets.exceptions.ConnectionClosedError:
        logging.info("Client connection closed.")
    finally:
        USERS.discard(websocket)
        logging.info(f"Client disconnected. Total clients: {len(USERS)}")

async def main():
    async with websockets.serve(handler, "0.0.0.0", 8080):
        logging.info("Signaling server started on ws://0.0.0.0:8080")
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

The key here is simplicity. The signaling server keeps no session state beyond the set of open sockets and merely acts as a message bus. This design choice prevents it from becoming a bottleneck and keeps the state management concerns within the media processing service where they belong.
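
The handler's comment mentions rooms and targeted messaging as the production alternative to broadcasting. A minimal sketch of the bookkeeping that would require, assuming a hypothetical `RoomRegistry` class (not part of the original server) and treating peers as opaque hashable objects such as WebSocket connections:

```python
class RoomRegistry:
    """Tracks which peers are in which room so messages can be targeted."""

    def __init__(self):
        self._rooms = {}  # room_id -> set of peers

    def join(self, room_id, peer):
        self._rooms.setdefault(room_id, set()).add(peer)

    def leave(self, room_id, peer):
        members = self._rooms.get(room_id)
        if members is None:
            return
        members.discard(peer)
        if not members:
            # Drop empty rooms so the registry does not grow without bound
            del self._rooms[room_id]

    def recipients(self, room_id, sender):
        """Everyone in the room except the sender."""
        return self._rooms.get(room_id, set()) - {sender}


if __name__ == "__main__":
    registry = RoomRegistry()
    registry.join("call-1", "alice")
    registry.join("call-1", "bob")
    registry.join("call-2", "carol")
    print(registry.recipients("call-1", "alice"))
```

In the handler, the `other_peers` list would then come from `recipients(...)` instead of the global `USERS` set. The trade-off is that the server now holds a little more state per connection.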

Media Plane: Interception and Processing with aiortc

The most significant architectural decision was to build the media interception logic in Python using the aiortc library. While production systems might leverage compiled SFUs like Janus or mediasoup for raw performance, aiortc provides direct access to raw media frames within the Python ecosystem. This avoids complex inter-process communication or plugin development, making it ideal for a system where the primary value lies in the Python-based analysis itself.

The media_processor server establishes a peer connection with the client, but instead of forwarding media, it diverts the incoming audio track to a processing pipeline.
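
One detail of that pipeline is worth isolating first: WebRTC audio arrives in short frames (Opus typically uses 20 ms frames), while the analysis wants longer windows. A minimal sketch of the re-chunking step, using plain lists and a hypothetical `ChunkAccumulator` class rather than the NumPy buffer the server uses:

```python
class ChunkAccumulator:
    """Collects variable-size frames and emits fixed-size chunks."""

    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self._pending = []

    def push(self, samples):
        """Add one frame's samples; return the chunks that became complete."""
        self._pending.extend(samples)
        chunks = []
        while len(self._pending) >= self.chunk_size:
            chunks.append(self._pending[:self.chunk_size])
            del self._pending[:self.chunk_size]
        return chunks


if __name__ == "__main__":
    # 20 ms at 48 kHz is 960 samples, so a 1-second chunk needs 50 frames.
    acc = ChunkAccumulator(chunk_size=48000)
    emitted = 0
    for _ in range(50):
        emitted += len(acc.push([0] * 960))
    print("chunks emitted:", emitted)
```

Leftover samples stay in the buffer for the next chunk, so no audio is dropped at chunk boundaries.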

# media_processor/server.py
import asyncio
import json
import logging
import uuid
import os
from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack
from aiortc.contrib.media import MediaBlackhole
import numpy as np
from scipy.fft import rfft, rfftfreq
from cassandra.cluster import Cluster

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
CASSANDRA_HOST = os.environ.get("CASSANDRA_HOST", "127.0.0.1")
CASSANDRA_PORT = int(os.environ.get("CASSANDRA_PORT", 9042))
KEYSPACE = os.environ.get("CASSANDRA_KEYSPACE", "webrtc_metrics")

# --- Cassandra Setup ---
def get_cassandra_session():
    """Establishes connection to Cassandra and ensures keyspace/table exist."""
    try:
        cluster = Cluster([CASSANDRA_HOST], port=CASSANDRA_PORT)
        session = cluster.connect()
        session.execute(f"""
            CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}
            WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}
        """)
        session.set_keyspace(KEYSPACE)
        session.execute("""
            CREATE TABLE IF NOT EXISTS audio_features (
                session_id uuid,
                event_time timestamp,
                rms_volume float,
                dominant_frequency float,
                spectral_flatness float,
                PRIMARY KEY (session_id, event_time)
            ) WITH CLUSTERING ORDER BY (event_time DESC);
        """)
        logging.info("Cassandra session established and schema verified.")
        return session
    except Exception as e:
        logging.error(f"Failed to connect to Cassandra: {e}")
        raise

CASSANDRA_SESSION = get_cassandra_session()
INSERT_STATEMENT = CASSANDRA_SESSION.prepare(
    # now() returns a timeuuid; toTimestamp() converts it to match the timestamp column.
    "INSERT INTO audio_features (session_id, event_time, rms_volume, dominant_frequency, spectral_flatness) VALUES (?, toTimestamp(now()), ?, ?, ?)"
)

# --- Analysis Logic ---
class AudioAnalysisTrack(MediaStreamTrack):
    """
    A custom MediaStreamTrack that receives audio frames, analyzes them,
    and forwards them to a data sink (Cassandra).
    """
    kind = "audio"

    def __init__(self, track, session_id):
        super().__init__()
        self.track = track
        self.session_id = session_id
        self.buffer = np.array([], dtype=np.int16)
        # Process audio in 1-second chunks (48000 samples for 48kHz sample rate)
        self.CHUNK_SIZE = 48000 
        self.SAMPLE_RATE = 48000

    async def recv(self):
        frame = await self.track.recv()
        
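        # Audio frames from aiortc are av.AudioFrame objects. to_ndarray() does not
        # convert format or layout: a packed format such as s16 comes back as a single
        # row of interleaved channels, so we downmix to mono explicitly.
        # Assumes packed 16-bit PCM at SAMPLE_RATE, as produced by aiortc's Opus decoder.
        pcm = frame.to_ndarray().reshape(frame.samples, -1)
        audio_data = pcm.mean(axis=1).astype(np.int16)
        self.buffer = np.concatenate((self.buffer, audio_data))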

        if len(self.buffer) >= self.CHUNK_SIZE:
            chunk = self.buffer[:self.CHUNK_SIZE]
            self.buffer = self.buffer[self.CHUNK_SIZE:]
            
            # Offload CPU-bound analysis to a separate thread to avoid blocking the event loop
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.process_chunk, chunk)

        # We must return the frame to satisfy the MediaStreamTrack interface,
        # but it goes nowhere. This is effectively a media "sink".
        return frame

    def process_chunk(self, chunk):
        """
        Performs SciPy-based analysis. This runs in a thread pool executor.
        A common pitfall is running heavy computation directly in the async recv method,
        which would block the entire server's event loop.
        """
        try:
            # 1. RMS Volume
            rms = np.sqrt(np.mean(np.square(chunk.astype(np.float64))))

            # 2. Dominant Frequency via FFT
            N = len(chunk)
            yf = rfft(chunk)
            xf = rfftfreq(N, 1 / self.SAMPLE_RATE)
            dominant_freq = xf[np.argmax(np.abs(yf))]
            
            # 3. Spectral Flatness
            ps = np.abs(yf)**2
            gmean = np.exp(np.mean(np.log(ps + 1e-9))) # add epsilon to avoid log(0)
            amean = np.mean(ps)
            spectral_flatness = gmean / amean if amean > 0 else 0.0

            logging.info(
                f"Session {self.session_id}: RMS={rms:.2f}, Freq={dominant_freq:.2f}Hz, Flatness={spectral_flatness:.4f}"
            )
            
            # Write to Cassandra
            CASSANDRA_SESSION.execute(
                INSERT_STATEMENT, (self.session_id, rms, dominant_freq, spectral_flatness)
            )
        except Exception as e:
            logging.error(f"Error in audio processing for session {self.session_id}: {e}")


# --- WebRTC Server Logic ---
pcs = set()

async def offer(request):
    """Handles the WebRTC offer from the client."""
    params = await request.json()
    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    pc = RTCPeerConnection()
    session_id = uuid.uuid4()
    pcs.add(pc)

    @pc.on("iceconnectionstatechange")
    async def on_iceconnectionstatechange():
        logging.info(f"ICE connection state is {pc.iceConnectionState}")
        if pc.iceConnectionState == "failed":
            await pc.close()
            pcs.discard(pc)

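    @pc.on("track")
    async def on_track(track):
        logging.info(f"Track {track.kind} received for session {session_id}")
        if track.kind == "audio":
            analysis_track = AudioAnalysisTrack(track, session_id)
            # This is the key diversion: we don't add the track back to the peer connection.
            # Something still has to pull frames, otherwise recv() is never called and
            # no analysis runs. MediaBlackhole consumes the track and discards the frames.
            sink = MediaBlackhole()
            sink.addTrack(analysis_track)
            await sink.start()
        else:
            track.stop()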

    await pc.setRemoteDescription(offer)
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}


# Minimal aiohttp wrapper that exposes the 'offer' coroutine as an HTTP endpoint.
from aiohttp import web

async def handle_offer(request):
    response_data = await offer(request)
    return web.json_response(response_data)

app = web.Application()
app.router.add_post("/offer", handle_offer)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8081)

The most critical part of this code is the AudioAnalysisTrack class. It inherits from MediaStreamTrack and acts as a sink. Inside its recv method, it receives raw AudioFrame objects, converts them into NumPy arrays, and buffers them. Once a sufficient chunk of audio is collected (e.g., one second), it hands off the CPU-intensive SciPy analysis to a thread pool executor (loop.run_in_executor). This is a fundamental pattern for mixing CPU-bound work with async I/O; failing to do so would cause the event loop to stall, catastrophically degrading the performance of all concurrent connections.
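
To make the metrics and the offloading pattern concrete without the server around them, here is a self-contained sketch using only the standard library. It is an illustration, not the production code: `analyze_chunk` is a hypothetical stand-in for `process_chunk`, a naive O(n²) DFT replaces `scipy.fft.rfft`, and the test signal is a synthetic 1 kHz tone at an assumed 8 kHz sample rate.

```python
import asyncio
import cmath
import math


def analyze_chunk(samples, sample_rate):
    """Return (rms, dominant_frequency_hz, spectral_flatness) for one chunk."""
    n = len(samples)
    rms = math.sqrt(sum(s * s for s in samples) / n)

    # Power spectrum over the non-negative frequency bins (what rfft would cover).
    power = []
    for k in range(n // 2 + 1):
        acc = sum(s * cmath.exp(-2j * math.pi * k * i / n) for i, s in enumerate(samples))
        power.append(abs(acc) ** 2)

    dominant_bin = max(range(len(power)), key=power.__getitem__)
    dominant_hz = dominant_bin * sample_rate / n

    # Spectral flatness: geometric mean over arithmetic mean of the power spectrum.
    eps = 1e-9  # avoids log(0) for empty bins
    geometric_mean = math.exp(sum(math.log(p + eps) for p in power) / len(power))
    arithmetic_mean = sum(power) / len(power)
    flatness = geometric_mean / arithmetic_mean if arithmetic_mean > 0 else 0.0
    return rms, dominant_hz, flatness


async def main():
    sample_rate, n = 8000, 256
    tone = [math.sin(2 * math.pi * 1000 * i / sample_rate) for i in range(n)]
    loop = asyncio.get_running_loop()
    # The CPU-bound call runs in the default thread pool; the event loop keeps
    # servicing other coroutines while this one awaits the result.
    return await loop.run_in_executor(None, analyze_chunk, tone, sample_rate)


if __name__ == "__main__":
    rms, dominant_hz, flatness = asyncio.run(main())
    print(f"RMS={rms:.4f}, Freq={dominant_hz:.1f}Hz, Flatness={flatness:.2e}")
```

A pure tone should yield a flatness close to 0, while noise-like input pushes it toward 1, which is what makes the metric useful for separating tonal from noisy audio. Note that a thread pool helps most when the heavy work releases the GIL, as many NumPy and SciPy routines do; pure-Python loops like the DFT above would still compete with the event loop for the interpreter.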

Data Engineering: Time-Series Modeling in Cassandra

The choice of Cassandra was deliberate. We anticipate a high volume of write operations with a predictable query pattern: retrieve all metrics for a specific session within a time range. This is a classic time-series use case where Cassandra excels.

The data model reflects this:

CREATE TABLE IF NOT EXISTS audio_features (
    session_id uuid,
    event_time timestamp,
    rms_volume float,
    dominant_frequency float,
    spectral_flatness float,
    PRIMARY KEY (session_id, event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);

  • Partition Key (session_id): All data for a single WebRTC session is co-located on the same node (or replica set). This makes queries for a specific session extremely fast as it involves a single partition read. It also ensures that writes are distributed across the cluster as long as there are many active sessions.
  • Clustering Key (event_time): Within each partition, rows are physically sorted on disk by timestamp. The DESC order is an optimization for the most common query: “get the latest metrics.” This allows Cassandra to read sequentially from the start of the partition without needing to jump around.
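
The effect of the two keys can be illustrated with a toy in-memory model. This is a conceptual sketch only, not how Cassandra's storage engine works: `ToyTimeSeriesTable` is a hypothetical class, and timestamps are plain numbers standing in for `event_time`.

```python
import bisect


class ToyTimeSeriesTable:
    """Rows grouped by partition key and kept sorted newest-first within each partition."""

    def __init__(self):
        # session_id -> (sorted negated timestamps, rows in the same order)
        self._partitions = {}

    def insert(self, session_id, event_time, metrics):
        keys, rows = self._partitions.setdefault(session_id, ([], []))
        neg = -event_time  # negating makes ascending order equal to event_time DESC
        i = bisect.bisect_left(keys, neg)
        if i < len(keys) and keys[i] == neg:
            rows[i] = (event_time, metrics)  # same primary key: last write wins
        else:
            keys.insert(i, neg)
            rows.insert(i, (event_time, metrics))

    def latest(self, session_id, limit):
        """Newest rows first; a simple read from the front of the partition."""
        _, rows = self._partitions.get(session_id, ([], []))
        return rows[:limit]

    def time_range(self, session_id, start, end):
        """Rows with start <= event_time <= end, newest first."""
        keys, rows = self._partitions.get(session_id, ([], []))
        lo = bisect.bisect_left(keys, -end)
        hi = bisect.bisect_right(keys, -start)
        return rows[lo:hi]


if __name__ == "__main__":
    table = ToyTimeSeriesTable()
    for ts in (10, 30, 20):
        table.insert("session-a", ts, {"rms_volume": float(ts)})
    print(table.latest("session-a", 2))
    print(table.time_range("session-a", 15, 30))
```

Both reads touch exactly one partition and return a contiguous slice, which is the property the `(session_id, event_time)` primary key is meant to provide.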

This schema is designed for write performance and the primary read query. A common mistake is to try to design a Cassandra schema that serves all possible queries. This leads to inefficient models. For ad-hoc analytics (e.g., “what is the average RMS volume across all sessions in the last hour?”), a secondary batch processing pipeline using Spark reading from this Cassandra table would be the correct approach.

Client-Side Integration

The client-side JavaScript is relatively standard. Its only responsibility is to capture microphone audio and establish a peer connection with our server. The client is completely unaware of the complex analysis happening on the backend.

// A simplified client-side implementation
const pc = new RTCPeerConnection({
  iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
});

const signaling = new WebSocket('ws://localhost:8080');

signaling.onmessage = async (e) => {
  const message = JSON.parse(e.data);
  if (message.offer) {
    await pc.setRemoteDescription(new RTCSessionDescription(message.offer));
    const answer = await pc.createAnswer();
    await pc.setLocalDescription(answer);
    signaling.send(JSON.stringify({ 'answer': pc.localDescription }));
  } else if (message.answer) {
    // This part is for peer-to-peer, but we connect to a server.
    // The server will provide the answer.
    await pc.setRemoteDescription(new RTCSessionDescription(message.answer));
  } else if (message.candidate) {
    await pc.addIceCandidate(new RTCIceCandidate(message.candidate));
  }
};

pc.onicecandidate = (event) => {
  if (event.candidate) {
    // In a real app, this would be sent to the signaling server
  }
};

async function start() {
  try {
    const stream = await navigator.mediaDevices.getUserMedia({ audio: true, video: false });
    stream.getTracks().forEach(track => pc.addTrack(track, stream));

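    const offer = await pc.createOffer();
    await pc.setLocalDescription(offer);

    // The /offer endpoint is a single request/response with no trickle ICE,
    // so wait for gathering to finish and send the description that
    // includes every local candidate.
    if (pc.iceGatheringState !== 'complete') {
      await new Promise((resolve) => {
        pc.addEventListener('icegatheringstatechange', () => {
          if (pc.iceGatheringState === 'complete') resolve();
        });
      });
    }

    // Send the offer to the media_processor via a simple HTTP fetch
    const response = await fetch('http://localhost:8081/offer', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        sdp: pc.localDescription.sdp,
        type: pc.localDescription.type,
      }),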
    });
    
    const serverAnswer = await response.json();
    await pc.setRemoteDescription(new RTCSessionDescription(serverAnswer));
    console.log("Peer connection established with server.");

  } catch (err) {
    console.error("Error starting WebRTC connection:", err);
  }
}

start();

System Data Flow

The complete flow of data through the architecture can be visualized.

graph TD
    subgraph Client Browser
        A[Microphone] --> B{getUserMedia};
        B --> C[RTCPeerConnection];
    end

    subgraph Docker Network
        subgraph Signaling Service
            D[WebSocket Server]
        end
        subgraph Media Processor Service
            E[aiohttp Endpoint] --> F{aiortc RTCPeerConnection};
            F -- Raw Audio Frames --> G[AudioAnalysisTrack];
            G -- NumPy Array Chunks --> H(Thread Pool Executor);
            H -- Performs SciPy analysis --> I[Feature Metrics];
        end
        subgraph Cassandra Service
            J[(Cassandra)]
        end
    end

    C -- HTTP Offer --> E;
    E -- HTTP Answer --> C;
    C -- "SRTP (UDP)" --> F;
    I -- Writes --> J;

This architecture successfully decouples the media ingestion, processing, and storage concerns. Each component can be scaled independently. If analysis becomes the bottleneck, we can increase the number of media_processor containers. If database writes are the bottleneck, we can scale the Cassandra cluster. This is the fundamental advantage of a containerized, service-oriented design over a monolithic application.

The current implementation, while functional, represents a foundational blueprint. The Python-based SFU, while excellent for integrating with the SciPy ecosystem, would become a performance bottleneck under extremely high load compared to a C/C++ based media server; at that point, one would replace the aiortc component with a mediasoup worker and use a transport like gRPC to ship frames to the Python workers. Furthermore, the analysis itself is basic; a production system would incorporate more sophisticated noise reduction, voice activity detection, and machine learning models for deeper insights, potentially requiring a more robust task queue system like Celery instead of a simple thread pool executor. Finally, the Cassandra schema is optimized for per-session queries and would require a complementary analytics datastore, populated via a batch or stream ETL process, to enable complex cross-session aggregations.

