The core technical problem is managing data from a fleet of several million industrial sensors, each emitting telemetry every few seconds. This generates a daily ingest volume measured in terabytes. The system must satisfy two fundamentally conflicting requirements: first, operational teams need interactive, sub-second query latency on dashboards displaying data from the last 30 days for real-time monitoring and alerting. Second, data science and engineering teams require access to a multi-year, petabyte-scale historical archive for trend analysis, anomaly detection model training, and compliance reporting.
A single data storage solution is operationally untenable. The choice of system becomes a direct trade-off between query performance, storage cost, and architectural complexity. Evaluating a monolithic approach reveals immediate and significant flaws.
Using only a time-series database like TimescaleDB presents a compelling solution for the real-time requirement. Its hypertable abstraction, continuous aggregates, and SQL interface are purpose-built for slicing, dicing, and aggregating recent time-stamped data. Performance for queries scoped to recent time chunks is excellent. However, scaling this to petabytes of historical data is financially prohibitive. The storage costs for high-performance SSDs required for a database of this nature, coupled with the ever-growing index sizes that degrade query performance over long time ranges, make it an impractical choice for the archival requirement. In a real-world project, a multi-petabyte TimescaleDB cluster would be a budget-killer before it even entered production.
Conversely, relying solely on a system like Apache HBase addresses the archival storage problem effectively. HBase is designed for massive, sparse datasets and runs economically on commodity hardware. Its horizontal scalability is proven. The primary pitfall here is its query model. HBase’s key-value nature, accessed via row key scans, is poorly suited for the complex, time-bucketed aggregate queries needed for real-time dashboards. While tools like Apache Phoenix can provide a SQL layer, the performance for the kind of window functions and joins common in operational monitoring does not compete with a native time-series database. The operational overhead of maintaining a large HBase cluster is also non-trivial.
The only pragmatic path forward is a tiered, polyglot persistence architecture. This design explicitly separates the “hot” and “cold” data tiers, leveraging the strengths of two different database systems.
- Hot Tier: TimescaleDB is used to store the most recent 30 days of data. All incoming data is written here first. This tier is optimized for high-ingest rates and low-latency queries on recent data, running on faster, more expensive storage.
- Cold Tier: HBase is used to store all data older than 30 days. This tier is optimized for low-cost, massive-scale storage.
- Data Tiering Service: A custom background service is responsible for periodically migrating data from the hot tier to the cold tier.
- Unified Query Service: A crucial abstraction layer that exposes a single API endpoint to clients. It directs queries to the appropriate database—or both—based on the requested time range.
This architecture is not without its own complexities, primarily in the data migration process and the query federation logic. However, it provides the only economically and performantly viable solution to the stated problem.
graph TD
    subgraph Sources["Data Sources"]
        A[Millions of IIoT Sensors]
    end
    subgraph Ingest["Ingest Layer"]
        B(Kafka)
        C(Ingest Service)
    end
    subgraph App["Application Layer"]
        D(Unified Query Service)
        E(Data Tiering Service)
    end
    subgraph Hot["Hot Tier - Last 30 Days"]
        F[TimescaleDB Cluster]
    end
    subgraph Cold["Cold Tier - Historical Archive"]
        G[HBase Cluster on HDFS]
    end
    subgraph Client["Client"]
        H(Jetpack Compose App)
    end
    A --> B
    B --> C
    C -->|Writes| F
    E -->|"Reads > 30 days"| F
    E -->|Writes| G
    E -->|"Deletes > 30 days"| F
    D -->|"Queries < 30 days"| F
    D -->|"Queries > 30 days"| G
    D -->|Spanning queries| F
    D -->|Spanning queries| G
    H -->|API Calls| D
The core implementation work is divided into four main areas: defining the database schemas, building the data tiering service, implementing the unified query service, and developing the client interface.
TimescaleDB Schema for the Hot Tier
The schema in TimescaleDB is straightforward. A single hypertable stores all sensor readings. Continuous aggregates are essential for powering the real-time dashboards efficiently, pre-calculating hourly and daily rollups.
-- Main table for raw sensor data
CREATE TABLE sensor_data (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
metric_name TEXT NOT NULL,
value DOUBLE PRECISION NOT NULL,
-- Additional metadata like location, unit, etc. can be added
location TEXT,
unit TEXT
);
-- Convert the table to a hypertable, partitioned by time
SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 day');
-- Create indexes for common query patterns
CREATE INDEX ON sensor_data (device_id, time DESC);
CREATE INDEX ON sensor_data (metric_name, time DESC);
-- Continuous aggregate for hourly averages, a key performance optimization
CREATE MATERIALIZED VIEW sensor_data_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS bucket,
device_id,
metric_name,
AVG(value) as avg_value,
MAX(value) as max_value,
MIN(value) as min_value
FROM sensor_data
GROUP BY bucket, device_id, metric_name;
-- Policy to keep the continuous aggregate refreshed
SELECT add_continuous_aggregate_policy('sensor_data_hourly',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '15 minutes');
The chunk_time_interval is set to one day, which is a reasonable starting point. It means data migration and deletion can operate on clean, day-sized chunks, which TimescaleDB handles very efficiently.
HBase Schema Design for the Cold Tier
Designing the HBase row key is the most critical aspect of its schema. A poorly designed key will lead to hotspotting and inefficient scans. For this use case, the query pattern on historical data is almost always by device_id and a time range.
The chosen row key structure is: [1-byte salt][device_id][reversed_timestamp].
- Salt: A one-byte hash prefix derived from the device_id. This prevents hotspotting by spreading data for sequentially named devices across different RegionServers.
- Device ID: The unique identifier for the sensor.
- Reversed Timestamp: Long.MAX_VALUE - timestamp.toEpochMilli(). Storing the timestamp in reverse chronological order means the most recent data for a given device sorts first within that device's key range, enabling efficient "get latest N" queries via a scan with a small limit.
// Example of row key generation in Java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HBaseRowKeyGenerator {

    // A simple salting mechanism to prevent hotspotting. One byte covers 256 buckets;
    // in a production system the bucket count should be configurable.
    private static final int SALT_BUCKETS = 256;

    public static byte[] createRowKey(String deviceId, long timestampMillis) {
        // 1. Salting prefix, derived deterministically from the device ID
        byte[] deviceIdBytes = deviceId.getBytes(StandardCharsets.UTF_8);
        byte salt = (byte) Math.floorMod(hash(deviceIdBytes), SALT_BUCKETS);

        // 2. Reversed timestamp so newer readings sort first within a device's key range
        long reversedTimestamp = Long.MAX_VALUE - timestampMillis;

        // 3. Assemble the key: [1 byte salt][variable deviceId][8 bytes reversed ts]
        // A fixed-length (or length-prefixed) deviceId would be more robust if possible.
        ByteBuffer buffer = ByteBuffer.allocate(1 + deviceIdBytes.length + 8);
        buffer.put(salt);
        buffer.put(deviceIdBytes);
        buffer.putLong(reversedTimestamp);
        return buffer.array();
    }

    // A simple non-cryptographic hash stand-in. A real implementation should use
    // something like MurmurHash3 (available in Guava or commons-codec).
    private static int hash(byte[] data) {
        return new String(data, StandardCharsets.UTF_8).hashCode();
    }
}
The table has a single column family, d, containing a qualifier for each stored field (metric_name, value, location, unit).
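To make the key layout concrete, here is a minimal sketch of the cold-tier read path: a time-range scan for a single device. It assumes the HBase 2.x client API, the sensor_data_archive table and d column family used by the tiering service below, and the HBaseRowKeyGenerator defined above; it is an illustration, not a drop-in repository implementation.
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes

// Scans the archive for one device between startMillis and endMillis (inclusive).
fun scanDeviceRange(
    conn: Connection,
    deviceId: String,
    startMillis: Long,
    endMillis: Long
): List<Pair<Long, Double>> {
    // Reversed timestamps sort newest-first, so the scan *starts* at the end of the
    // requested range and *stops* at its start. Both keys share the device's salt prefix.
    val startRow = HBaseRowKeyGenerator.createRowKey(deviceId, endMillis)
    val stopRow = HBaseRowKeyGenerator.createRowKey(deviceId, startMillis)
    val scan = Scan().withStartRow(startRow, true).withStopRow(stopRow, true)

    val family = Bytes.toBytes("d")
    val valueQualifier = Bytes.toBytes("value")
    val readings = mutableListOf<Pair<Long, Double>>()

    conn.getTable(TableName.valueOf("sensor_data_archive")).use { table ->
        table.getScanner(scan).use { scanner ->
            for (result in scanner) {
                // The last 8 bytes of the row key hold the reversed timestamp; undo the reversal.
                val key = result.row
                val millis = Long.MAX_VALUE - Bytes.toLong(key, key.size - 8)
                val valueBytes = result.getValue(family, valueQualifier) ?: continue
                readings.add(millis to Bytes.toDouble(valueBytes))
            }
        }
    }
    return readings
}
Because the results come back newest-first, a "latest N readings" query is the same scan with a small limit applied via scan.setLimit(n).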
Data Tiering Service Implementation
This service is a standalone application, likely running on a cron schedule or managed by a workflow orchestrator like Airflow. Its logic is critical: it must be idempotent and fault-tolerant. A common mistake is to fail after writing to HBase but before deleting from TimescaleDB, leading to data duplication.
The process is:
- Determine the time threshold (e.g., now() - 30 days).
- Query TimescaleDB for all chunks older than this threshold.
- For each chunk, stream its data.
- Batch the data into Put operations for HBase.
- Write the batches to HBase.
- Upon successful write confirmation from HBase, remove the chunk from TimescaleDB with drop_chunks, using the chunk's range_end as the cutoff.
- Log everything meticulously.
// Simplified Kotlin implementation for the tiering logic
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.sql.DriverManager
import java.time.Instant
import java.time.temporal.ChronoUnit

class DataTieringService(
    private val timescaleDbUrl: String,
    private val hbaseConnection: Connection
) {
    private val hbaseTableName = "sensor_data_archive"
    private val batchSize = 5000 // Configurable batch size for HBase puts

    fun runTieringProcess() {
        // In production, use a proper logging framework
        println("Starting data tiering process at ${Instant.now()}")

        // 1. Connect to TimescaleDB
        DriverManager.getConnection(timescaleDbUrl).use { conn ->
            val threshold = Instant.now().minus(30, ChronoUnit.DAYS)

            // 2. Find chunks that end before the threshold, along with their range_end
            //    (range_end is reused later as the cutoff passed to drop_chunks)
            val findChunksSql = """
                SELECT chunk_schema, chunk_name, range_end
                FROM timescaledb_information.chunks
                WHERE hypertable_name = 'sensor_data'
                  AND range_end < ?::timestamptz
            """.trimIndent()
            conn.prepareStatement(findChunksSql).use { stmt ->
                stmt.setString(1, threshold.toString())
                val chunksToMove = mutableListOf<Pair<String, String>>()
                val rs = stmt.executeQuery()
                while (rs.next()) {
                    val fullName = "${rs.getString("chunk_schema")}.${rs.getString("chunk_name")}"
                    chunksToMove.add(fullName to rs.getString("range_end"))
                }

                // 3. Process each chunk
                chunksToMove.forEach { (chunkName, rangeEnd) ->
                    println("Processing chunk: $chunkName")
                    try {
                        processAndDropChunk(conn, chunkName, rangeEnd)
                    } catch (e: Exception) {
                        // Critical error: this requires manual intervention or a robust retry mechanism
                        println("FATAL: Failed to process chunk $chunkName. Error: ${e.message}")
                        // Stop the process to avoid an inconsistent state
                        return
                    }
                }
            }
        }
        println("Data tiering process finished at ${Instant.now()}")
    }

    private fun processAndDropChunk(tsConn: java.sql.Connection, chunkName: String, rangeEnd: String) {
        val puts = mutableListOf<Put>()

        // 4. Read data from the chunk. (For true streaming on PostgreSQL, disable
        //    auto-commit and set a fetch size; otherwise the driver buffers the whole result set.)
        val selectSql = "SELECT time, device_id, metric_name, value, location, unit FROM $chunkName"
        tsConn.createStatement().use { stmt ->
            val rs = stmt.executeQuery(selectSql)
            hbaseConnection.getTable(TableName.valueOf(hbaseTableName)).use { hbaseTable ->
                while (rs.next()) {
                    val time = rs.getTimestamp("time").toInstant().toEpochMilli()
                    val deviceId = rs.getString("device_id")
                    val rowKey = HBaseRowKeyGenerator.createRowKey(deviceId, time)

                    val put = Put(rowKey)
                    // Add columns to the 'd' column family
                    put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("metric_name"), Bytes.toBytes(rs.getString("metric_name")))
                    put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("value"), Bytes.toBytes(rs.getDouble("value")))
                    rs.getString("location")?.let { put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("location"), Bytes.toBytes(it)) }
                    rs.getString("unit")?.let { put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("unit"), Bytes.toBytes(it)) }
                    puts.add(put)

                    // 5. Write to HBase in batches
                    if (puts.size >= batchSize) {
                        hbaseTable.put(puts)
                        puts.clear()
                        println("Wrote batch of $batchSize records to HBase for chunk $chunkName")
                    }
                }
                // Write any remaining puts
                if (puts.isNotEmpty()) {
                    hbaseTable.put(puts)
                    println("Wrote final batch of ${puts.size} records to HBase for chunk $chunkName")
                }
            }
        }

        // 6. On success, drop the chunk from TimescaleDB. This is the critical step.
        //    Cross-system atomicity is not achievable here, so a stateful tracking table
        //    recording per-chunk progress is the more robust production approach
        //    (a minimal sketch follows this class).
        println("Successfully moved data for $chunkName to HBase. Dropping it from TimescaleDB.")
        // drop_chunks() takes the hypertable plus a cutoff, not a chunk name; using this
        // chunk's range_end drops it (and any earlier chunks, which were already migrated).
        tsConn.prepareStatement("SELECT drop_chunks('sensor_data', older_than => ?::timestamptz)").use { drop ->
            drop.setString(1, rangeEnd)
            drop.execute()
        }
    }
}
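The comment above about a stateful tracking store deserves a sketch. One low-tech option, assumed here to live in the same PostgreSQL/TimescaleDB instance, is a small bookkeeping table that records per-chunk progress, so a re-run can skip chunks already copied to HBase and only retry the pending drop. Table and column names are illustrative.
// Sketch of a per-chunk migration log that makes the tiering job safely re-runnable.
class ChunkMigrationLog(private val conn: java.sql.Connection) {

    fun ensureTable() {
        conn.createStatement().use { stmt ->
            stmt.execute(
                """
                CREATE TABLE IF NOT EXISTS chunk_migration_log (
                    chunk_name TEXT PRIMARY KEY,
                    status     TEXT NOT NULL,              -- 'COPIED' or 'DROPPED'
                    updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
                )
                """.trimIndent()
            )
        }
    }

    // Returns 'COPIED', 'DROPPED', or null if the chunk has never been processed.
    fun status(chunkName: String): String? =
        conn.prepareStatement("SELECT status FROM chunk_migration_log WHERE chunk_name = ?").use { stmt ->
            stmt.setString(1, chunkName)
            stmt.executeQuery().use { rs -> if (rs.next()) rs.getString("status") else null }
        }

    fun markStatus(chunkName: String, status: String) {
        conn.prepareStatement(
            """
            INSERT INTO chunk_migration_log (chunk_name, status, updated_at)
            VALUES (?, ?, now())
            ON CONFLICT (chunk_name) DO UPDATE SET status = EXCLUDED.status, updated_at = now()
            """.trimIndent()
        ).use { stmt ->
            stmt.setString(1, chunkName)
            stmt.setString(2, status)
            stmt.executeUpdate()
        }
    }
}
The tiering loop would consult status() before copying a chunk, call markStatus(chunk, "COPIED") once the HBase write succeeds, and markStatus(chunk, "DROPPED") after drop_chunks completes.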
Unified Query Service
This service acts as the brain of the read path. It inspects the query’s time range and routes it accordingly. A common implementation pattern is to use a lightweight web framework like Ktor or Spring WebFlux to handle requests asynchronously.
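The handler below returns lists of SensorReading. The exact shape of that payload is not spelled out in this article, so assume a flat record along these lines; only the time and value fields are actually referenced by the code that follows.
import java.time.Instant

// Assumed response payload; adjust the fields to match the real API contract.
data class SensorReading(
    val time: Instant,
    val deviceId: String,
    val metricName: String,
    val value: Double
)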
// Example Ktor route handler for the unified query (Ktor 2.x-style imports).
// Assume timescaleRepo and hbaseRepo are injected dependencies.
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import java.time.Duration
import java.time.Instant

val hotTierBoundary: Duration = Duration.ofDays(30)

fun Route.queryRoutes() {
    get("/data/{deviceId}") {
        val deviceId = call.parameters["deviceId"]!!
        val start = Instant.parse(call.request.queryParameters["start"]!!)
        val end = Instant.parse(call.request.queryParameters["end"]!!)
        val hotThreshold = Instant.now().minus(hotTierBoundary)

        val results: List<SensorReading> = when {
            // Case 1: Query is entirely within the hot tier
            start.isAfter(hotThreshold) -> timescaleRepo.query(deviceId, start, end)
            // Case 2: Query is entirely within the cold tier
            end.isBefore(hotThreshold) -> hbaseRepo.query(deviceId, start, end)
            // Case 3: Query spans both tiers -- fan the queries out concurrently,
            // then merge and re-sort, since each tier returns its own ordered slice
            else -> coroutineScope {
                val hotData = async { timescaleRepo.query(deviceId, hotThreshold, end) }
                val coldData = async { hbaseRepo.query(deviceId, start, hotThreshold) }
                (hotData.await() + coldData.await()).sortedBy { it.time }
            }
        }
        call.respond(results)
    }
}
The merge logic here is simplified. For aggregated queries, this step is far more complex and may involve performing aggregations within the service itself, which is a potential performance bottleneck.
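As an illustration of what that extra complexity looks like, the sketch below merges per-bucket partial aggregates from the two tiers. The key point is that averages cannot simply be averaged across tiers: the service has to carry sums and counts (and take the min of mins, the max of maxes) and derive the final figures itself. The PartialAggregate shape is hypothetical, not something either repository currently returns.
import java.time.Instant

// Hypothetical partial aggregate returned per time bucket by each tier.
data class PartialAggregate(
    val bucket: Instant,
    val sum: Double,
    val count: Long,
    val min: Double,
    val max: Double
)

data class MergedAggregate(val bucket: Instant, val avg: Double, val min: Double, val max: Double)

// Combines hot- and cold-tier partials bucket by bucket and derives the final statistics.
fun mergeAggregates(hot: List<PartialAggregate>, cold: List<PartialAggregate>): List<MergedAggregate> =
    (hot + cold)
        .groupBy { it.bucket }
        .map { (bucket, parts) ->
            val sum = parts.sumOf { it.sum }
            val count = parts.sumOf { it.count }
            MergedAggregate(
                bucket = bucket,
                avg = if (count > 0) sum / count else Double.NaN,
                min = parts.minOf { it.min },
                max = parts.maxOf { it.max }
            )
        }
        .sortedBy { it.bucket }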
Jetpack Compose Client
The Android client should remain blissfully unaware of the backend’s data-tiering complexity. Its ViewModel interacts with the Unified Query Service through a simple repository pattern.
// data/SensorRepository.kt
class SensorRepository(private val apiService: SensorApiService) {
suspend fun getSensorData(
deviceId: String,
startTime: Instant,
endTime: Instant
): Result<List<SensorReading>> {
return try {
val data = apiService.fetchData(deviceId, startTime.toString(), endTime.toString())
Result.success(data)
} catch (e: Exception) {
// Proper error handling
Result.failure(e)
}
}
}
// ui/SensorViewModel.kt
class SensorViewModel(private val repository: SensorRepository) : ViewModel() {
private val _uiState = MutableStateFlow<SensorDataUiState>(SensorDataUiState.Idle)
val uiState: StateFlow<SensorDataUiState> = _uiState
fun loadData(deviceId: String, timeRange: TimeRange) {
viewModelScope.launch {
_uiState.value = SensorDataUiState.Loading
val now = Instant.now()
val start = when (timeRange) {
TimeRange.LAST_24_HOURS -> now.minus(1, ChronoUnit.DAYS)
TimeRange.LAST_7_DAYS -> now.minus(7, ChronoUnit.DAYS)
TimeRange.LAST_90_DAYS -> now.minus(90, ChronoUnit.DAYS)
}
val result = repository.getSensorData(deviceId, start, now)
_uiState.value = result.fold(
onSuccess = { SensorDataUiState.Success(it) },
onFailure = { SensorDataUiState.Error(it.message ?: "Unknown error") }
)
}
}
}
sealed class SensorDataUiState {
object Idle : SensorDataUiState()
object Loading : SensorDataUiState()
data class Success(val data: List<SensorReading>) : SensorDataUiState()
data class Error(val message: String) : SensorDataUiState()
}
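The TimeRange selector referenced by loadData and by the screen below is assumed to be a plain enum with the three options used in this example:
enum class TimeRange { LAST_24_HOURS, LAST_7_DAYS, LAST_90_DAYS }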
The Composable function observes the uiState and renders the appropriate view.
// ui/SensorScreen.kt
@Composable
fun SensorDataScreen(viewModel: SensorViewModel) {
val uiState by viewModel.uiState.collectAsState()
Column(modifier = Modifier.fillMaxSize().padding(16.dp)) {
// Buttons to select time range
Row {
Button(onClick = { viewModel.loadData("device-001", TimeRange.LAST_24_HOURS) }) { Text("24 Hours") }
Spacer(Modifier.width(8.dp))
Button(onClick = { viewModel.loadData("device-001", TimeRange.LAST_90_DAYS) }) { Text("90 Days") }
}
Spacer(Modifier.height(16.dp))
when (val state = uiState) {
is SensorDataUiState.Loading -> CircularProgressIndicator()
is SensorDataUiState.Success -> {
// In a real app, this would be a charting library.
// Here we just display the count of data points.
Text("Loaded ${state.data.size} data points.", style = MaterialTheme.typography.h6)
LazyColumn {
items(state.data) { reading ->
Text("${reading.time}: ${reading.value}")
}
}
}
is SensorDataUiState.Error -> Text("Error: ${state.message}", color = Color.Red)
is SensorDataUiState.Idle -> Text("Select a time range to load data.")
}
}
}
When the user taps “90 Days,” the ViewModel makes a single API call. The backend service identifies that this range spans the hot/cold boundary, queries both TimescaleDB and HBase concurrently, merges the results, and returns a unified response. The client UI updates without ever knowing the underlying complexity.
This tiered architecture, while adding moving parts, is a pragmatic and necessary way to balance performance, cost, and functionality at massive scale. Its primary limitation is the complexity of query federation, especially for cross-tier aggregations, which this implementation does not fully address. Future work could replace the custom Unified Query Service with a more powerful federated query engine such as Presto or Trino. However, that introduces another system to manage, and the trade-off between building a bespoke service and operating a complex open-source engine must be weighed against the team’s operational capacity. The current solution is a robust, direct approach that solves the immediate problem effectively.