The core challenge was straightforward in its definition, but complex in its constraints: create a real-time view of shared operational state, accessible simultaneously by a web-based dashboard and a native Android application for field agents. The non-negotiable constraint was that the Android client must remain fully functional during periods of intermittent or non-existent network connectivity. Simple API polling was immediately discarded due to latency and inefficiency. A push-based architecture was the only viable path, but this introduced the problem of data consistency and offline synchronization.
Our existing system of record was a transactional PostgreSQL database. Polluting the application code that mutated this database with event publishing logic was not an option; it’s a direct path to tightly coupled, unmaintainable services. This led us to Change Data Capture (CDC). By treating the database’s transaction log as a stream of events, we could decouple our real-time notification system from the core business logic entirely. This was the foundational decision.
From there, the stack began to take shape. Debezium, combined with Kafka, provided the robust, industrial-strength CDC pipeline. For the clients, Svelte was chosen for the web due to its minimal footprint and reactive nature, while Jetpack Compose was the clear choice for a modern, declarative Android UI. The critical offline capability on the web would be handled by a Service Worker managing a local IndexedDB cache. For the native client, a similar pattern would be implemented using Room. A lightweight middleware layer was required to consume the raw CDC events from Kafka, transform them, and broadcast them over WebSockets to clients. We chose Ktor for this, leveraging Kotlin’s concurrency features and maintaining language consistency with the Android project. Finally, to manage the inherent complexity of building and testing this heterogeneous stack, a unified CI/CD pipeline in GitHub Actions was essential from day one.
The Data Backbone: PostgreSQL, Debezium, and Kafka
The reliability of the entire system hinges on the CDC pipeline. Any data loss here cascades to all clients. A docker-compose.yml file defines this core infrastructure, ensuring a reproducible development and testing environment. In a real-world project, these would be managed services, but the configuration principles remain the same.
# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: debezium/postgres:14
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=inventory_db
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_connect_status
The init.sql script sets up the target table and, crucially, enables logical replication.
-- init.sql
CREATE TABLE inventory_items (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
quantity INT NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- For logical decoding. ALTER SYSTEM only takes effect after a restart,
-- though the debezium/postgres image already defaults to wal_level = logical.
ALTER SYSTEM SET wal_level = 'logical';
-- Seed initial data
INSERT INTO inventory_items (name, quantity) VALUES ('Sensor Model A', 100);
INSERT INTO inventory_items (name, quantity) VALUES ('Actuator Model B', 50);
With the infrastructure running, the Debezium connector is registered via a curl command to its REST API. A common mistake is to misconfigure the connector, leading to silent failures. Every parameter here is critical.
# register-connector.sh
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory_db",
"database.server.name": "dbserver1",
"table.include.list": "public.inventory_items",
"topic.prefix": "dbserver1",
"decimal.handling.mode": "double",
"snapshot.mode": "initial"
}
}'
The snapshot.mode of "initial" ensures that Debezium performs an initial consistent snapshot of the inventory_items table before streaming live changes. When an update occurs, such as UPDATE inventory_items SET quantity = 49 WHERE id = 2;, a detailed JSON message is published to the Kafka topic dbserver1.public.inventory_items. The payload contains before and after fields describing the change (with PostgreSQL, before is only fully populated when the table's REPLICA IDENTITY is set to FULL), which is essential context for downstream consumers.
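For orientation, an update event on that topic looks roughly like the following. The values are illustrative, the full envelope also carries a schema section and richer source metadata, and before remains null under the default REPLICA IDENTITY.
{
  "payload": {
    "before": null,
    "after": { "id": 2, "name": "Actuator Model B", "quantity": 49 },
    "source": { "db": "inventory_db", "table": "inventory_items" },
    "op": "u",
    "ts_ms": 1700000000000
  }
}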
The Middleware Bridge: Ktor, Kafka, and WebSockets
This service has one job: consume the verbose Debezium events, transform them into a lean, client-friendly format, and broadcast them to all connected clients. Using Ktor allows us to leverage Kotlin’s coroutines for handling concurrent WebSocket connections and the Kafka consumer loop efficiently.
// build.gradle.kts dependencies
dependencies {
// Ktor Core
implementation("io.ktor:ktor-server-core-jvm:$ktorVersion")
implementation("io.ktor:ktor-server-netty-jvm:$ktorVersion")
implementation("io.ktor:ktor-server-websockets-jvm:$ktorVersion")
// Kafka Client
implementation("org.apache.kafka:kafka-clients:3.4.0")
// JSON Serialization
implementation("io.ktor:ktor-serialization-kotlinx-json-jvm:$ktorVersion")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.0")
// Logging
implementation("ch.qos.logback:logback-classic:$logbackVersion")
}
The server setup involves installing the WebSockets and ContentNegotiation plugins. The core logic resides in a coroutine that runs for the application's lifetime, consuming from Kafka and managing WebSocket sessions.
// Application.kt
package com.example.middleware
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import kotlinx.coroutines.launch
import java.time.Duration
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
fun main() {
embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = Application::module).start(wait = true)
}
fun Application.module() {
install(WebSockets) {
pingPeriod = Duration.ofSeconds(15)
timeout = Duration.ofSeconds(15)
maxFrameSize = Long.MAX_VALUE
masking = false
}
install(ContentNegotiation) {
json()
}
val connections = Collections.newSetFromMap(ConcurrentHashMap<WebSocketServerSession, Boolean>())
// Launch the Kafka consumer as a background job
launch {
KafkaConsumerService(connections).consume()
}
routing {
webSocket("/ws/inventory") {
connections += this
log.info("Client connected. Total clients: ${connections.size}")
try {
// Keep connection open, but we don't expect client messages here
for (frame in incoming) {
// No-op, we only push from server
}
} catch (e: Exception) {
log.error("Error with WebSocket client: ${e.localizedMessage}")
} finally {
connections -= this
log.info("Client disconnected. Total clients: ${connections.size}")
}
}
}
}
The KafkaConsumerService is where the real work happens. It connects to the Kafka broker, subscribes to the topic, and enters an infinite polling loop. The pitfall here is error handling: if the consumer fails, it must log the error and attempt to continue or restart gracefully rather than crash the entire service.
// KafkaConsumerService.kt
package com.example.middleware
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.serialization.json.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
// Simplified data classes for client payload
@kotlinx.serialization.Serializable
data class ItemPayload(val id: Int, val name: String, val quantity: Int)
@kotlinx.serialization.Serializable
data class UpdateAction(val type: String, val payload: ItemPayload)
class KafkaConsumerService(private val connections: Set<WebSocketServerSession>) {
private val logger = LoggerFactory.getLogger(javaClass)
private val consumer: KafkaConsumer<String, String>
init {
val props = Properties().apply {
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-middleware-group")
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
consumer = KafkaConsumer(props)
}
suspend fun consume() {
consumer.subscribe(listOf("dbserver1.public.inventory_items"))
logger.info("Kafka consumer subscribed to topic.")
withContext(Dispatchers.IO) { // Run polling in a dedicated IO thread pool
while (true) {
try {
val records = consumer.poll(Duration.ofMillis(1000))
for (record in records) {
// Tombstone records carry a null value; skip them
val value = record.value() ?: continue
logger.info("Received record from Kafka: $value")
val action = transformDebeziumMessage(value)
if (action != null) {
val actionJson = Json.encodeToString(UpdateAction.serializer(), action)
// Broadcast to all connected WebSocket clients
connections.forEach { session ->
// Launch in a separate coroutine to avoid one bad client blocking others
session.launch {
session.send(Frame.Text(actionJson))
}
}
}
}
} catch (e: Exception) {
logger.error("Error during Kafka poll/processing", e)
// In a production system, implement a backoff-retry strategy
kotlinx.coroutines.delay(5000)
}
}
}
}
private fun transformDebeziumMessage(message: String): UpdateAction? {
return try {
val jsonObject = Json.parseToJsonElement(message).jsonObject
val payload = jsonObject["payload"]?.jsonObject ?: return null // No change-event envelope
val after = payload["after"]?.jsonObject ?: return null // Delete events carry no "after" state
val item = ItemPayload(
id = after["id"]!!.jsonPrimitive.int,
name = after["name"]!!.jsonPrimitive.content,
quantity = after["quantity"]!!.jsonPrimitive.int
)
// Debezium op 'c' for create, 'u' for update. Both are treated as an upsert on the client.
UpdateAction("UPSERT_ITEM", item)
} catch (e: Exception) {
logger.error("Failed to parse Debezium message: $message", e)
null
}
}
}
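The catch block above simply sleeps for a fixed five seconds. A production consumer would back off exponentially and reset after a successful iteration; a minimal sketch of that idea follows, with the helper name and bounds being illustrative rather than part of the service.
// BackoffPolling.kt (illustrative sketch, not part of the service above)
import kotlinx.coroutines.delay

// Wraps one polling step with exponential backoff; the caller supplies the
// poll/transform/broadcast logic as the pollOnce lambda and logs failures there.
suspend fun pollWithBackoff(maxDelayMs: Long = 60_000, pollOnce: suspend () -> Unit) {
    var backoffMs = 1_000L
    while (true) {
        try {
            pollOnce()
            backoffMs = 1_000L // reset after a successful iteration
        } catch (e: Exception) {
            delay(backoffMs)
            backoffMs = (backoffMs * 2).coerceAtMost(maxDelayMs)
        }
    }
}
The body of consume()'s while loop would then move into the pollOnce lambda.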
Finally, the Dockerfile for this service is standard for a JVM application.
FROM openjdk:17-jdk-slim
WORKDIR /app
COPY build/libs/middleware-0.0.1-all.jar app.jar
CMD ["java", "-jar", "app.jar"]
The Web Client: Svelte and the Service Worker Brain
The Svelte client architecture is designed around the principle that the UI always reads from a local data source (IndexedDB) and never directly from the network. The network’s role is simply to update that local source. This decouples the user experience from network reliability. The Service Worker is the orchestrator of this logic.
// src/service-worker.js
import { build, files, version } from '$service-worker';
const CACHE_NAME = `static-cache-${version}`;
const STATIC_ASSETS = build.concat(files);
self.addEventListener('install', (event) => {
event.waitUntil(
caches.open(CACHE_NAME).then((cache) => {
// Pre-cache the application shell
return cache.addAll(STATIC_ASSETS);
})
);
});
self.addEventListener('activate', (event) => {
event.waitUntil(
caches.keys().then((keys) => {
return Promise.all(
keys.filter((key) => key !== CACHE_NAME).map((key) => caches.delete(key))
);
})
);
});
self.addEventListener('fetch', (event) => {
// A simple cache-first strategy. A real app might need more complex logic.
event.respondWith(
caches.match(event.request).then((response) => {
return response || fetch(event.request);
})
);
});
// The message listener is the bridge between the app and the worker
self.addEventListener('message', (event) => {
if (event.data && event.data.type === 'WEBSOCKET_MESSAGE_RECEIVED') {
const action = event.data.payload;
if (action.type === 'UPSERT_ITEM') {
// In a real project, this would be an IndexedDB operation.
// For simplicity here, we'll just log it.
console.log('[SW] Received instruction to upsert item:', action.payload);
// db.items.put(action.payload); -> This would be the IndexedDB call
}
}
});
The application’s data layer consists of a Svelte store and a WebSocket service. When the WebSocket service receives a message, it doesn’t update the store directly. Instead, it dispatches a message to the Service Worker, delegating the responsibility of data persistence.
// src/lib/data/websocket.service.js
import { writable } from 'svelte/store';
export const connectionStatus = writable('disconnected');
let socket;
// Ensure this code only runs in the browser
const isBrowser = typeof window !== 'undefined';
function connect() {
if (!isBrowser) return;
socket = new WebSocket('ws://localhost:8080/ws/inventory');
socket.onopen = () => {
console.log('WebSocket connection established.');
connectionStatus.set('connected');
};
socket.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
// Crucially, delegate state change to the Service Worker
if (navigator.serviceWorker && navigator.serviceWorker.controller) {
navigator.serviceWorker.controller.postMessage({
type: 'WEBSOCKET_MESSAGE_RECEIVED',
payload: data,
});
}
} catch (error) {
console.error('Error parsing WebSocket message:', error);
}
};
socket.onclose = () => {
console.log('WebSocket connection closed. Attempting to reconnect...');
connectionStatus.set('disconnected');
// A robust implementation would use an exponential backoff strategy
setTimeout(connect, 5000);
};
socket.onerror = (error) => {
console.error('WebSocket error:', error);
socket.close();
};
}
// Initial connection attempt
connect();
The Svelte UI component then subscribes to a store that is populated from IndexedDB, ensuring it remains reactive to changes in the local database, regardless of the network’s status.
The Native Client: Jetpack Compose and Room
The Android client follows the exact same architectural pattern, but with native tooling. Room serves as the local database, Ktor Client handles the WebSocket connection, and a ViewModel exposes the data from Room as a StateFlow for the Compose UI to observe.
// build.gradle.kts (Module level)
dependencies {
// Jetpack Compose
implementation("androidx.compose.ui:ui:$compose_version")
// ... other compose dependencies
// Room for local persistence
implementation("androidx.room:room-runtime:$room_version")
kapt("androidx.room:room-compiler:$room_version")
implementation("androidx.room:room-ktx:$room_version")
// Ktor client for WebSockets
implementation("io.ktor:ktor-client-core:$ktor_version")
implementation("io.ktor:ktor-client-cio:$ktor_version")
implementation("io.ktor:ktor-client-websockets:$ktor_version")
implementation("io.ktor:ktor-client-content-negotiation:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
// ViewModel & Lifecycle
implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version")
implementation("androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version")
}
The data layer consists of a Room Entity and Dao, and a Repository that orchestrates network and database operations.
// InventoryItem.kt (Room Entity)
@Entity(tableName = "inventory_items")
data class InventoryItem(
@PrimaryKey val id: Int,
val name: String,
val quantity: Int
)
// InventoryDao.kt
@Dao
interface InventoryDao {
@Query("SELECT * FROM inventory_items ORDER BY name ASC")
fun getAll(): Flow<List<InventoryItem>>
@Upsert
suspend fun upsert(item: InventoryItem)
}
The InventoryRepository contains the logic to connect to the WebSocket and update the local database. A common mistake is to perform database operations on the main thread; all Room calls here are suspend functions, ensuring they run on a background thread.
// InventoryRepository.kt
class InventoryRepository(private val dao: InventoryDao) {
val allItems: Flow<List<InventoryItem>> = dao.getAll()
private val client = HttpClient(CIO) {
install(WebSockets)
install(ContentNegotiation) { json(Json) }
}
suspend fun connectAndListen() = withContext(Dispatchers.IO) {
try {
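// 10.0.2.2 is the Android emulator's alias for the host machine's loopback address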
client.webSocket(method = HttpMethod.Get, host = "10.0.2.2", port = 8080, path = "/ws/inventory") {
for (frame in incoming) {
if (frame is Frame.Text) {
val text = frame.readText()
val action = Json.decodeFromString<UpdateAction>(text)
// Update the local database. The UI will react automatically.
if (action.type == "UPSERT_ITEM") {
val item = InventoryItem(
id = action.payload.id,
name = action.payload.name,
quantity = action.payload.quantity
)
dao.upsert(item)
}
}
}
}
} catch (e: Exception) {
// Handle connection errors, schedule reconnection etc.
Log.e("InventoryRepository", "WebSocket connection failed", e)
}
}
}
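The UpdateAction decoded above is the lean payload emitted by the middleware; the Android module needs its own copies of those serializable classes (or a shared Kotlin module). A minimal mirror, assuming the field names match the middleware's JSON exactly:
// ClientPayloads.kt (mirrors of the middleware payload classes)
import kotlinx.serialization.Serializable

@Serializable
data class ItemPayload(val id: Int, val name: String, val quantity: Int)

@Serializable
data class UpdateAction(val type: String, val payload: ItemPayload)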
The ViewModel simply exposes the repository's data stream, and the Composable UI collects this stream as state.
// InventoryViewModel.kt
class InventoryViewModel(application: Application) : AndroidViewModel(application) {
private val repository: InventoryRepository
val items: StateFlow<List<InventoryItem>>
init {
val db = InventoryDatabase.getDatabase(application)
repository = InventoryRepository(db.inventoryDao())
items = repository.allItems.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
// Start listening in the background
viewModelScope.launch {
repository.connectAndListen()
}
}
}
// InventoryScreen.kt (Composable)
@Composable
fun InventoryScreen(viewModel: InventoryViewModel) {
val items by viewModel.items.collectAsState()
LazyColumn {
items(items) { item ->
Text(text = "${item.name}: ${item.quantity}")
}
}
}
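The ViewModel references InventoryDatabase.getDatabase(application), which is not shown above. A minimal sketch of that class, following the standard Room singleton pattern (the database file name is an assumption):
// InventoryDatabase.kt (sketch of the database class referenced by the ViewModel)
import android.content.Context
import androidx.room.Database
import androidx.room.Room
import androidx.room.RoomDatabase

@Database(entities = [InventoryItem::class], version = 1, exportSchema = false)
abstract class InventoryDatabase : RoomDatabase() {
    abstract fun inventoryDao(): InventoryDao

    companion object {
        @Volatile
        private var instance: InventoryDatabase? = null

        fun getDatabase(context: Context): InventoryDatabase =
            instance ?: synchronized(this) {
                instance ?: Room.databaseBuilder(
                    context.applicationContext,
                    InventoryDatabase::class.java,
                    "inventory.db" // illustrative file name
                ).build().also { instance = it }
            }
    }
}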
Automation: The GitHub Actions Pipeline
Managing the build, test, and deployment of three distinct artifacts (a Docker image, a static website, and an Android APK) manually is untenable. A GitHub Actions workflow automates this entire process.
# .github/workflows/ci-cd.yml
name: Build and Deploy Full Stack
on:
  push:
    branches: [ main ]
jobs:
  build-middleware:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'temurin'
      - name: Build with Gradle
        working-directory: ./middleware
        run: ./gradlew build shadowJar
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v4
        with:
          context: ./middleware
          push: true
          tags: ghcr.io/${{ github.repository }}/inventory-middleware:latest
  build-web-client:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up Node.js
        uses: actions/setup-node@v3
        with:
          node-version: '18'
      - name: Install dependencies and build
        working-directory: ./svelte-client
        run: |
          npm install
          npm run build
      - name: Upload artifact
        uses: actions/upload-artifact@v3
        with:
          name: svelte-dist
          path: ./svelte-client/build/
  build-android-client:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'temurin'
      - name: Grant execute permission for gradlew
        working-directory: ./android-client
        run: chmod +x gradlew
      - name: Build with Gradle
        working-directory: ./android-client
        run: ./gradlew assembleRelease
      - name: Upload APK
        uses: actions/upload-artifact@v3
        with:
          name: android-apk
          path: ./android-client/app/build/outputs/apk/release/app-release-unsigned.apk
This workflow defines three parallelizable jobs. A real-world pipeline would include comprehensive unit and integration test stages, secret management for signing keys, and automated deployment steps to environments like Kubernetes and a static web host. The key principle is creating a single, automated path from code commit to deployable artifact for every component of the system.
The current design presents several limitations that would need to be addressed in a production deployment. The middleware broadcasts every single change to every connected client, which is both inefficient and insecure; a topic-based subscription model (e.g. /ws/inventory/region-A) would be necessary, as sketched below. Offline conflict resolution is non-existent: if two clients modify the same item while offline, the last one to reconnect wins. Implementing a more robust strategy, potentially using CRDTs or vector clocks, would be a significant but necessary evolution. Furthermore, the CDC pipeline is sensitive to database schema changes. A formal schema evolution strategy, including versioning of the WebSocket API payloads, would be critical for long-term maintainability.
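As a rough sketch of that subscription model in the Ktor middleware (the per-region registry and route shape are assumptions, not part of the service shown earlier):
// RegionRoutes.kt (illustrative sketch of region-scoped WebSocket routes)
import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// One connection set per region; the Kafka consumer would broadcast each
// change only to the set matching the affected row's region.
val regionConnections = ConcurrentHashMap<String, MutableSet<WebSocketServerSession>>()

fun Application.regionRoutes() {
    routing {
        webSocket("/ws/inventory/{region}") {
            val region = call.parameters["region"]
                ?: return@webSocket close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "missing region"))
            val sessions = regionConnections.computeIfAbsent(region) {
                Collections.newSetFromMap(ConcurrentHashMap<WebSocketServerSession, Boolean>())
            }
            sessions += this
            try {
                for (frame in incoming) { /* server push only */ }
            } finally {
                sessions -= this
            }
        }
    }
}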