The core challenge is architecting a system capable of ingesting high-frequency time-series data from thousands of devices, distributed across hundreds of isolated tenants, while performing near real-time anomaly detection and pushing notifications to a reactive user interface. Strict data isolation and permission enforcement are non-negotiable. A monolithic architecture, while seemingly simpler, presents significant compromises in scalability, maintainability, and the ability to use the best tool for each discrete problem domain: high-throughput I/O, complex graph-based relationships, numerical computation, and reactive front-end state management.
Architectural Decision Point: Monolith vs. Polyglot Microservices
A monolithic approach, likely centered around Python, appears attractive initially. A single codebase using FastAPI for WebSockets, TensorFlow for modeling, and a traditional relational database with complex JOINs or a document store for data could handle the task.
Monolithic Python Stack - Analysis:
- Pros:
- Simplified development loop and dependency management.
- No network overhead for inter-service communication.
- Easier transactional integrity within a single database.
- Cons:
- I/O Bottleneck: Even with an async framework, all connections in a Python process share a single event loop, and CPU-bound TensorFlow inference holds the Global Interpreter Lock (GIL) while it runs. In-process inference therefore starves the I/O loop, and latency for tens of thousands of concurrent WebSocket connections spikes whenever a model is scoring.
- Data Modeling Impedance: Modeling multi-tenancy permissions (e.g., Tenant -> Organization -> User -> DeviceGroup -> Device) in a relational or simple document model becomes cumbersome. Queries to verify access rights often require multiple complex joins or application-level logic, which is both slow and error-prone.
- Deployment Rigidity: The entire application must be scaled as a single unit. Scaling the WebSocket ingestion layer also means scaling the resource-intensive ML inference components, leading to inefficient resource utilization.
Polyglot Service-Oriented Architecture - Analysis:
This alternative decouples the system into specialized services, each built with the most appropriate technology.
Proposed Stack:
- Ingestion & Real-time Gateway: Ruby (Puma + faye-websocket-ruby) for managing persistent WebSocket connections.
- Data & Permission Store: ArangoDB, a multi-model database, leveraging its graph capabilities for tenancy and entity relationships.
- Inference Service: A dedicated Python service for running TensorFlow models.
- Frontend: A React application using Zustand for lightweight, reactive state management.
- Communication Bus: Redis Pub/Sub for asynchronous messaging between the Ruby and Python services.
Pros:
- Optimized I/O: Ruby, particularly with a mature multi-threaded server like Puma, handles large numbers of I/O-bound WebSocket connections well: MRI's global VM lock is released while threads wait on I/O, and the gateway performs no CPU-heavy work that would contend for it.
- Purpose-Built Data Model: ArangoDB's graph model is a perfect fit. Permissions can be modeled as edges between vertices (_from: 'users/123', _to: 'devices/456', type: 'can_read'). Checking permissions becomes a fast, single graph traversal query.
- Independent Scaling: The Ruby ingestion service can be scaled based on connection count, while the Python TensorFlow service can be scaled independently based on CPU/GPU demand.
- Technology Isolation: A critical bug or performance issue in the inference service will not bring down the data ingestion layer.
Cons:
- Operational Complexity: Requires managing, deploying, and monitoring multiple services.
- Network Latency: Inter-service communication introduces network latency, which must be accounted for.
- Data Consistency: Eventual consistency becomes the default model between services, requiring careful design to handle potential race conditions or failures.
The final decision is to proceed with the polyglot architecture. For a production system with these requirements, the benefits of specialized components and independent scaling far outweigh the increase in operational complexity. The risk of I/O contention and inefficient scaling in the monolithic approach is too high for a real-time system.
Core Implementation: A Walkthrough of the Architecture
The data flow is as follows: A device connects via WebSocket to the Ruby Gateway. The gateway authenticates the connection, authorizes it against the ArangoDB graph, and begins accepting data points. Each data point is persisted in ArangoDB and a message is published to a Redis channel. The Python Inference Service, subscribed to this channel, picks up the message, retrieves the data from ArangoDB, runs it through a TensorFlow model, and updates the data point in ArangoDB with an anomaly score. ArangoDB’s change data capture (or a secondary Redis message) triggers the Ruby Gateway to push the scored data point back to the subscribed front-end clients for that specific tenant.
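To make the hand-offs concrete, the sketch below (plain Python dict literals) shows the two Redis message envelopes used between services. The field names mirror what the gateway and worker code later in this walkthrough publish; the values are placeholders.
# message_shapes.py -- illustrative only; values are placeholders.
import json

# Gateway -> inference worker, published on the 'inference_queue' channel.
to_worker = {"readingId": "45601", "deviceId": "sensor-001"}

# Worker -> gateway, published on 'results_channel' and fanned out to the tenant's WebSocket clients.
to_clients = {
    "type": "UPDATE",
    "readingId": "45601",
    "deviceId": "sensor-001",
    "tenantId": "tenant_abc",
    "timestamp": 1700000000,
    "payload": {"value": 75.3},
    "anomalyScore": 0.506,
}

print(json.dumps(to_worker))
print(json.dumps(to_clients))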
1. ArangoDB: The Graph-Based Tenancy Backbone
The schema is defined by collections for entities (vertices) and relationships (edges).
Vertex Collections:
- tenants: Top-level customer accounts.
- users: Users belonging to a tenant.
- devices: IoT devices.
- readings: Time-series data points from devices.
Edge Collections:
- has_member: Connects tenants to users.
- owns_device: Connects tenants to devices.
- can_access: Connects users to devices (for fine-grained permissions).
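To make the graph concrete, here is a minimal illustrative data set expressed as plain Python dicts (all keys and values are invented for this example). Note that each device also carries a denormalized tenant_id attribute, which the authorization query below relies on.
# sample_graph_docs.py -- illustrative documents only; keys and values are made up.
# Vertices
tenant = {"_key": "acme", "name": "Acme Corp"}                                    # tenants/acme
user = {"_key": "123", "email": "ops@acme.example"}                               # users/123
device = {"_key": "456", "deviceId": "sensor-001", "tenant_id": "tenants/acme"}   # devices/456

# Edges (_from / _to reference the vertices above)
has_member = {"_from": "tenants/acme", "_to": "users/123"}
owns_device = {"_from": "tenants/acme", "_to": "devices/456"}
can_access = {"_from": "users/123", "_to": "devices/456", "type": "can_read"}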
Here is the setup script, using arangojs for demonstration; in production this logic would be in a Ruby migration or setup task.
// arango_setup.js
// This script demonstrates the schema structure. In production, this would be
// managed by application migrations.
const { Database } = require('arangojs');
const db = new Database({
url: process.env.ARANGO_URL || "http://localhost:8529",
databaseName: process.env.ARANGO_DB || "iot_platform",
auth: { username: "root", password: process.env.ARANGO_PASSWORD },
});
async function setupSchema() {
const vertexCollections = ['tenants', 'users', 'devices', 'readings'];
const edgeCollections = ['has_member', 'owns_device', 'can_access'];
for (const name of vertexCollections) {
const collection = db.collection(name);
if (!(await collection.exists())) {
console.log(`Creating vertex collection: ${name}`);
await collection.create();
}
}
  for (const name of edgeCollections) {
    const collection = db.collection(name);
    if (!(await collection.exists())) {
      console.log(`Creating edge collection: ${name}`);
      // Recent arangojs versions create edge collections via the database object.
      await db.createEdgeCollection(name);
    }
  }
// Ensure an index on the device_id for fast lookups
const devices = db.collection('devices');
await devices.ensureIndex({
type: 'persistent',
fields: ['deviceId'],
unique: true
});
// Index readings by timestamp for time-series queries
const readings = db.collection('readings');
await readings.ensureIndex({
type: 'persistent',
fields: ['deviceId', 'timestamp']
});
console.log('Schema setup complete.');
}
setupSchema().catch(err => {
console.error('Failed to setup ArangoDB schema:', err);
process.exit(1);
});
A critical piece is the AQL (ArangoDB Query Language) query for authorization. This query checks if a user has permission to publish data for a specific device. It traverses the graph from the user up to the tenant and then down to the device.
// AQL Query for Authorization
// This query verifies if a user (identified by user_key) can access a device (identified by deviceId).
// It checks two paths:
//   1. Does the user's tenant directly own the device? (Tenant-level access)
//   2. Does the user have a direct `can_access` edge to the device? (User-level override)
// `user_key` and `deviceId` are bind parameters.
LET user = DOCUMENT(CONCAT("users/", @user_key))
LET device = FIRST(
FOR d IN devices
FILTER d.deviceId == @deviceId
RETURN d
)
LET tenant_access = (
  FOR v IN 1..1 INBOUND user has_member
    FILTER v._id == device.tenant_id
    LIMIT 1
    RETURN 1
)
LET direct_access = (
  FOR v IN 1..1 OUTBOUND user can_access
    FILTER v._id == device._id
    LIMIT 1
    RETURN 1
)
RETURN LENGTH(tenant_access) > 0 OR LENGTH(direct_access) > 0
This single, efficient query replaces multiple SQL joins and complex application logic, demonstrating the power of the graph model for this use case.
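When debugging the permission graph, the same kind of check can be run ad hoc from Python with pyArango (the driver used by the inference worker later in this walkthrough). A minimal sketch, with placeholder keys and credentials:
# authz_check.py -- ad hoc permission check; keys and credentials are placeholders.
from pyArango.connection import Connection

conn = Connection(arangoURL="http://localhost:8529", username="root", password="change-me")
db = conn["iot_platform"]

AUTHZ_AQL = """
LET user = DOCUMENT(CONCAT("users/", @user_key))
LET device = FIRST(FOR d IN devices FILTER d.deviceId == @deviceId RETURN d)
LET tenant_access = LENGTH(
  FOR v IN 1..1 INBOUND user has_member FILTER v._id == device.tenant_id LIMIT 1 RETURN 1
) > 0
LET direct_access = LENGTH(
  FOR v IN 1..1 OUTBOUND user can_access FILTER v._id == device._id LIMIT 1 RETURN 1
) > 0
RETURN user != null AND device != null AND (tenant_access OR direct_access)
"""

result = db.AQLQuery(AUTHZ_AQL, rawResults=True, bindVars={"user_key": "123", "deviceId": "sensor-001"})
print(result[0])  # True or False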
2. Ruby WebSocket Gateway
We’ll use Puma as the server and the faye-websocket-ruby library for handling the WebSocket protocol. The application authenticates, authorizes, persists data, and publishes to Redis.
# config.ru
require 'puma'
require 'faye/websocket'
require 'redis'
require 'json'
require 'concurrent'   # concurrent-ruby, for the thread-safe Map/Set/Array used below
require 'arango-ruby'  # arangorb gem
# --- Database and Redis Configuration ---
# In a real application, this would be in an initializer.
ARANGO_DB = Arango::Database.new(
url: ENV.fetch('ARANGO_URL', 'http://localhost:8529'),
database: ENV.fetch('ARANGO_DB', 'iot_platform'),
username: 'root',
password: ENV.fetch('ARANGO_PASSWORD')
)
REDIS_CONN = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1'))
WEBSOCKET_CHANNELS = Concurrent::Map.new
# --- Authorization Logic ---
# A practical implementation of the AQL query from above.
def authorize_device_access(user_key, device_id)
query = <<-AQL
    LET user = DOCUMENT(CONCAT("users/", @user_key))
    LET device = FIRST(FOR d IN devices FILTER d.deviceId == @deviceId RETURN d)
    // A missing user or device simply yields no traversal matches, so the query returns false.
    LET tenant_access = LENGTH(
      FOR v IN 1..1 INBOUND user has_member
        FILTER v._id == device.tenant_id
        LIMIT 1
        RETURN 1
    ) > 0
    LET direct_access = LENGTH(
      FOR v IN 1..1 OUTBOUND user can_access
        FILTER v._id == device._id
        LIMIT 1
        RETURN 1
    ) > 0
    RETURN user != null AND device != null AND (tenant_access OR direct_access)
AQL
cursor = ARANGO_DB.query.execute(query, bind_vars: { user_key: user_key, deviceId: device_id })
cursor.first
end
# --- WebSocket Application Logic ---
App = ->(env) do
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, { ping: 15 })
# Connection state variables
user_key = nil
tenant_id = nil
authorized_devices = Concurrent::Set.new
ws.on :open do |event|
      # Browsers cannot set custom headers on a WebSocket handshake, so we accept the
      # token via the query string (falling back to an X-Auth-Token header for non-browser
      # clients). In a real app, use a short-lived signed token; for simplicity we assume
      # the format "user_key:tenant_id".
      params = Rack::Utils.parse_query(env['QUERY_STRING'].to_s)
      token = params['token'] || env['HTTP_X_AUTH_TOKEN']
unless token && token.include?(':')
ws.close(4001, 'Authentication token missing or invalid.')
next
end
user_key, tenant_id = token.split(':', 2)
# Register this connection for server-to-client pushes
WEBSOCKET_CHANNELS.compute_if_absent(tenant_id) { Concurrent::Array.new } << ws
puts "WebSocket connection opened for user #{user_key} in tenant #{tenant_id}"
end
ws.on :message do |event|
begin
data = JSON.parse(event.data)
device_id = data['deviceId']
# A common mistake is to trust client input. Always validate.
unless device_id && data['payload']
ws.send({ error: 'Invalid message format. Missing deviceId or payload.' }.to_json)
next
end
# Authorize on first message for a device, then cache result.
unless authorized_devices.include?(device_id)
is_authorized = authorize_device_access(user_key, device_id)
if is_authorized
authorized_devices.add(device_id)
else
ws.send({ error: "Unauthorized access to device #{device_id}." }.to_json)
# Potentially close the connection after repeated failures.
next
end
end
# Persist to ArangoDB
readings_collection = ARANGO_DB.collection('readings')
doc_data = {
deviceId: device_id,
tenantId: tenant_id,
payload: data['payload'],
timestamp: Time.now.to_i,
anomalyScore: nil, # To be filled by the inference service
status: 'pending'
}
new_reading = readings_collection.create_document(doc_data, return_new: true)
# Publish to Redis for the Python service to process
message_for_worker = { readingId: new_reading.key, deviceId: device_id }.to_json
REDIS_CONN.publish('inference_queue', message_for_worker)
rescue JSON::ParserError
ws.send({ error: 'Invalid JSON message.' }.to_json)
rescue => e
# Proper logging is critical in production.
puts "Error processing message: #{e.message}"
ws.send({ error: 'Internal server error.' }.to_json)
end
end
ws.on :close do |event|
puts "Connection closed for user #{user_key}. Code: #{event.code}, Reason: #{event.reason}"
if tenant_id && WEBSOCKET_CHANNELS[tenant_id]
WEBSOCKET_CHANNELS[tenant_id].delete(ws)
end
ws = nil # Help garbage collection
end
ws.rack_response
else
# Non-WebSocket requests
[404, { 'Content-Type' => 'text/plain' }, ['Not Found']]
end
end
# To listen for processed results and push to clients
Thread.new do
Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/1')).subscribe('results_channel') do |on|
on.message do |channel, msg|
data = JSON.parse(msg)
tenant_id = data['tenantId']
if tenant_id && WEBSOCKET_CHANNELS[tenant_id]
WEBSOCKET_CHANNELS[tenant_id].each do |ws|
ws.send(msg) # Forward the full message to the client
end
end
end
end
end
run App
This Ruby component is highly focused. Its only jobs are managing connections, performing auth checks, and acting as a data forwarder. It does no heavy computation.
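For local testing, a device can be simulated from Python with the websockets library. This is a sketch only; the token, device ID, and payload value are placeholders, and the token travels in the query string as the gateway above expects.
# device_simulator.py -- minimal fake device for local testing; values are placeholders.
import asyncio
import json

import websockets


async def simulate_device():
    # Token format "user_key:tenant_id", passed via the query string.
    uri = "ws://localhost:9292/?token=123:tenant_abc"
    async with websockets.connect(uri) as ws:
        await ws.send(json.dumps({"deviceId": "sensor-001", "payload": {"value": 75.3}}))
        try:
            # The gateway pushes the scored reading back once the worker has processed it.
            print(await asyncio.wait_for(ws.recv(), timeout=5))
        except asyncio.TimeoutError:
            print("No scored reading pushed back within 5 seconds.")


asyncio.run(simulate_device())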
3. Python TensorFlow Inference Service
This service is a simple, long-running script. It listens to Redis, performs inference, and updates ArangoDB. For production, this would be managed by a process supervisor like systemd and scaled by running multiple instances.
# inference_worker.py
import os
import json

import numpy as np
import redis
import tensorflow as tf
from pyArango.connection import Connection
# --- Configuration ---
ARANGO_URL = os.environ.get("ARANGO_URL", "http://127.0.0.1:8529")
ARANGO_USER = "root"
ARANGO_PASSWORD = os.environ.get("ARANGO_PASSWORD")
ARANGO_DB_NAME = os.environ.get("ARANGO_DB", "iot_platform")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/1")
# --- Database Connection ---
try:
conn = Connection(
arangoURL=ARANGO_URL,
username=ARANGO_USER,
password=ARANGO_PASSWORD
)
db = conn[ARANGO_DB_NAME]
readings_collection = db["readings"]
except Exception as e:
print(f"FATAL: Could not connect to ArangoDB. Error: {e}")
exit(1)
# --- Model Loading ---
# In a real system, the model would be versioned and loaded from a registry.
# Here, we simulate a simple pre-trained autoencoder model.
try:
# A placeholder for a real TensorFlow model
# model = tf.keras.models.load_model('path/to/my/autoencoder_model.h5')
# For this example, we'll mock the model's behavior.
class MockModel:
def predict(self, data):
            # Simulate an anomaly score from a single payload value.
            # A real model would process a tensor.
            value = data.get('value', 0)
            # Higher values are treated as more anomalous.
            score = np.clip((value - 50) / 50.0, 0, 1)
return np.array([[score]])
model = MockModel()
print("TensorFlow model loaded successfully.")
except Exception as e:
print(f"FATAL: Could not load TensorFlow model. Error: {e}")
exit(1)
def process_reading(message: dict):
"""
Fetches, processes, and updates a single reading.
"""
reading_id = message.get('readingId')
if not reading_id:
print("WARN: Received message without readingId.")
return
try:
# 1. Fetch the full document from ArangoDB
doc = readings_collection[reading_id]
payload = doc['payload']
# 2. Preprocess data for the model
# This step is highly model-dependent.
# Assuming payload is {'value': 75.3}
# In a real scenario, this would involve normalization, feature extraction, etc.
# tf_input = tf.constant([[payload.get('value', 0.0)]], dtype=tf.float32)
# 3. Perform inference
reconstruction_error = model.predict(payload)
anomaly_score = float(reconstruction_error[0][0])
# 4. Update the document in ArangoDB
doc['anomalyScore'] = anomaly_score
doc['status'] = 'processed'
        doc.save()  # Persist the updated fields back to ArangoDB
print(f"Processed reading {reading_id}, anomaly score: {anomaly_score:.4f}")
# 5. Publish result back for real-time UI updates
redis_client = redis.from_url(REDIS_URL)
result_message = {
'type': 'UPDATE',
'readingId': doc._key,
'deviceId': doc['deviceId'],
'tenantId': doc['tenantId'],
'timestamp': doc['timestamp'],
'payload': doc['payload'],
'anomalyScore': anomaly_score
}
redis_client.publish('results_channel', json.dumps(result_message))
except Exception as e:
print(f"ERROR: Failed to process reading {reading_id}. Error: {e}")
# A robust system would move this message to a dead-letter queue.
def main():
"""
Main worker loop listening to Redis Pub/Sub.
"""
print("Inference worker starting...")
r = redis.from_url(REDIS_URL)
pubsub = r.pubsub()
pubsub.subscribe('inference_queue')
print("Subscribed to 'inference_queue'. Waiting for messages...")
for message in pubsub.listen():
if message['type'] == 'message':
try:
data = json.loads(message['data'])
process_reading(data)
except json.JSONDecodeError:
print(f"WARN: Could not decode JSON message: {message['data']}")
except Exception as e:
print(f"CRITICAL: Unhandled exception in main loop: {e}")
if __name__ == '__main__':
main()
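As a quick sanity check of the mock scoring logic: a payload value of 75 maps to clip((75 - 50) / 50, 0, 1) = 0.5, anything at or below 50 scores 0.0, and values of 100 or more saturate at 1.0.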
4. React Frontend with Zustand
The final piece is the client-side dashboard. Zustand is chosen for its minimal boilerplate. It connects to the WebSocket and updates its state store upon receiving new messages, causing the UI to reactively render the changes.
// src/store/socketStore.js
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';
// This Zustand store manages WebSocket connection state and real-time data.
export const useSocketStore = create(immer((set, get) => ({
socket: null,
connectionStatus: 'disconnected',
readings: {}, // Store readings by deviceId: { deviceId: [reading1, reading2] }
connect: (authToken) => {
if (get().socket) return; // Prevent multiple connections
    // Browsers cannot set custom headers on a WebSocket handshake, so the token is
    // passed in the query string and read by the Ruby gateway.
    const ws = new WebSocket(`ws://localhost:9292/?token=${encodeURIComponent(authToken)}`);
set(state => {
state.socket = ws;
state.connectionStatus = 'connecting';
});
ws.onopen = () => {
set(state => { state.connectionStatus = 'connected'; });
console.log('WebSocket connected.');
};
ws.onclose = () => {
set(state => {
state.socket = null;
state.connectionStatus = 'disconnected';
});
console.log('WebSocket disconnected.');
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
// Let onclose handle the state change
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === 'UPDATE') {
set(state => {
const { deviceId } = message;
if (!state.readings[deviceId]) {
state.readings[deviceId] = [];
}
// Keep the list of readings from growing indefinitely
state.readings[deviceId].unshift(message);
if (state.readings[deviceId].length > 100) {
state.readings[deviceId].pop();
}
});
}
};
},
sendData: (data) => {
const { socket } = get();
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(data));
} else {
console.error('Cannot send data: WebSocket is not connected.');
}
},
disconnect: () => {
const { socket } = get();
if (socket) {
socket.close();
set(state => { state.socket = null; state.connectionStatus = 'disconnected'; });
}
}
})));
// Example React Component
//
// function Dashboard() {
// const { connect, disconnect, connectionStatus, readings } = useSocketStore();
//
// useEffect(() => {
// // Token should come from auth context
// const token = 'some_user_key:some_tenant_id';
// connect(token);
// return () => disconnect();
// }, [connect, disconnect]);
//
// return (
// <div>
// <h1>Connection: {connectionStatus}</h1>
// {Object.entries(readings).map(([deviceId, deviceReadings]) => (
// <div key={deviceId}>
// <h2>Device: {deviceId}</h2>
// {/* Render deviceReadings, highlighting high anomaly scores */}
// </div>
// ))}
// </div>
// );
// }
Extensibility and Architectural Limitations
This architecture provides a solid foundation but has clear limitations and avenues for improvement. The inter-service communication via Redis Pub/Sub is simple but offers no delivery guarantees; if the Python worker is down, messages are lost. A production system should use a persistent message queue like RabbitMQ or Kafka.
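Short of adopting a full broker, Redis Streams with consumer groups already provides persistence and at-least-once delivery on the same Redis instance the article already uses. A minimal sketch of what the gateway-to-worker hop could look like (stream, group, and message values are illustrative, not part of the implementation above):
# redis_streams_sketch.py -- illustrative alternative to the Pub/Sub hop; names and values are placeholders.
import json

import redis

r = redis.from_url("redis://localhost:6379/1")

# Producer side (gateway): XADD persists the entry until a consumer acknowledges it.
r.xadd("inference_stream", {"data": json.dumps({"readingId": "45601", "deviceId": "sensor-001"})})

# Consumer side (worker): consumer groups track delivery and allow acknowledgement.
try:
    r.xgroup_create("inference_stream", "inference_workers", id="0", mkstream=True)
except redis.ResponseError:
    pass  # Group already exists.

entries = r.xreadgroup("inference_workers", "worker-1", {"inference_stream": ">"}, count=10, block=5000)
for _stream, messages in entries or []:
    for message_id, fields in messages:
        reading = json.loads(fields[b"data"])
        # ... fetch, score, and update the reading here ...
        r.xack("inference_stream", "inference_workers", message_id)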
The Python worker model is also naive. A single worker processing messages serially will not scale. This should be replaced with a proper worker pool framework like Celery or a dedicated stream processing system. For inference, deploying the model via TensorFlow Serving would provide better performance, versioning, and scalability than a simple Python script.
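If Celery were adopted, the worker's Pub/Sub loop would shrink to a task definition along these lines (a sketch only, with a placeholder broker URL; the task body would reuse the process_reading logic above):
# celery_tasks.py -- sketch of a Celery-based worker; broker URL and options are placeholders.
from celery import Celery

app = Celery("inference", broker="redis://localhost:6379/2")


@app.task(acks_late=True, max_retries=3)
def score_reading(reading_id: str):
    # Would mirror process_reading(): fetch the document, run the model,
    # write back the anomaly score, and publish the result for the UI.
    ...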
Furthermore, the mechanism for pushing results back to the client relies on the Python worker publishing to a results_channel. An alternative, more decoupled approach would be to use ArangoDB’s native streaming capabilities: the Ruby Gateway could subscribe directly to a stream of changes in the readings collection, automatically pushing updates to clients as soon as the Python worker modifies a document. This removes the need for the second Redis channel and further decouples the services.