Implementing a Polyglot Architecture for Real-Time Multi-Tenant Anomaly Detection


The core challenge is architecting a system capable of ingesting high-frequency time-series data from thousands of devices, distributed across hundreds of isolated tenants, while performing near real-time anomaly detection and pushing notifications to a reactive user interface. Strict data isolation and permission enforcement are non-negotiable. A monolithic architecture, while seemingly simpler, presents significant compromises in scalability, maintainability, and the ability to use the best tool for each discrete problem domain: high-throughput I/O, complex graph-based relationships, numerical computation, and reactive front-end state management.

Architectural Decision Point: Monolith vs. Polyglot Microservices

A monolithic approach, likely centered on Python, appears attractive at first: a single codebase using FastAPI for WebSockets, TensorFlow for modeling, and either a relational database (with JOIN-heavy tenancy queries) or a document store for persistence could, in principle, handle the task.

Monolithic Python Stack - Analysis:

  • Pros:
    • Simplified development loop and dependency management.
    • No network overhead for inter-service communication.
    • Easier transactional integrity within a single database.
  • Cons:
    • I/O Bottleneck: Even with async frameworks, a single Python process multiplexes every WebSocket connection on one event loop, and CPU-bound TensorFlow inference running in the same process can starve that loop; the Global Interpreter Lock (GIL) limits how much of that work can be offloaded to threads. Under load, this shows up as increased tail latency.
    • Data Modeling Impedance: Modeling multi-tenancy permissions (e.g., Tenant -> Organization -> User -> DeviceGroup -> Device) in a relational or simple document model becomes cumbersome. Queries to verify access rights often require multiple complex joins or application-level logic, which is both slow and error-prone.
    • Deployment Rigidity: The entire application must be scaled as a single unit. Scaling the WebSocket ingestion layer also means scaling the resource-intensive ML inference components, leading to inefficient resource utilization.

Polyglot Service-Oriented Architecture - Analysis:
This alternative decouples the system into specialized services, each built with the most appropriate technology.

  • Proposed Stack:

    1. Ingestion & Real-time Gateway: Ruby (Puma + faye-websocket-ruby) for managing persistent WebSocket connections.
    2. Data & Permission Store: ArangoDB, a multi-model database, leveraging its graph capabilities for tenancy and entity relationships.
    3. Inference Service: A dedicated Python service for running TensorFlow models.
    4. Frontend: A React application using Zustand for lightweight, reactive state management.
    5. Communication Bus: Redis Pub/Sub for asynchronous messaging between the Ruby and Python services.
  • Pros:

    • Optimized I/O: Ruby with a mature threaded server such as Puma handles large numbers of I/O-bound WebSocket connections well. MRI has a global VM lock of its own, but threads release it during blocking I/O, and because the gateway performs no heavy computation, connection handling never competes with model inference inside the same process.
    • Purpose-Built Data Model: ArangoDB’s graph model is a perfect fit. Permissions can be modeled as edges between vertices (_from: 'users/123', _to: 'devices/456', type: 'can_read'). Checking permissions becomes a fast, single graph traversal query.
    • Independent Scaling: The Ruby ingestion service can be scaled based on connection count, while the Python TensorFlow service can be scaled independently based on CPU/GPU demand.
    • Technology Isolation: A critical bug or performance issue in the inference service will not bring down the data ingestion layer.
  • Cons:

    • Operational Complexity: Requires managing, deploying, and monitoring multiple services.
    • Network Latency: Inter-service communication introduces network latency, which must be accounted for.
    • Data Consistency: Eventual consistency becomes the default model between services, requiring careful design to handle potential race conditions or failures.

The final decision is to proceed with the polyglot architecture. For a production system with these requirements, the benefits of specialized components and independent scaling far outweigh the increase in operational complexity. The risk of I/O contention and inefficient scaling in the monolithic approach is too high for a real-time system.

Core Implementation: A Walkthrough of the Architecture

The data flow is as follows: A device connects via WebSocket to the Ruby Gateway. The gateway authenticates the connection, authorizes it against the ArangoDB graph, and begins accepting data points. Each data point is persisted in ArangoDB and a message is published to a Redis channel. The Python Inference Service, subscribed to this channel, picks up the message, retrieves the data from ArangoDB, runs it through a TensorFlow model, and updates the data point in ArangoDB with an anomaly score. A second Redis message on a results channel then triggers the Ruby Gateway to push the scored data point back to the subscribed front-end clients for that specific tenant.
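
Concretely, two small JSON message contracts tie the services together. The sketch below is documentation only (Python type hints mirroring the field names used by the gateway and worker code later in this section) and shows what travels over each Redis channel:

# message_contracts.py -- documentation-only sketch of the Redis message shapes.
from typing import TypedDict

class InferenceRequest(TypedDict):
    """Published by the Ruby gateway on the 'inference_queue' channel."""
    readingId: str   # _key of the persisted reading in ArangoDB
    deviceId: str

class ScoredReading(TypedDict):
    """Published by the Python worker on 'results_channel' and forwarded to browsers."""
    type: str          # always 'UPDATE'
    readingId: str
    deviceId: str
    tenantId: str
    timestamp: int     # Unix seconds, set by the gateway
    payload: dict
    anomalyScore: float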

1. ArangoDB: The Graph-Based Tenancy Backbone

The schema is defined by collections for entities (vertices) and relationships (edges).

Vertex Collections:

  • tenants: Top-level customer accounts.
  • users: Users belonging to a tenant.
  • devices: IoT devices.
  • readings: Time-series data points from devices.

Edge Collections:

  • has_member: Connects tenants to users.
  • owns_device: Connects tenants to devices.
  • can_access: Connects users to devices (for fine-grained permissions).

Here is a setup script using arangojs for demonstration; in the running system this logic would live in a Ruby migration or setup task.

// arango_setup.js
// This script demonstrates the schema structure. In production, this would be
// managed by application migrations.

const { Database } = require('arangojs');

const db = new Database({
  url: process.env.ARANGO_URL || "http://localhost:8529",
  databaseName: process.env.ARANGO_DB || "iot_platform",
  auth: { username: "root", password: process.env.ARANGO_PASSWORD },
});

async function setupSchema() {
  const vertexCollections = ['tenants', 'users', 'devices', 'readings'];
  const edgeCollections = ['has_member', 'owns_device', 'can_access'];

  for (const name of vertexCollections) {
    const collection = db.collection(name);
    if (!(await collection.exists())) {
      console.log(`Creating vertex collection: ${name}`);
      await collection.create();
    }
  }

  for (const name of edgeCollections) {
    const collection = db.collection(name);
    if (!(await collection.exists())) {
      console.log(`Creating edge collection: ${name}`);
      // createEdgeCollection() creates a collection of type "edge" (arangojs v7+)
      await db.createEdgeCollection(name);
    }
  }

  // Ensure a unique index on deviceId for fast lookups
  const devices = db.collection('devices');
  await devices.ensureIndex({
    type: 'persistent',
    fields: ['deviceId'],
    unique: true
  });

  // Index readings by timestamp for time-series queries
  const readings = db.collection('readings');
  await readings.ensureIndex({
    type: 'persistent',
    fields: ['deviceId', 'timestamp']
  });

  console.log('Schema setup complete.');
}

setupSchema().catch(err => {
  console.error('Failed to setup ArangoDB schema:', err);
  process.exit(1);
});
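
With the collections in place, tenancy is expressed purely as documents plus edges. The sketch below is illustrative only: it uses pyArango (the same client the inference service uses later) to seed one tenant, one user, and one device and wire them together with has_member and owns_device edges. Note that it also denormalizes tenant_id onto the device document, which the authorization query below relies on; all names and values are placeholders.

# seed_example.py -- illustrative seeding sketch; assumes the collections above exist.
import os
from pyArango.connection import Connection

conn = Connection(
    arangoURL=os.environ.get("ARANGO_URL", "http://127.0.0.1:8529"),
    username="root",
    password=os.environ.get("ARANGO_PASSWORD"),
)
db = conn[os.environ.get("ARANGO_DB", "iot_platform")]

seed_aql = """
// One tenant, one user, one device, plus the edges that encode tenancy.
INSERT { name: "Acme Corp" } INTO tenants
LET tenant = NEW
INSERT { email: "ops@acme.example" } INTO users
LET user = NEW
INSERT { deviceId: @deviceId, tenant_id: tenant._id } INTO devices
LET device = NEW
INSERT { _from: tenant._id, _to: user._id } INTO has_member
INSERT { _from: tenant._id, _to: device._id } INTO owns_device
RETURN { tenant: tenant._key, user: user._key, device: device._key }
"""

result = db.AQLQuery(seed_aql, rawResults=True, bindVars={"deviceId": "sensor-001"})
print(result[0])  # e.g. {'tenant': '123', 'user': '456', 'device': '789'}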

A critical piece is the AQL (ArangoDB Query Language) query for authorization. This query checks if a user has permission to publish data for a specific device. It traverses the graph from the user up to the tenant and then down to the device.

// AQL Query for Authorization
// Verifies whether a user (identified by @user_key) can access a device (identified by @deviceId).
// Two paths are checked:
//   1. Does the user's tenant own the device? (tenant-level access, via the denormalized
//      tenant_id attribute on the device)
//   2. Does the user have a direct `can_access` edge to the device? (user-level override)
// `user_key` and `deviceId` are bind parameters.

LET user = DOCUMENT(CONCAT("users/", @user_key))
LET device = FIRST(
  FOR d IN devices
    FILTER d.deviceId == @deviceId
    RETURN d
)

LET tenant_access = (
  FOR v IN 1..1 INBOUND user has_member
    FILTER v._id == device.tenant_id
    LIMIT 1
    RETURN 1
)

LET direct_access = (
  FOR v IN 1..1 OUTBOUND user can_access
    FILTER v._id == device._id
    LIMIT 1
    RETURN 1
)

RETURN LENGTH(tenant_access) > 0 OR LENGTH(direct_access) > 0

This single, efficient query replaces multiple SQL joins and complex application logic, demonstrating the power of the graph model for this use case.

2. Ruby WebSocket Gateway

We’ll use Puma as the server and the faye-websocket-ruby library for handling the WebSocket protocol. The application authenticates, authorizes, persists data, and publishes to Redis.

# config.ru
require 'puma'
require 'rack'
require 'faye/websocket'
require 'redis'
require 'json'
require 'concurrent' # concurrent-ruby: Concurrent::Map, Concurrent::Array, Concurrent::Set
require 'arangorb'   # ArangoDB Ruby client (arangorb gem)

# --- Database and Redis Configuration ---
# In a real application, this would be in an initializer.
ARANGO_DB = Arango::Database.new(
  url: ENV.fetch('ARANGO_URL', 'http://localhost:8529'),
  database: ENV.fetch('ARANGO_DB', 'iot_platform'),
  username: 'root',
  password: ENV.fetch('ARANGO_PASSWORD')
)
REDIS_CONN = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1'))
WEBSOCKET_CHANNELS = Concurrent::Map.new

# --- Authorization Logic ---
# A practical implementation of the AQL query from above.
def authorize_device_access(user_key, device_id)
  query = <<-AQL
    LET user = DOCUMENT(CONCAT("users/", @user_key))
    LET device = FIRST(FOR d IN devices FILTER d.deviceId == @deviceId RETURN d)

    LET tenant_access = LENGTH(
      FOR v IN 1..1 INBOUND user has_member
        FILTER v._id == device.tenant_id
        LIMIT 1
        RETURN 1
    ) > 0

    LET direct_access = LENGTH(
      FOR v IN 1..1 OUTBOUND user can_access
        FILTER v._id == device._id
        LIMIT 1
        RETURN 1
    ) > 0

    // Guard against unknown users/devices; AQL has no IF statement, so the
    // null checks live in the final boolean expression.
    RETURN user != null AND device != null AND (tenant_access OR direct_access)
  AQL

  cursor = ARANGO_DB.query.execute(query, bind_vars: { user_key: user_key, deviceId: device_id })
  cursor.first
end


# --- WebSocket Application Logic ---
App = ->(env) do
  if Faye::WebSocket.websocket?(env)
    ws = Faye::WebSocket.new(env, nil, { ping: 15 })
    
    # Connection state variables
    user_key = nil
    tenant_id = nil
    authorized_devices = Concurrent::Set.new

    ws.on :open do |event|
      # Browsers cannot set custom headers on a WebSocket handshake, so accept the
      # token from an `X-Auth-Token` header (server-to-server clients) or a `token`
      # query parameter (browsers). In a real app this would be a signed, expiring
      # credential rather than a plain "user_key:tenant_id" string.
      token = env['HTTP_X_AUTH_TOKEN'] ||
              Rack::Utils.parse_query(env['QUERY_STRING'].to_s)['token']
      unless token && token.include?(':')
        ws.close(4001, 'Authentication token missing or invalid.')
        next
      end

      user_key, tenant_id = token.split(':', 2)
      
      # Register this connection for server-to-client pushes
      WEBSOCKET_CHANNELS.compute_if_absent(tenant_id) { Concurrent::Array.new } << ws
      puts "WebSocket connection opened for user #{user_key} in tenant #{tenant_id}"
    end

    ws.on :message do |event|
      begin
        data = JSON.parse(event.data)
        device_id = data['deviceId']
        
        # A common mistake is to trust client input. Always validate.
        unless device_id && data['payload']
          ws.send({ error: 'Invalid message format. Missing deviceId or payload.' }.to_json)
          next
        end

        # Authorize on first message for a device, then cache result.
        unless authorized_devices.include?(device_id)
          is_authorized = authorize_device_access(user_key, device_id)
          if is_authorized
            authorized_devices.add(device_id)
          else
            ws.send({ error: "Unauthorized access to device #{device_id}." }.to_json)
            # Potentially close the connection after repeated failures.
            next
          end
        end

        # Persist to ArangoDB
        readings_collection = ARANGO_DB.collection('readings')
        doc_data = {
          deviceId: device_id,
          tenantId: tenant_id,
          payload: data['payload'],
          timestamp: Time.now.to_i,
          anomalyScore: nil, # To be filled by the inference service
          status: 'pending'
        }
        new_reading = readings_collection.create_document(doc_data, return_new: true)

        # Publish to Redis for the Python service to process
        message_for_worker = { readingId: new_reading.key, deviceId: device_id }.to_json
        REDIS_CONN.publish('inference_queue', message_for_worker)
        
      rescue JSON::ParserError
        ws.send({ error: 'Invalid JSON message.' }.to_json)
      rescue => e
        # Proper logging is critical in production.
        puts "Error processing message: #{e.message}"
        ws.send({ error: 'Internal server error.' }.to_json)
      end
    end

    ws.on :close do |event|
      puts "Connection closed for user #{user_key}. Code: #{event.code}, Reason: #{event.reason}"
      if tenant_id && WEBSOCKET_CHANNELS[tenant_id]
        WEBSOCKET_CHANNELS[tenant_id].delete(ws)
      end
      ws = nil # Help garbage collection
    end

    ws.rack_response
  else
    # Non-WebSocket requests
    [404, { 'Content-Type' => 'text/plain' }, ['Not Found']]
  end
end

# To listen for processed results and push to clients
Thread.new do
  Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1')).subscribe('results_channel') do |on|
    on.message do |channel, msg|
      data = JSON.parse(msg)
      tenant_id = data['tenantId']
      if tenant_id && WEBSOCKET_CHANNELS[tenant_id]
        WEBSOCKET_CHANNELS[tenant_id].each do |ws|
          ws.send(msg) # Forward the full message to the client
        end
      end
    end
  end
end

run App

This Ruby component is highly focused. Its only jobs are managing connections, performing auth checks, and acting as a data forwarder. It does no heavy computation.
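
To exercise the gateway without real hardware, a small test client can play the role of a device. The sketch below is illustrative only: it uses the third-party Python websockets package, passes the token as a query parameter (which the gateway above accepts), and uses placeholder credentials and a placeholder device id.

# device_sim.py -- illustrative test client for the Ruby gateway.
import asyncio
import json
import random

import websockets  # pip install websockets

async def main():
    token = "some_user_key:some_tenant_id"  # placeholder; use real credentials
    uri = f"ws://localhost:9292/?token={token}"
    async with websockets.connect(uri) as ws:
        # Send a handful of readings for one (placeholder) device.
        for _ in range(5):
            reading = {"deviceId": "sensor-001", "payload": {"value": random.uniform(0, 100)}}
            await ws.send(json.dumps(reading))
            await asyncio.sleep(1)
        # Print whatever the gateway pushes back (errors or scored readings).
        try:
            while True:
                print(await asyncio.wait_for(ws.recv(), timeout=5))
        except asyncio.TimeoutError:
            pass

if __name__ == "__main__":
    asyncio.run(main())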

3. Python TensorFlow Inference Service

This service is a simple, long-running script. It listens to Redis, performs inference, and updates ArangoDB. For production, this would be managed by a process supervisor like systemd and scaled by running multiple instances.

# inference_worker.py
import os
import json
import redis
import tensorflow as tf
from pyArango.connection import Connection

# --- Configuration ---
ARANGO_URL = os.environ.get("ARANGO_URL", "http://127.0.0.1:8529")
ARANGO_USER = "root"
ARANGO_PASSWORD = os.environ.get("ARANGO_PASSWORD")
ARANGO_DB_NAME = os.environ.get("ARANGO_DB", "iot_platform")

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/1")
# Shared publisher client for results; redis-py clients are lazy and pool connections.
redis_publisher = redis.from_url(REDIS_URL)

# --- Database Connection ---
try:
    conn = Connection(
        arangoURL=ARANGO_URL,
        username=ARANGO_USER,
        password=ARANGO_PASSWORD
    )
    db = conn[ARANGO_DB_NAME]
    readings_collection = db["readings"]
except Exception as e:
    print(f"FATAL: Could not connect to ArangoDB. Error: {e}")
    exit(1)

# --- Model Loading ---
# In a real system, the model would be versioned and loaded from a registry.
# Here, we simulate a simple pre-trained autoencoder model.
try:
    # A placeholder for a real TensorFlow model
    # model = tf.keras.models.load_model('path/to/my/autoencoder_model.h5')
    # For this example, we'll mock the model's behavior.
    class MockModel:
        def predict(self, data):
            # Simulate anomaly score based on a payload value.
            # A real model would process a tensor.
            import numpy as np
            value = data.get('value', 0)
            # High values are more anomalous
            score = np.clip( (value - 50) / 50.0, 0, 1) 
            return np.array([[score]])

    model = MockModel()
    print("TensorFlow model loaded successfully.")
except Exception as e:
    print(f"FATAL: Could not load TensorFlow model. Error: {e}")
    exit(1)


def process_reading(message: dict):
    """
    Fetches, processes, and updates a single reading.
    """
    reading_id = message.get('readingId')
    if not reading_id:
        print("WARN: Received message without readingId.")
        return

    try:
        # 1. Fetch the full document from ArangoDB
        doc = readings_collection[reading_id]
        payload = doc['payload']
        
        # 2. Preprocess data for the model
        # This step is highly model-dependent.
        # Assuming payload is {'value': 75.3}
        # In a real scenario, this would involve normalization, feature extraction, etc.
        # tf_input = tf.constant([[payload.get('value', 0.0)]], dtype=tf.float32)
        
        # 3. Perform inference
        reconstruction_error = model.predict(payload)
        anomaly_score = float(reconstruction_error[0][0])
        
        # 4. Update the document in ArangoDB
        doc['anomalyScore'] = anomaly_score
        doc['status'] = 'processed'
        doc.save()  # Persist the updated fields back to ArangoDB

        print(f"Processed reading {reading_id}, anomaly score: {anomaly_score:.4f}")

        # 5. Publish the scored reading back for real-time UI updates
        result_message = {
            'type': 'UPDATE',
            'readingId': doc._key,
            'deviceId': doc['deviceId'],
            'tenantId': doc['tenantId'],
            'timestamp': doc['timestamp'],
            'payload': doc['payload'],
            'anomalyScore': anomaly_score
        }
        redis_publisher.publish('results_channel', json.dumps(result_message))

    except Exception as e:
        print(f"ERROR: Failed to process reading {reading_id}. Error: {e}")
        # A robust system would move this message to a dead-letter queue.


def main():
    """
    Main worker loop listening to Redis Pub/Sub.
    """
    print("Inference worker starting...")
    r = redis.from_url(REDIS_URL)
    pubsub = r.pubsub()
    pubsub.subscribe('inference_queue')
    
    print("Subscribed to 'inference_queue'. Waiting for messages...")
    for message in pubsub.listen():
        if message['type'] == 'message':
            try:
                data = json.loads(message['data'])
                process_reading(data)
            except json.JSONDecodeError:
                print(f"WARN: Could not decode JSON message: {message['data']}")
            except Exception as e:
                print(f"CRITICAL: Unhandled exception in main loop: {e}")

if __name__ == '__main__':
    main()
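
The MockModel above only stands in for a real model. With an actual Keras autoencoder trained to reconstruct normal readings, the scoring step inside process_reading would look roughly like the sketch below; deriving the score from mean squared reconstruction error is one reasonable choice, and the mapping into [0, 1] is an arbitrary assumption for illustration.

# Sketch: scoring with a real autoencoder instead of MockModel (illustrative only).
import numpy as np
import tensorflow as tf

def score_with_autoencoder(model: tf.keras.Model, features: np.ndarray) -> float:
    """Anomaly score = mean squared reconstruction error, squashed into [0, 1]."""
    x = features.reshape(1, -1).astype("float32")
    reconstruction = model.predict(x, verbose=0)
    mse = float(np.mean(np.square(x - reconstruction)))
    return float(1.0 - np.exp(-mse))  # monotone map of error onto a bounded score

# Usage inside process_reading(), assuming the payload were turned into a feature vector:
#   anomaly_score = score_with_autoencoder(model, np.array([payload.get('value', 0.0)]))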

4. React Frontend with Zustand

The final piece is the client-side dashboard. Zustand is chosen for its minimal boilerplate. It connects to the WebSocket and updates its state store upon receiving new messages, causing the UI to reactively render the changes.

// src/store/socketStore.js
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';

// This Zustand store manages WebSocket connection state and real-time data.
export const useSocketStore = create(immer((set, get) => ({
  socket: null,
  connectionStatus: 'disconnected',
  readings: {}, // Store readings by deviceId: { deviceId: [reading1, reading2] }

  connect: (authToken) => {
    if (get().socket) return; // Prevent multiple connections

    // Browsers cannot attach custom headers to a WebSocket handshake, so the auth
    // token is passed as a query parameter, which the Ruby gateway also accepts.
    const ws = new WebSocket(`ws://localhost:9292/?token=${encodeURIComponent(authToken)}`);

    set(state => {
      state.socket = ws;
      state.connectionStatus = 'connecting';
    });

    ws.onopen = () => {
      set(state => { state.connectionStatus = 'connected'; });
      console.log('WebSocket connected.');
    };

    ws.onclose = () => {
      set(state => {
        state.socket = null;
        state.connectionStatus = 'disconnected';
      });
      console.log('WebSocket disconnected.');
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      // Let onclose handle the state change
    };

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      if (message.type === 'UPDATE') {
        set(state => {
          const { deviceId } = message;
          if (!state.readings[deviceId]) {
            state.readings[deviceId] = [];
          }
          // Keep the list of readings from growing indefinitely
          state.readings[deviceId].unshift(message);
          if (state.readings[deviceId].length > 100) {
            state.readings[deviceId].pop();
          }
        });
      }
    };
  },

  sendData: (data) => {
    const { socket } = get();
    if (socket && socket.readyState === WebSocket.OPEN) {
      socket.send(JSON.stringify(data));
    } else {
      console.error('Cannot send data: WebSocket is not connected.');
    }
  },

  disconnect: () => {
    const { socket } = get();
    if (socket) {
      socket.close();
      set(state => { state.socket = null; state.connectionStatus = 'disconnected'; });
    }
  }
})));

// Example React Component
//
// function Dashboard() {
//   const { connect, disconnect, connectionStatus, readings } = useSocketStore();
//
//   useEffect(() => {
//     // Token should come from auth context
//     const token = 'some_user_key:some_tenant_id';
//     connect(token);
//     return () => disconnect();
//   }, [connect, disconnect]);
//
//   return (
//     <div>
//       <h1>Connection: {connectionStatus}</h1>
//       {Object.entries(readings).map(([deviceId, deviceReadings]) => (
//         <div key={deviceId}>
//           <h2>Device: {deviceId}</h2>
//           {/* Render deviceReadings, highlighting high anomaly scores */}
//         </div>
//       ))}
//     </div>
//   );
// }

Extensibility and Architectural Limitations

This architecture provides a solid foundation but has clear limitations and avenues for improvement. The inter-service communication via Redis Pub/Sub is simple but offers no delivery guarantees; if the Python worker is down, messages are lost. A production system should use a persistent message queue like RabbitMQ or Kafka.
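
Even without adopting a new broker, moving from bare Pub/Sub to a Redis Stream with a consumer group gives at-least-once delivery: entries persist until a worker acknowledges them. The sketch below shows only the worker side and assumes the gateway would XADD the same JSON under a "data" field into a stream named inference_stream; the stream, group, and consumer names are made up for illustration.

# Sketch: consuming inference work from a Redis Stream instead of Pub/Sub.
import json
import redis

r = redis.from_url("redis://localhost:6379/1")
STREAM, GROUP, CONSUMER = "inference_stream", "inference_workers", "worker-1"

try:
    r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
except redis.ResponseError:
    pass  # the consumer group already exists

while True:
    # Block up to 5 s for new entries; unacknowledged entries can be reclaimed later.
    entries = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=10, block=5000)
    for _stream, messages in entries or []:
        for msg_id, fields in messages:
            message = json.loads(fields[b"data"])  # producer assumed to XADD {"data": <json>}
            # ... run inference exactly as process_reading() does today ...
            r.xack(STREAM, GROUP, msg_id)  # acknowledge only after a successful update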

The Python worker model is also naive. A single worker processing messages serially will not scale. This should be replaced with a proper worker pool framework like Celery or a dedicated stream processing system. For inference, deploying the model via TensorFlow Serving would provide better performance, versioning, and scalability than a simple Python script.
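
For the TensorFlow Serving route, the worker's inference step collapses to an HTTP call against Serving's REST predict endpoint. A minimal sketch, assuming the model is exported and served under the hypothetical name anomaly_autoencoder and takes a single-value input:

# Sketch: delegating inference to a TensorFlow Serving instance over REST.
import requests

def remote_anomaly_score(value: float) -> float:
    resp = requests.post(
        "http://localhost:8501/v1/models/anomaly_autoencoder:predict",  # hypothetical model name
        json={"instances": [[value]]},
        timeout=2,
    )
    resp.raise_for_status()
    return float(resp.json()["predictions"][0][0])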

Furthermore, the mechanism for pushing results back to the client relies on the Python worker publishing to results_channel. A more decoupled alternative would be to have the Ruby Gateway tail ArangoDB's write-ahead log through the replication API and push an update to clients whenever the worker modifies a reading. This removes the need for the second Redis channel and further decouples the services, at the cost of tying the gateway to a lower-level database interface.

