The existing mobile telemetry pipeline was a liability. We were authenticating our Android clients against the ingestion endpoint using a static bearer token embedded in the application. Anyone who decompiled the APK had the token, allowing threat actors to flood our system with garbage data or, worse, replay it to forge legitimate-looking but malicious requests. The backend, a self-hosted ELK stack, was buckling under the financial and operational weight of indexing terabytes of unstructured log data daily, making any form of long-term analysis prohibitively expensive and slow. The mandate was clear: build a new pipeline founded on zero-trust principles, with a cost-effective ingestion layer and a robust, queryable data lakehouse for analytics.
Our first decision was to dismantle the token-based authentication entirely. In a real-world project, relying on a shared secret for a distributed fleet of devices is untenable. We moved to mutual TLS (mTLS), which enforces cryptographic identity on both the client and server. The Android device must present a valid, signed client certificate, and the server does the same. This ensures that only trusted devices can communicate with our infrastructure, and that they are communicating with the genuine endpoint, not a man-in-the-middle.
On the Android client, this required moving away from the default HTTP client and configuring a custom OkHttpClient instance capable of handling a private key and certificate chain. The pitfall here is secure key storage. Embedding a private key directly in the app is no better than a bearer token. The only viable solution is to leverage the Android Keystore system, which stores cryptographic keys in a hardware-backed secure container.
Here is the core logic for the HttpClientFactory on the Android client, written in Kotlin. It generates a key pair within the Android Keystore if one doesn't exist, creates a self-signed certificate (in a production scenario, this would be a Certificate Signing Request sent to a proper CA), and configures the OkHttpClient to use it.
// Android Client: HttpClientFactory.kt
import android.content.Context
import android.security.keystore.KeyGenParameterSpec
import android.security.keystore.KeyProperties
import okhttp3.OkHttpClient
import java.security.KeyPairGenerator
import java.security.KeyStore
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManagerFactory
import javax.net.ssl.X509TrustManager

object HttpClientFactory {

    private const val ANDROID_KEYSTORE = "AndroidKeyStore"
    private const val KEY_ALIAS = "my-client-mtls-key"

    // R.raw values are resolved at build time but are not Kotlin compile-time
    // constants, so this must be a plain val, not a const val.
    private val CA_CERT_RESOURCE_ID = R.raw.ca_cert // The server's CA cert bundled in the app

    fun createMtlsClient(context: Context): OkHttpClient {
        try {
            // Step 1: Initialize the Android Keystore.
            val keyStore = KeyStore.getInstance(ANDROID_KEYSTORE).apply { load(null) }

            // Step 2: Ensure the client key pair and certificate exist.
            // A common mistake is to regenerate this on every app start; check for
            // existence first. In a real system, this would involve a secure
            // enrollment process with a corporate CA.
            if (!keyStore.containsAlias(KEY_ALIAS)) {
                generateAndStoreKeyPair()
            }

            // Step 3: Set up the KeyManagerFactory to use our client key from the Keystore.
            val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()).apply {
                init(keyStore, null) // Password can be null for Android Keystore
            }

            // Step 4: Set up the TrustManagerFactory to trust our server's Certificate Authority.
            // This prevents MITM attacks by ensuring we only talk to servers signed by our CA.
            val certificateFactory = CertificateFactory.getInstance("X.509")
            val caCert = context.resources.openRawResource(CA_CERT_RESOURCE_ID).use { stream ->
                certificateFactory.generateCertificate(stream) as X509Certificate
            }
            val trustStore = KeyStore.getInstance(KeyStore.getDefaultType()).apply {
                load(null, null)
                setCertificateEntry("ca", caCert)
            }
            val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()).apply {
                init(trustStore)
            }
            val trustManagers = trustManagerFactory.trustManagers
            check(trustManagers.size == 1 && trustManagers[0] is X509TrustManager) {
                "Unexpected default trust managers."
            }

            // Step 5: Create an SSLContext that uses our KeyManager and TrustManager.
            val sslContext = SSLContext.getInstance("TLSv1.3").apply {
                init(keyManagerFactory.keyManagers, trustManagers, null)
            }

            // Step 6: Build the OkHttpClient.
            return OkHttpClient.Builder()
                .sslSocketFactory(sslContext.socketFactory, trustManagers[0] as X509TrustManager)
                .build()
        } catch (e: Exception) {
            // In a production app, log this failure to a local-only diagnostics buffer.
            // A failure here means the app cannot communicate securely with the backend.
            throw RuntimeException("Failed to initialize mTLS client", e)
        }
    }

    private fun generateAndStoreKeyPair() {
        val keyPairGenerator = KeyPairGenerator.getInstance(
            KeyProperties.KEY_ALGORITHM_RSA,
            ANDROID_KEYSTORE
        )
        val parameterSpec = KeyGenParameterSpec.Builder(
            KEY_ALIAS,
            KeyProperties.PURPOSE_SIGN or KeyProperties.PURPOSE_VERIFY
        ).run {
            setKeySize(2048)
            // Without a digest and signature padding declared here, the Keystore
            // rejects the signature operation during the TLS handshake.
            setDigests(KeyProperties.DIGEST_SHA256)
            setSignaturePaddings(KeyProperties.SIGNATURE_PADDING_RSA_PKCS1)
            // Additional security parameters can be set here,
            // e.g. setUserAuthenticationRequired(true)
            build()
        }
        keyPairGenerator.initialize(parameterSpec)
        keyPairGenerator.generateKeyPair()
        // The key pair is now securely stored in the hardware-backed keystore.
        // A subsequent step, not shown for brevity, would be to generate a CSR from the
        // public key, send it to a CA, and store the returned certificate chain.
    }
}
Next, for the ingestion layer, we discarded ELK in favor of Grafana Loki. The rationale was purely economic and operational. Most of our telemetry data is for recent-event debugging ("what happened on device XYZ in the last hour?"). ELK's full-text indexing of every field is overkill and represents the bulk of its cost. Loki's design, which only indexes a small set of labels (like app_version, region, level) and treats the log message as an unindexed string, radically reduces storage and compute costs. The trade-off is that full-text search is slower, but for our primary use case, it's a perfectly acceptable one.
We built a lightweight log-forwarding service in Go that sits behind a load balancer. This service terminates the mTLS connection, validates the client certificate against our device registry, and then forwards the log payload to the Loki distributor cluster.
// Ingestion Service: main.go
package main

import (
	"bytes"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"time"
)

// This would be a lookup against a database in a real system.
var validDeviceSerials = map[string]bool{"DEVICE-SERIAL-12345": true}

// LogPayload matches the structure sent from the Android client.
type LogPayload struct {
	Timestamp string            `json:"timestamp"`
	Level     string            `json:"level"`
	Message   string            `json:"message"`
	Attrs     map[string]string `json:"attrs"`
}

// LokiPushRequest is the structure Loki expects.
type LokiPushRequest struct {
	Streams []LokiStream `json:"streams"`
}

type LokiStream struct {
	Stream map[string]string `json:"stream"` // Labels
	Values [][]string        `json:"values"` // [ [timestamp, log line] ]
}

func main() {
	caCert, err := os.ReadFile("ca.crt")
	if err != nil {
		log.Fatalf("Failed to read CA certificate: %v", err)
	}
	caCertPool := x509.NewCertPool()
	caCertPool.AppendCertsFromPEM(caCert)

	tlsConfig := &tls.Config{
		ClientCAs:  caCertPool,
		ClientAuth: tls.RequireAndVerifyClientCert, // Enforce mTLS
	}

	server := &http.Server{
		Addr:         ":8443",
		TLSConfig:    tlsConfig,
		Handler:      http.HandlerFunc(logHandler),
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
	}

	log.Println("Starting mTLS log ingestion server on :8443")
	// Paths to server cert and key.
	if err := server.ListenAndServeTLS("server.crt", "server.key"); err != nil {
		log.Fatalf("Server failed to start: %v", err)
	}
}

func logHandler(w http.ResponseWriter, r *http.Request) {
	// The core of zero-trust: verify the client's identity from the certificate.
	// A common mistake is to only check that *a* valid cert was presented.
	// We must check *which* cert was presented.
	if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
		http.Error(w, "Client certificate required", http.StatusUnauthorized)
		return
	}
	clientCert := r.TLS.PeerCertificates[0]

	// Here, we use the certificate's Serial Number for identification.
	// The Common Name (CN) is also a frequent choice.
	deviceID := clientCert.SerialNumber.String()
	if _, ok := validDeviceSerials[deviceID]; !ok {
		log.Printf("Rejected connection from unauthorized device serial: %s", deviceID)
		http.Error(w, "Unauthorized device", http.StatusForbidden)
		return
	}

	defer r.Body.Close()
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Failed to read request body", http.StatusInternalServerError)
		return
	}

	// In a real system, you'd batch these requests to Loki.
	// This simplified example forwards one-to-one.
	if err := forwardToLoki(body, deviceID); err != nil {
		log.Printf("Failed to forward log to Loki for device %s: %v", deviceID, err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

func forwardToLoki(payload []byte, deviceID string) error {
	var clientLog LogPayload
	if err := json.Unmarshal(payload, &clientLog); err != nil {
		return fmt.Errorf("could not unmarshal client payload: %w", err)
	}

	// Crucial: define Loki labels. Keep cardinality low.
	// App version, region, and level are good candidates. Device ID is not.
	lokiLabels := map[string]string{
		"job":         "android-telemetry",
		"level":       clientLog.Level,
		"app_version": clientLog.Attrs["appVersion"],
	}

	// We pass deviceID as a structured field in the log line, not a label.
	logLineContent, err := json.Marshal(map[string]interface{}{
		"device_id": deviceID,
		"message":   clientLog.Message,
		"attrs":     clientLog.Attrs,
	})
	if err != nil {
		return fmt.Errorf("could not marshal log line: %w", err)
	}

	lokiRequest := LokiPushRequest{
		Streams: []LokiStream{
			{
				Stream: lokiLabels,
				Values: [][]string{
					{
						// Loki expects a nanosecond-precision timestamp as a string.
						fmt.Sprintf("%d", time.Now().UnixNano()),
						string(logLineContent),
					},
				},
			},
		},
	}

	reqBody, err := json.Marshal(lokiRequest)
	if err != nil {
		return fmt.Errorf("could not marshal Loki push request: %w", err)
	}
	resp, err := http.Post("http://loki-gateway:3100/loki/api/v1/push", "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return fmt.Errorf("failed to push to Loki: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusNoContent {
		return fmt.Errorf("loki returned unexpected status %d", resp.StatusCode)
	}
	return nil
}
This solved the ingestion problem. But Loki, by design, is not an analytical database. Answering questions like “what is the daily average crash rate for users on version 3.2 in Germany over the last six months?” is inefficient. We needed to get this data into our data lake. This is where Apache Iceberg came in. Iceberg provides a transactional table format over object storage (like S3), bringing ACID compliance, schema evolution, and time-travel capabilities to our raw data.
We designed an hourly batch ETL job using Apache Spark. The job queries Loki for the last hour’s worth of logs, parses the JSON log lines into a structured DataFrame, and appends it to an Iceberg table partitioned by date and event type. Iceberg’s atomic commits ensure that downstream consumers either see a full hour of data or none at all, preventing partial data reads if the job fails.
// Spark ETL Job: LokiToIceberg.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object LokiToIceberg {

  def main(args: Array[String]): Unit = {
    // Production Spark jobs require robust configuration management, not hardcoding.
    val spark = SparkSession.builder
      .appName("LokiToIcebergETL")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-data-lakehouse/warehouse")
      .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
      .getOrCreate()

    import spark.implicits._

    // In a real job, start/end times would be parameters, with state management
    // to handle retries and prevent data duplication.
    val endTime = System.currentTimeMillis() / 1000
    val startTime = endTime - 3600 // One hour ago

    // Step 1: Query the Loki API.
    // Loki is queried with LogQL. This is a simplified representation;
    // a production-grade connector would handle pagination and retries.
    val lokiQuery = s"""{job="android-telemetry"}"""
    val rawJsonDF = spark.read.format("loki") // Assuming a custom Loki Spark data source
      .option("loki.endpoint", "http://loki-gateway:3100")
      .option("loki.query", lokiQuery)
      .option("loki.startTime", startTime.toString)
      .option("loki.endTime", endTime.toString)
      .load()

    // Step 2: Parse the structured log line.
    // The pitfall here is assuming the schema is static. The `from_json` function with
    // a defined schema helps enforce structure and handle malformed data.
    val logSchema = StructType(Seq(
      StructField("device_id", StringType, nullable = true),
      StructField("message", StringType, nullable = true),
      StructField("attrs", MapType(StringType, StringType), nullable = true)
    ))

    val parsedDF = rawJsonDF
      .withColumn("log_data", from_json($"line", logSchema))
      .select(
        // Loki timestamps are nanoseconds since the epoch. Note the cast must
        // come before the alias, or the column name would be lost.
        ($"timestamp" / 1000000000L).cast(TimestampType).as("event_timestamp"),
        $"labels.level".as("level"),
        $"labels.app_version".as("app_version"),
        $"log_data.device_id".as("device_id"),
        $"log_data.message".as("message"),
        $"log_data.attrs".getItem("eventType").as("event_type") // Extract a key attribute for partitioning
      )
      .withColumn("event_date", to_date($"event_timestamp"))

    // Step 3: Write to the Iceberg table.
    // The table must be pre-created with partitioning specs:
    //   CREATE TABLE glue_catalog.telemetry.mobile_events (...)
    //   PARTITIONED BY (days(event_timestamp), event_type);
    // Iceberg commits are atomic, so downstream consumers see the whole hour or
    // none of it. Note that an append is NOT idempotent on its own: a retried
    // job would duplicate the hour, which is why the run-state management
    // mentioned above (or replacing the hour's partition) is required.
    parsedDF.writeTo("glue_catalog.telemetry.mobile_events").append()

    spark.stop()
  }
}
Finally, we needed a unified interface for our SREs and analysts. They needed to query Loki for fresh, real-time data and simultaneously run complex analytical queries against the historical data in Iceberg. We built an internal dashboard using Qwik. The choice of Qwik was deliberate. Traditional frameworks would require shipping large JavaScript bundles to the browser, leading to slow startup times, especially on a page displaying complex data tables and visualizations. Qwik’s resumability allows the server to render the initial state and then “pauses” execution. The browser downloads minimal JS and instantly displays the UI. Code is only downloaded and executed when the user interacts with a specific component. This was a game-changer for our data-heavy dashboard.
The architecture for the frontend was a Qwik app communicating with a Backend-for-Frontend (BFF). This BFF exposed two main endpoints: /api/loki/query, which proxied requests to Loki, and /api/iceberg/query, which used Trino's REST API to query the Iceberg table.
graph TD
  A[SRE/Analyst Browser] -- HTTP Requests --> B{Qwik App on Node.js Server}
  B -- Renders HTML + Q-JSON --> A
  A -- User Interaction (e.g., change time range) --> C{Qwik Resumes}
  C -- API Calls --> D[BFF Service]
  D -- /api/loki/query --> E[Loki Gateway]
  D -- /api/iceberg/query --> F[Trino Coordinator]
  F -- Queries --> G[Apache Iceberg Table on S3]
Here’s a conceptual Qwik component demonstrating how it would fetch data from both sources.
// Qwik Component: UnifiedQueryView.tsx
import { component$, useResource$, Resource } from '@builder.io/qwik';

interface LokiLog {
  timestamp: string;
  line: string;
}

interface IcebergRow {
  event_date: string;
  crash_count: number;
}

// In a real app, these functions would be in a separate API service file.
// Passing the AbortSignal through means a resource cleanup actually cancels
// the in-flight request.
async function fetchRecentLogs(query: string, signal?: AbortSignal): Promise<LokiLog[]> {
  const res = await fetch(`/api/loki/query?q=${encodeURIComponent(query)}`, { signal });
  return res.json();
}

async function fetchAnalytics(sql: string, signal?: AbortSignal): Promise<IcebergRow[]> {
  const res = await fetch(`/api/iceberg/query`, {
    method: 'POST',
    body: JSON.stringify({ sql }),
    headers: { 'Content-Type': 'application/json' },
    signal,
  });
  return res.json();
}

export const UnifiedQueryView = component$(() => {
  // useResource$ transparently handles fetching, state management, and streaming.
  // Multiple resources are fetched in parallel.
  const recentLogsResource = useResource$<LokiLog[]>(({ cleanup }) => {
    // This runs on the server for the initial render, and on the client for
    // subsequent updates.
    const controller = new AbortController();
    cleanup(() => controller.abort());
    return fetchRecentLogs('{job="android-telemetry"} | limit 100', controller.signal);
  });

  const analyticsResource = useResource$<IcebergRow[]>(({ cleanup }) => {
    const controller = new AbortController();
    cleanup(() => controller.abort());
    const sql = `
      SELECT event_date, COUNT(*) as crash_count
      FROM glue_catalog.telemetry.mobile_events
      WHERE event_type = 'APP_CRASH' AND event_date > CURRENT_DATE - INTERVAL '30' DAY
      GROUP BY event_date
      ORDER BY event_date DESC
    `;
    return fetchAnalytics(sql, controller.signal);
  });

  return (
    <div>
      <h2>Recent Logs (from Loki)</h2>
      <Resource
        value={recentLogsResource}
        onPending={() => <div>Loading recent logs...</div>}
        onRejected={(error) => <div>Error: {error.message}</div>}
        onResolved={(logs) => (
          <pre>
            {logs.map((log) => `${log.timestamp}: ${log.line}`).join('\n')}
          </pre>
        )}
      />

      <h2>Crash Analytics (from Iceberg + Trino)</h2>
      <Resource
        value={analyticsResource}
        onPending={() => <div>Loading analytics...</div>}
        onRejected={(error) => <div>Error: {error.message}</div>}
        onResolved={(rows) => (
          <table>
            <thead><tr><th>Date</th><th>Crashes</th></tr></thead>
            <tbody>
              {rows.map((row) => (
                <tr key={row.event_date}><td>{row.event_date}</td><td>{row.crash_count}</td></tr>
              ))}
            </tbody>
          </table>
        )}
      />
    </div>
  );
});
This multi-tiered system represents a significant leap in security, scalability, and capability over our previous architecture. It is not without its own complexities, however. The biggest operational burden has shifted from managing an expensive indexing cluster to managing a distributed certificate lifecycle for millions of mobile devices; secure enrollment and automated rotation are non-trivial engineering problems that we are now tackling. The ETL process from Loki to Iceberg is currently batch-oriented, introducing an hour of latency for analytics. While acceptable for now, future requirements for real-time dashboarding may force a move to a streaming architecture with something like Apache Flink. Finally, the cost model has changed: we now pay less for storage and indexing but more for the compute required by the Spark ETL jobs and Trino queries, which requires careful monitoring and optimization. The current architecture is a deliberate trade-off, prioritizing security and long-term data accessibility over real-time analytical latency.