The initial deployment of our computer vision model to the Android fleet felt like a success. The PyTorch Mobile build was stable, inference worked, and crash rates reported by Firebase were nominal. The problem emerged six weeks later, not as a crash, but as a slow bleed of user engagement from a specific cohort. After days of digging, we correlated it with a recent model update. The root cause was a subtle 12% increase in average inference latency and a corresponding rise in battery consumption on devices with mid-range MediaTek chipsets. We were completely blind to this class of problem. Our observability stack ended at the API gateway; the on-device environment was a black box. This incident became the catalyst for building a dedicated telemetry and anomaly detection pipeline for our edge AI models.
Our initial concept was to create a lightweight, non-blocking telemetry collector within the Android application. This collector needed to capture not just model-specific metrics like inference duration but also system-level context like CPU load, memory pressure, and device temperature during inference. The data had to be batched and shipped efficiently to a cloud backend for aggregation, analysis, and visualization. The core requirement was not just to see dashboards but to get automatically alerted to statistical deviations in performance across a fragmented device landscape.
The technology selection process was driven by pragmatism and the need for control. The client was fixed: an Android app running a PyTorch Mobile model. For the cloud backbone, we chose Azure, primarily for its robust Event Hubs service, which is built to handle massive streams of small, independent events—a perfect match for our telemetry payloads.
The heart of the system would be the ELK Stack (Elasticsearch, Logstash, Kibana). A common mistake is to default to a fully managed cloud observability suite. In our case, this was insufficient. We needed direct, low-level access to the aggregated time-series data to feed a custom anomaly detection model. Running a self-managed ELK cluster on Azure VMs (or using the Elastic Cloud on Azure integration) gave us this flexibility.
For anomaly detection itself, we opted to build a custom service using PyTorch. While Elasticsearch has its own anomaly detection features, they are generalized. Our problem domain was specific: we needed to correlate model version, device hardware class, and multiple performance metrics. A custom autoencoder model trained on our own baseline data would provide a more nuanced and accurate definition of “normal” behavior.
Finally, for the frontend, the MLOps team needed a fast, purpose-built dashboard rather than a generic Kibana view. We chose Svelte for this. Its compiler-based approach yields extremely small bundles and high performance, which is ideal for a data-dense, real-time monitoring interface that must never feel sluggish. The dashboard would surface only the detected anomalies and allow engineers to drill down into the correlated metrics for root cause analysis.
The architecture took shape as follows:
graph TD
    subgraph Android Device Fleet
        A[Android App] --> B{PyTorch Model};
        B -- Latency, Confidence --> C[Telemetry Collector];
        D[System Monitor] -- CPU, Memory, Temp --> C;
    end
    subgraph Azure Cloud
        C --> E[Azure Event Hubs];
        E --> F[Logstash on VM];
        F --> G[Elasticsearch Cluster];
        subgraph Anomaly Detection Service
            H[PyTorch Autoencoder] -- Queries --> G;
            H -- Writes Anomaly --> I[Alerts Index in ES];
        end
    end
    subgraph Operator View
        J[Svelte Dashboard] -- Queries --> I;
        J -- Drills Down --> G;
    end
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style H fill:#ccf,stroke:#333,stroke-width:2px
The Android Telemetry Client
The on-device component had to be extremely careful about performance and resource consumption. It could not, under any circumstances, impact the user experience. We implemented this as a singleton TelemetryManager
in Kotlin, using a thread-safe, non-blocking queue and a dedicated background thread for batching and network I/O.
A critical design choice was the telemetry data schema. A flat JSON object is easy to work with but can be verbose. We settled on a structured format that was both comprehensive and reasonably compact.
TelemetryPayload.kt
// A data class representing a single inference event.
// Using primitives where possible to minimize boxing overhead.
data class TelemetryEvent(
val timestamp: Long, // UTC milliseconds
val modelName: String,
val modelVersion: String,
val inferenceLatencyMs: Long,
    val cpuKernelTimeMs: Long, // Process CPU time (user + kernel, all threads) consumed during inference
val memoryRssKb: Long, // Resident Set Size after inference
val temperatureCelsius: Float,
// A simplified representation of the output tensor's characteristics.
// In a real project, this could be more complex, e.g., confidence scores.
val outputConfidence: Float
)
// The payload sent to the backend, containing device context and a batch of events.
data class TelemetryPayload(
val deviceId: String,
val manufacturer: String,
val model: String,
val osVersion: Int,
val events: List<TelemetryEvent>
)
The core of the client is the TelemetryManager. It is initialized on app startup and exposes a single method, recordInference, which wraps the PyTorch inference call, times it, and records the surrounding system metrics.
TelemetryManager.kt
import android.os.Build
import android.os.Process
import java.io.File
import java.net.HttpURLConnection
import java.net.URL
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import com.google.gson.Gson
object TelemetryManager {
private const val BATCH_SIZE = 50
private const val FLUSH_INTERVAL_SECONDS = 60L
// This would be securely stored and fetched, not hardcoded.
private const val INGESTION_ENDPOINT = "YOUR_AZURE_EVENT_HUB_COMPATIBLE_ENDPOINT"
private const val SAS_TOKEN = "YOUR_SHARED_ACCESS_SIGNATURE_TOKEN"
private val eventQueue = ConcurrentLinkedQueue<TelemetryEvent>()
private val backgroundExecutor = Executors.newSingleThreadScheduledExecutor()
private val httpClientExecutor = Executors.newFixedThreadPool(2)
private val gson = Gson()
private val deviceId: String by lazy {
// A real implementation would use a more robust unique ID.
UUID.randomUUID().toString()
}
fun start() {
backgroundExecutor.scheduleAtFixedRate(::flushQueue, FLUSH_INTERVAL_SECONDS, FLUSH_INTERVAL_SECONDS, TimeUnit.SECONDS)
}
fun recordInference(
modelName: String,
modelVersion: String,
inferenceBlock: () -> Float // The actual inference call
) {
val startCpuTime = Process.getElapsedCpuTime()
val startTime = System.nanoTime()
val confidence = inferenceBlock() // Execute the model inference
val endTime = System.nanoTime()
val endCpuTime = Process.getElapsedCpuTime()
val latencyMs = TimeUnit.NANOSECONDS.toMillis(endTime - startTime)
val cpuKernelTimeMs = endCpuTime - startCpuTime
// In a real project, these would be collected more carefully.
val memoryRssKb = readMemoryStats()
val temperature = readDeviceTemperature()
val event = TelemetryEvent(
timestamp = System.currentTimeMillis(),
modelName = modelName,
modelVersion = modelVersion,
inferenceLatencyMs = latencyMs,
cpuKernelTimeMs = cpuKernelTimeMs,
memoryRssKb = memoryRssKb,
temperatureCelsius = temperature,
outputConfidence = confidence
)
eventQueue.offer(event)
if (eventQueue.size >= BATCH_SIZE) {
backgroundExecutor.execute(::flushQueue)
}
}
private fun flushQueue() {
// This runs on the backgroundExecutor thread.
val eventsToFlush = mutableListOf<TelemetryEvent>()
while (eventQueue.isNotEmpty() && eventsToFlush.size < BATCH_SIZE) {
eventQueue.poll()?.let { eventsToFlush.add(it) }
}
if (eventsToFlush.isEmpty()) return
val payload = TelemetryPayload(
deviceId = deviceId,
manufacturer = Build.MANUFACTURER,
model = Build.MODEL,
osVersion = Build.VERSION.SDK_INT,
events = eventsToFlush
)
sendPayload(payload)
}
private fun sendPayload(payload: TelemetryPayload) {
// Offload network I/O to a separate thread pool to not block the scheduler.
httpClientExecutor.execute {
val jsonPayload = gson.toJson(payload)
var connection: HttpURLConnection? = null
try {
val url = URL(INGESTION_ENDPOINT)
connection = url.openConnection() as HttpURLConnection
connection.requestMethod = "POST"
connection.setRequestProperty("Authorization", SAS_TOKEN)
connection.setRequestProperty("Content-Type", "application/vnd.microsoft.servicebus.json")
connection.doOutput = true
connection.outputStream.use { os ->
val input = jsonPayload.toByteArray(Charsets.UTF_8)
os.write(input, 0, input.size)
}
val code = connection.responseCode
if (code != 201) {
// A robust implementation would have retry logic with exponential backoff.
// For now, we just log the failure.
println("Telemetry upload failed with code: $code")
}
} catch (e: Exception) {
// Handle exceptions: network issues, timeouts, etc.
e.printStackTrace()
} finally {
connection?.disconnect()
}
}
}
// Dummy implementations for system stats. A production app would use more reliable APIs.
private fun readMemoryStats(): Long = Runtime.getRuntime().totalMemory() / 1024
private fun readDeviceTemperature(): Float = 38.5f // Placeholder
}
The integration point within the app becomes trivial. The inference call is passed in as a lambda, so the manager times it, the model still runs exactly once, and the output is captured for normal post-processing.
// Example usage in the ViewModel or Repository.
// The inference block is passed as a lambda and timed by TelemetryManager.
var scores = floatArrayOf()
TelemetryManager.recordInference("vision_model_v3", "3.1.2") {
    // This block is timed; the output is captured for use after recording.
    val output = module.forward(IValue.from(inputTensor))
    scores = output.toTensor().dataAsFloatArray
    // Return a metric of interest from the model output.
    scores.maxOrNull() ?: 0.0f
}
val confidence = scores.maxOrNull() ?: 0.0f
The Azure Ingestion Pipeline
With data flowing from devices, the cloud side needed to be ready to catch it. We used Terraform to define the infrastructure declaratively, ensuring it’s repeatable and version-controlled.
main.tf
provider "azurerm" {
features {}
}
resource "azurerm_resource_group" "rg" {
name = "mlops-telemetry-rg"
location = "East US"
}
resource "azurerm_eventhub_namespace" "eh_namespace" {
name = "mlops-telemetry-ns"
location = azurerm_resource_group.rg.location
resource_group_name = azurerm_resource_group.rg.name
sku = "Standard"
capacity = 2
}
resource "azurerm_eventhub" "eh" {
name = "inference-events"
namespace_name = azurerm_eventhub_namespace.eh_namespace.name
resource_group_name = azurerm_resource_group.rg.name
partition_count = 4
message_retention = 1
}
# ... Additional resources for Logstash VM, network, and Elasticsearch cluster ...
The lynchpin of the ingestion process is Logstash. It pulls messages from Event Hub, unnests the batched events, performs minor enrichment, and ships them to Elasticsearch. The configuration file is the source of truth for this pipeline.
logstash.conf
input {
azure_event_hubs {
event_hub_connections => ["${EVENT_HUB_CONNECTION_STRING}"]
threads => 4 # Match partition count
codec => "json"
consumer_group => "logstash-consumers"
}
}
filter {
# The payload from Android is a batch. We need to split it into individual events.
split {
field => "events"
target => "event"
}
# Promote nested fields to the top level for easier querying in Elasticsearch.
mutate {
add_field => {
"timestamp" => "%{[event][timestamp]}"
"model_name" => "%{[event][modelName]}"
"model_version" => "%{[event][modelVersion]}"
"latency_ms" => "%{[event][inferenceLatencyMs]}"
"cpu_kernel_ms" => "%{[event][cpuKernelTimeMs]}"
"memory_rss_kb" => "%{[event][memoryRssKb]}"
"temp_celsius" => "%{[event][temperatureCelsius]}"
"confidence" => "%{[event][outputConfidence]}"
"device_id" => "%{[deviceId]}"
"device_manufacturer" => "%{[manufacturer]}"
"device_model" => "%{[model]}"
"device_os_version" => "%{[osVersion]}"
}
}
# Convert timestamp to a proper date object for Elasticsearch.
date {
match => [ "timestamp", "UNIX_MS" ]
target => "@timestamp"
}
# A key step for performance is to convert data types.
mutate {
convert => {
"latency_ms" => "integer"
"cpu_kernel_ms" => "integer"
"memory_rss_kb" => "integer"
"temp_celsius" => "float"
"confidence" => "float"
"device_os_version" => "integer"
}
}
# Clean up the original, bulky fields after processing.
mutate {
remove_field => ["events", "event", "deviceId", "manufacturer", "model", "osVersion", "timestamp"]
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch-node1:9200"]
index => "ml-telemetry-%{+YYYY.MM.dd}"
# In a real project, use a proper user and API key, not basic auth.
user => "elastic"
password => "${ELASTIC_PASSWORD}"
}
}
Before sending data, we must define an explicit index template in Elasticsearch. Without it, Elasticsearch will guess the data types, which often leads to poor storage efficiency and incorrect query behavior. For example, model_version
(“3.1.2”) must be treated as a keyword
, not text
, to allow for exact aggregations.
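As a reference, here is a minimal sketch of that template, assuming the 8.x elasticsearch-py client and the field names produced by the Logstash pipeline above; the shard and replica settings are placeholders to adjust for your cluster.
create_index_template.py
# A minimal sketch of the explicit index template. Identifier fields are mapped
# as keyword (exact match, aggregatable) and metrics as numerics, so Elasticsearch
# never has to guess types. Note: the pattern also matches ml-telemetry-anomalies;
# scope it more tightly if that matters for your setup.
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"])  # use env vars in practice

es.indices.put_index_template(
    name="ml-telemetry",
    index_patterns=["ml-telemetry-*"],
    template={
        "settings": {"number_of_shards": 2, "number_of_replicas": 1},
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "model_name": {"type": "keyword"},
                "model_version": {"type": "keyword"},
                "device_id": {"type": "keyword"},
                "device_manufacturer": {"type": "keyword"},
                "device_model": {"type": "keyword"},
                "device_os_version": {"type": "integer"},
                "latency_ms": {"type": "integer"},
                "cpu_kernel_ms": {"type": "integer"},
                "memory_rss_kb": {"type": "long"},
                "temp_celsius": {"type": "float"},
                "confidence": {"type": "float"}
            }
        }
    }
)
With these mappings in place, the anomaly detection service can aggregate on model_version and device_model directly, with no .keyword sub-field.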
The PyTorch Anomaly Detection Service
This service is the brain of the operation. It runs on a schedule (e.g., every 5 minutes), queries the most recent five-minute window of aggregated data from Elasticsearch, and runs it through a trained PyTorch autoencoder model.
The core idea of an autoencoder for anomaly detection is simple: train a neural network to reconstruct “normal” input data. When the model is presented with data that deviates from what it has seen during training (an anomaly), its reconstruction error will be high.
anomaly_detector.py
import torch
import torch.nn as nn
from elasticsearch import Elasticsearch
import schedule
import time
import numpy as np
# --- 1. PyTorch Autoencoder Model ---
class TelemetryAutoencoder(nn.Module):
def __init__(self, input_dim=5):
super(TelemetryAutoencoder, self).__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 3),
nn.ReLU(),
)
self.decoder = nn.Sequential(
nn.Linear(3, input_dim),
nn.Sigmoid() # Assuming data is scaled to [0, 1]
)
def forward(self, x):
encoded = self.encoder(x)
        decoded = self.decoder(encoded)
return decoded
# --- 2. Anomaly Detection Logic ---
class AnomalyDetector:
def __init__(self, es_client, model_path):
self.es = es_client
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = TelemetryAutoencoder(input_dim=5)
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
self.model.to(self.device)
self.model.eval()
# In production, these scalers would be saved from the training phase.
self.scalers = self._load_scalers() # Dummy method
# Threshold is determined empirically from a validation set.
self.reconstruction_threshold = 0.15
def _load_scalers(self):
# This should load MinMaxScaler objects fitted on the training data.
# For simplicity, we define max values seen during training.
return {
'latency_ms': 200.0,
'cpu_kernel_ms': 150.0,
'memory_rss_kb': 50000.0,
'temp_celsius': 60.0,
'confidence': 1.0
}
def _query_es(self):
# Query for aggregated data over the last 5 minutes.
query = {
"size": 0,
"query": {
"range": {
"@timestamp": { "gte": "now-5m/m", "lt": "now/m" }
}
},
"aggs": {
"groupings": {
"composite": {
"sources": [
                        {"model_version": {"terms": {"field": "model_version"}}},
                        {"device_model": {"terms": {"field": "device_model"}}}
],
"size": 1000
},
"aggs": {
"avg_latency": {"avg": {"field": "latency_ms"}},
"avg_cpu": {"avg": {"field": "cpu_kernel_ms"}},
"avg_mem": {"avg": {"field": "memory_rss_kb"}},
"avg_temp": {"avg": {"field": "temp_celsius"}},
"avg_conf": {"avg": {"field": "confidence"}}
}
}
}
}
response = self.es.search(index="ml-telemetry-*", body=query)
return response['aggregations']['groupings']['buckets']
def detect(self):
buckets = self._query_es()
if not buckets:
print("No new data to process.")
return
for bucket in buckets:
key = bucket['key']
features = np.array([
bucket['avg_latency']['value'] / self.scalers['latency_ms'],
bucket['avg_cpu']['value'] / self.scalers['cpu_kernel_ms'],
bucket['avg_mem']['value'] / self.scalers['memory_rss_kb'],
bucket['avg_temp']['value'] / self.scalers['temp_celsius'],
bucket['avg_conf']['value'] / self.scalers['confidence']
]).astype(np.float32)
with torch.no_grad():
tensor_in = torch.from_numpy(features).unsqueeze(0).to(self.device)
reconstructed = self.model(tensor_in)
loss = nn.MSELoss()(reconstructed, tensor_in)
if loss.item() > self.reconstruction_threshold:
self._log_anomaly(key, loss.item())
def _log_anomaly(self, key, score):
doc = {
"@timestamp": int(time.time() * 1000),
"model_version": key['model_version'],
"device_model": key['device_model'],
"anomaly_score": score,
"message": f"High reconstruction error for {key['model_version']} on {key['device_model']}"
}
self.es.index(index="ml-telemetry-anomalies", body=doc)
print(f"ANOMALY DETECTED: {doc['message']}")
def job():
es = Elasticsearch(hosts=["http://localhost:9200"]) # Use env vars
detector = AnomalyDetector(es, "autoencoder.pth")
detector.detect()
if __name__ == "__main__":
schedule.every(5).minutes.do(job)
while True:
schedule.run_pending()
time.sleep(1)
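The detector assumes a trained autoencoder.pth, fitted scalers, and an empirically chosen threshold, none of which appear in the snippet. The following is a condensed sketch of the offline training job, under the assumption that a baseline window of healthy telemetry has already been exported from Elasticsearch as an [N, 5] array; the epoch count, learning rate, and 99th-percentile threshold rule are illustrative, not prescriptive.
train_autoencoder.py
# Offline training sketch. Assumes `baseline` is a NumPy array of shape [N, 5]
# whose columns match AnomalyDetector's feature order:
# latency_ms, cpu_kernel_ms, memory_rss_kb, temp_celsius, confidence.
import numpy as np
import torch
import torch.nn as nn

from anomaly_detector import TelemetryAutoencoder  # reuse the model definition

def train(baseline: np.ndarray, epochs: int = 200, lr: float = 1e-3):
    # Min-max scale each column to [0, 1] and persist the maxima; the detector's
    # _load_scalers() must use these same values at inference time.
    maxima = baseline.max(axis=0)
    scaled = torch.from_numpy((baseline / maxima).astype(np.float32))
    np.save("scaler_maxima.npy", maxima)

    model = TelemetryAutoencoder(input_dim=baseline.shape[1])
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    model.train()
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = criterion(model(scaled), scaled)
        loss.backward()
        optimizer.step()

    # Derive the alert threshold from the distribution of reconstruction errors
    # on baseline data, e.g. the 99th percentile of per-sample MSE.
    model.eval()
    with torch.no_grad():
        errors = ((model(scaled) - scaled) ** 2).mean(dim=1).numpy()
    threshold = float(np.percentile(errors, 99))
    print(f"Suggested reconstruction_threshold: {threshold:.4f}")

    torch.save(model.state_dict(), "autoencoder.pth")
The saved maxima and the printed threshold become the values behind _load_scalers() and reconstruction_threshold in the detector above.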
The Svelte Monitoring Dashboard
The final piece is the operator’s view. A Svelte application provides a clean, focused UI to surface the alerts generated by the PyTorch service. It’s not meant to replace Kibana but to provide a “first-look” dashboard for the MLOps team.
The core of the Svelte app is a page that polls the ml-telemetry-anomalies
index.
src/routes/+page.svelte
<script>
import { onMount } from 'svelte';
let anomalies = [];
let isLoading = true;
let error = null;
// In a real app, the ES endpoint would be proxied through a backend
// to avoid exposing credentials to the browser.
const ES_ENDPOINT = 'http://your-es-proxy/ml-telemetry-anomalies/_search';
async function fetchAnomalies() {
try {
const query = {
"size": 50,
"sort": [{ "@timestamp": "desc" }],
"query": { "match_all": {} }
};
const response = await fetch(ES_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(query)
});
if (!response.ok) {
throw new Error(`Failed to fetch anomalies: ${response.statusText}`);
}
const data = await response.json();
anomalies = data.hits.hits.map(hit => hit._source);
} catch (e) {
error = e.message;
} finally {
isLoading = false;
}
}
onMount(() => {
fetchAnomalies();
const interval = setInterval(fetchAnomalies, 30000); // Poll every 30 seconds
return () => clearInterval(interval);
});
</script>
<main>
<h1>On-Device Model Performance Anomalies</h1>
{#if isLoading}
<p>Loading...</p>
{:else if error}
<p class="error">{error}</p>
{:else if anomalies.length === 0}
<p>No anomalies detected. All systems normal.</p>
{:else}
<table>
<thead>
<tr>
<th>Timestamp</th>
<th>Model Version</th>
<th>Device Model</th>
<th>Anomaly Score</th>
<th>Message</th>
</tr>
</thead>
<tbody>
{#each anomalies as anomaly}
<tr>
<td>{new Date(anomaly['@timestamp']).toLocaleString()}</td>
<td>{anomaly.model_version}</td>
<td>{anomaly.device_model}</td>
<td>{anomaly.anomaly_score.toFixed(4)}</td>
<td>{anomaly.message}</td>
</tr>
{/each}
</tbody>
</table>
{/if}
</main>
<style>
main { padding: 1rem; font-family: sans-serif; }
h1 { color: #333; }
table { width: 100%; border-collapse: collapse; margin-top: 1rem; }
th, td { padding: 0.75rem; border: 1px solid #ddd; text-align: left; }
th { background-color: #f2f2f2; }
.error { color: red; }
</style>
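The comment in the Svelte component about proxying Elasticsearch is worth making concrete. A thin, read-only proxy keeps credentials off the client and pins the index the browser may query; the sketch below uses FastAPI purely as an illustration (the framework, route, and environment variable names are assumptions, not part of the stack described above).
es_proxy.py
# Minimal read-only proxy sketch. Credentials stay server-side and the index is
# pinned, so the dashboard can only ever search ml-telemetry-anomalies.
# (CORS middleware and auth for the dashboard itself are omitted for brevity.)
import os

import requests
from fastapi import FastAPI

app = FastAPI()
ES_HOST = os.environ.get("ES_HOST", "http://localhost:9200")
ES_AUTH = ("elastic", os.environ["ELASTIC_PASSWORD"])

@app.post("/ml-telemetry-anomalies/_search")
def search_anomalies(query: dict):
    # Forward the query body as-is, but hardcode the target index so arbitrary
    # indices cannot be read through this endpoint.
    resp = requests.post(
        f"{ES_HOST}/ml-telemetry-anomalies/_search",
        json=query,
        auth=ES_AUTH,
        timeout=10,
    )
    return resp.json()
With something like this in place, ES_ENDPOINT in the Svelte page points at the proxy rather than at Elasticsearch itself.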
This system provided the visibility we were desperately lacking. We moved from reactive, user-complaint-driven investigations to proactive, data-driven alerting on subtle performance regressions. The solution is not without its limitations. The anomaly detection model requires periodic retraining as “normal” behavior shifts with new model versions and OS updates. The telemetry client also introduces a small, but non-zero, overhead on the device; further optimization could involve switching to a more efficient serialization format like Protocol Buffers. A future iteration could also close the loop, using detected anomalies to trigger automated rollbacks or to gate canary deployments via an integration with our CI/CD system.