Implementing a CDC-Powered Middleware for Synchronizing Offline-First Svelte and Jetpack Compose UIs


The core challenge was straightforward in its definition, but complex in its constraints: create a real-time view of shared operational state, accessible simultaneously by a web-based dashboard and a native Android application for field agents. The non-negotiable constraint was that the Android client must remain fully functional during periods of intermittent or non-existent network connectivity. Simple API polling was immediately discarded due to latency and inefficiency. A push-based architecture was the only viable path, but this introduced the problem of data consistency and offline synchronization.

Our existing system of record was a transactional PostgreSQL database. Polluting the application code that mutates this database with event-publishing logic was not an option; that is a direct path to tightly coupled, unmaintainable services. This led us to Change Data Capture (CDC). By treating the database’s transaction log as a stream of events, we could decouple the real-time notification system from the core business logic entirely. This was the foundational decision.

From there, the stack began to take shape. Debezium, combined with Kafka, provided the robust, industrial-strength CDC pipeline. For the clients, Svelte was chosen for the web due to its minimal footprint and reactive nature, while Jetpack Compose was the clear choice for a modern, declarative Android UI. The critical offline capability on the web would be handled by a Service Worker managing a local IndexedDB cache. For the native client, a similar pattern would be implemented using Room. A lightweight middleware layer was required to consume the raw CDC events from Kafka, transform them, and broadcast them over WebSockets to clients. We chose Ktor for this, leveraging Kotlin’s concurrency features and maintaining language consistency with the Android project. Finally, to manage the inherent complexity of building and testing this heterogeneous stack, a unified CI/CD pipeline in GitHub Actions was essential from day one.

The Data Backbone: PostgreSQL, Debezium, and Kafka

The reliability of the entire system hinges on the CDC pipeline. Any data loss here cascades to all clients. A docker-compose.yml file defines this core infrastructure, ensuring a reproducible development and testing environment. In a real-world project, these would be managed services, but the configuration principles remain the same.

# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: debezium/postgres:14
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=inventory_db
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # Advertised as kafka:9092, which is reachable from other containers on the
      # compose network; host-side clients would need a second, host-facing listener.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1

  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_connect_status

The init.sql script sets up the target table, seeds initial data, and ensures logical replication is enabled (see the note in the script).

-- init.sql
CREATE TABLE inventory_items (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    quantity INT NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Logical decoding requires wal_level = 'logical'. ALTER SYSTEM only takes
-- effect after a restart; the debezium/postgres image already ships with this
-- setting, so the statement below is a safeguard for other base images.
ALTER SYSTEM SET wal_level = 'logical';

-- Seed initial data
INSERT INTO inventory_items (name, quantity) VALUES ('Sensor Model A', 100);
INSERT INTO inventory_items (name, quantity) VALUES ('Actuator Model B', 50);

With the infrastructure running, the Debezium connector is registered via a curl command against the Kafka Connect REST API. A common mistake is to misconfigure the connector, which tends to fail silently, so every parameter here matters. Note that Debezium 2.x replaced database.server.name with topic.prefix, so only the latter appears in this config.

# register-connector.sh
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory_db",
    "database.server.name": "dbserver1",
    "table.include.list": "public.inventory_items",
    "topic.prefix": "dbserver1",
    "decimal.handling.mode": "double",
    "snapshot.mode": "initial"
  }
}'

The snapshot.mode: "initial" setting ensures that Debezium performs a consistent snapshot of the inventory_items table before streaming live changes. When an update occurs, such as UPDATE inventory_items SET quantity = 49 WHERE id = 2;, a JSON change event is published to the Kafka topic dbserver1.public.inventory_items. The payload carries before and after row images plus an op code ('c' for create, 'u' for update, 'd' for delete), giving downstream consumers the context of the change. One caveat: with PostgreSQL, the before image is only fully populated if the table's REPLICA IDENTITY is set to FULL; under the default identity it is null for updates.
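
An abridged example of such an event, with the schema block and most source metadata omitted (the timestamp values are illustrative):

{
  "payload": {
    "before": null,
    "after": {
      "id": 2,
      "name": "Actuator Model B",
      "quantity": 49,
      "updated_at": "2023-04-01T12:00:00.000000Z"
    },
    "source": { "db": "inventory_db", "table": "inventory_items" },
    "op": "u",
    "ts_ms": 1680350400000
  }
}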

The Middleware Bridge: Ktor, Kafka, and WebSockets

This service has one job: consume the verbose Debezium events, transform them into a lean, client-friendly format, and broadcast them to all connected clients. Using Ktor allows us to leverage Kotlin’s coroutines for handling concurrent WebSocket connections and the Kafka consumer loop efficiently.

// build.gradle.kts dependencies
dependencies {
    // Ktor Core
    implementation("io.ktor:ktor-server-core-jvm:$ktorVersion")
    implementation("io.ktor:ktor-server-netty-jvm:$ktorVersion")
    implementation("io.ktor:ktor-server-websockets-jvm:$ktorVersion")
    
    // Kafka Client
    implementation("org.apache.kafka:kafka-clients:3.4.0")

    // JSON Serialization
    implementation("io.ktor:ktor-serialization-kotlinx-json-jvm:$ktorVersion")
    implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.0")
    
    // Logging
    implementation("ch.qos.logback:logback-classic:$logbackVersion")
}

The server setup involves installing the WebSockets and ContentNegotiation plugins. The core logic resides in a coroutine that runs for the application’s lifetime, consuming from Kafka and managing WebSocket sessions.

// Application.kt
package com.example.middleware

import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import kotlinx.coroutines.launch
import java.time.Duration
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

fun main() {
    embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = Application::module).start(wait = true)
}

fun Application.module() {
    install(WebSockets) {
        pingPeriod = Duration.ofSeconds(15)
        timeout = Duration.ofSeconds(15)
        maxFrameSize = Long.MAX_VALUE
        masking = false
    }
    install(ContentNegotiation) {
        json()
    }

    val connections = Collections.newSetFromMap(ConcurrentHashMap<WebSocketServerSession, Boolean>())
    
    // Launch the Kafka consumer as a background job
    launch {
        KafkaConsumerService(connections).consume()
    }

    routing {
        webSocket("/ws/inventory") {
            connections += this
            log.info("Client connected. Total clients: ${connections.size}")
            try {
                // Keep connection open, but we don't expect client messages here
                for (frame in incoming) { 
                    // No-op, we only push from server
                }
            } catch (e: Exception) {
                log.error("Error with WebSocket client: ${e.localizedMessage}")
            } finally {
                connections -= this
                log.info("Client disconnected. Total clients: ${connections.size}")
            }
        }
    }
}

The KafkaConsumerService is where the real work happens. It connects to the Kafka broker, subscribes to the topic, and enters an infinite polling loop. The pitfall here is error handling; if the consumer fails, it must log the error and attempt to continue or restart gracefully, rather than crashing the entire service.

// KafkaConsumerService.kt
package com.example.middleware

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.serialization.json.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*
import io.ktor.server.websocket.*
import io.ktor.websocket.*

// Simplified data classes for client payload
@kotlinx.serialization.Serializable
data class ItemPayload(val id: Int, val name: String, val quantity: Int)

@kotlinx.serialization.Serializable
data class UpdateAction(val type: String, val payload: ItemPayload)

class KafkaConsumerService(private val connections: Set<WebSocketServerSession>) {
    private val logger = LoggerFactory.getLogger(javaClass)
    private val consumer: KafkaConsumer<String, String>

    init {
        val props = Properties().apply {
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
            put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-middleware-group")
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        }
        consumer = KafkaConsumer(props)
    }

    suspend fun consume() {
        consumer.subscribe(listOf("dbserver1.public.inventory_items"))
        logger.info("Kafka consumer subscribed to topic.")

        withContext(Dispatchers.IO) { // Run polling in a dedicated IO thread pool
            while (true) {
                try {
                    val records = consumer.poll(Duration.ofMillis(1000))
                    for (record in records) {
                        // Tombstone records carry a null value; skip them
                        val value = record.value() ?: continue
                        logger.info("Received record from Kafka: $value")
                        val action = transformDebeziumMessage(value)
                        if (action != null) {
                            val actionJson = Json.encodeToString(UpdateAction.serializer(), action)
                            // Broadcast to all connected WebSocket clients
                            connections.forEach { session ->
                                // Launch in a separate coroutine to avoid one bad client blocking others
                                session.launch { 
                                    session.send(Frame.Text(actionJson))
                                }
                            }
                        }
                    }
                } catch (e: Exception) {
                    logger.error("Error during Kafka poll/processing", e)
                    // In a production system, implement an exponential backoff-retry strategy
                    delay(5000)
                }
            }
        }
    }

    private fun transformDebeziumMessage(message: String): UpdateAction? {
        return try {
            val jsonObject = Json.parseToJsonElement(message).jsonObject
            val payload = jsonObject["payload"]?.jsonObject ?: return null // No Debezium envelope
            val after = payload["after"]?.jsonObject ?: return null // Delete event: no after image

            val item = ItemPayload(
                id = after["id"]!!.jsonPrimitive.int,
                name = after["name"]!!.jsonPrimitive.content,
                quantity = after["quantity"]!!.jsonPrimitive.int
            )
            // Debezium op 'c' for create, 'u' for update. Both are treated as an upsert on the client.
            UpdateAction("UPSERT_ITEM", item)
        } catch (e: Exception) {
            logger.error("Failed to parse Debezium message: $message", e)
            null
        }
    }
}

Finally, the Dockerfile for this service is standard for a JVM application.

FROM eclipse-temurin:17-jre
WORKDIR /app
COPY build/libs/middleware-0.0.1-all.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]

The Web Client: Svelte and the Service Worker Brain

The Svelte client architecture is designed around the principle that the UI always reads from a local data source (IndexedDB) and never directly from the network. The network’s role is simply to update that local source. This decouples the user experience from network reliability. The Service Worker is the orchestrator of this logic.

// src/service-worker.js
import { build, files, version } from '$service-worker';

const CACHE_NAME = `static-cache-${version}`;
const STATIC_ASSETS = build.concat(files);

self.addEventListener('install', (event) => {
  event.waitUntil(
    caches.open(CACHE_NAME).then((cache) => {
      // Pre-cache the application shell
      return cache.addAll(STATIC_ASSETS);
    })
  );
});

self.addEventListener('activate', (event) => {
  event.waitUntil(
    caches.keys().then((keys) => {
      return Promise.all(
        keys.filter((key) => key !== CACHE_NAME).map((key) => caches.delete(key))
      );
    })
  );
});

self.addEventListener('fetch', (event) => {
  // A simple cache-first strategy. A real app might need more complex logic.
  event.respondWith(
    caches.match(event.request).then((response) => {
      return response || fetch(event.request);
    })
  );
});

// The message listener is the bridge between the app and the worker
self.addEventListener('message', (event) => {
  if (event.data && event.data.type === 'WEBSOCKET_MESSAGE_RECEIVED') {
    const action = event.data.payload;
    if (action.type === 'UPSERT_ITEM') {
      // In a real project, this would be an IndexedDB operation.
      // For simplicity here, we'll just log it.
      console.log('[SW] Received instruction to upsert item:', action.payload);
      // db.items.put(action.payload); -> This would be the IndexedDB call
    }
  }
});
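
For completeness, here is a minimal sketch of the IndexedDB upsert the listener above alludes to. The 'inventory-db' database name, 'items' store, and 'id' keyPath are illustrative assumptions, not part of the original schema:

// A sketch of the IndexedDB write delegated to the Service Worker.
function openDb() {
  return new Promise((resolve, reject) => {
    const request = indexedDB.open('inventory-db', 1);
    request.onupgradeneeded = () =>
      request.result.createObjectStore('items', { keyPath: 'id' });
    request.onsuccess = () => resolve(request.result);
    request.onerror = () => reject(request.error);
  });
}

async function upsertItem(item) {
  const db = await openDb();
  return new Promise((resolve, reject) => {
    const tx = db.transaction('items', 'readwrite');
    tx.objectStore('items').put(item); // put() inserts or replaces by keyPath
    tx.oncomplete = () => resolve();
    tx.onerror = () => reject(tx.error);
  });
}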

The application’s data layer consists of a Svelte store and a WebSocket service. When the WebSocket service receives a message, it doesn’t update the store directly. Instead, it dispatches a message to the Service Worker, delegating the responsibility of data persistence.

// src/lib/data/websocket.service.js
import { writable } from 'svelte/store';

export const connectionStatus = writable('disconnected');
let socket;

// Ensure this code only runs in the browser
const isBrowser = typeof window !== 'undefined';

function connect() {
    if (!isBrowser) return;

    socket = new WebSocket('ws://localhost:8080/ws/inventory');

    socket.onopen = () => {
        console.log('WebSocket connection established.');
        connectionStatus.set('connected');
    };

    socket.onmessage = (event) => {
        try {
            const data = JSON.parse(event.data);
            // Crucially, delegate state change to the Service Worker
            if (navigator.serviceWorker && navigator.serviceWorker.controller) {
                navigator.serviceWorker.controller.postMessage({
                    type: 'WEBSOCKET_MESSAGE_RECEIVED',
                    payload: data,
                });
            }
        } catch (error) {
            console.error('Error parsing WebSocket message:', error);
        }
    };

    socket.onclose = () => {
        console.log('WebSocket connection closed. Attempting to reconnect...');
        connectionStatus.set('disconnected');
        // A robust implementation would use an exponential backoff strategy
        setTimeout(connect, 5000);
    };

    socket.onerror = (error) => {
        console.error('WebSocket error:', error);
        socket.close();
    };
}

// Initial connection attempt
connect();
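
The fixed five-second retry is fine for a demo; a sketch of the exponential backoff the comment above alludes to might look like this (the 30-second cap is an arbitrary choice):

// Exponential backoff variant of the reconnect logic (illustrative).
let retries = 0;

function scheduleReconnect() {
  const delay = Math.min(30_000, 1_000 * 2 ** retries); // 1s, 2s, 4s ... capped at 30s
  retries += 1;
  setTimeout(connect, delay);
}

// socket.onopen would reset retries to 0, and socket.onclose would call
// scheduleReconnect() instead of setTimeout(connect, 5000).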

The Svelte UI component then subscribes to a store populated from IndexedDB, so it stays reactive to changes in the local database regardless of the network's status. A sketch of such a store follows.
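
This is one way to wire it up, assuming the openDb() helper from the IndexedDB sketch above is extracted to a shared module, and assuming the worker posts a hypothetical DB_UPDATED message after each write (via self.clients.matchAll() and client.postMessage()):

// src/lib/data/inventory.store.js -- a sketch, not the definitive implementation
import { writable } from 'svelte/store';

export const items = writable([]);

async function refreshFromDb() {
  const db = await openDb();
  const request = db.transaction('items', 'readonly').objectStore('items').getAll();
  request.onsuccess = () => items.set(request.result);
}

if (typeof window !== 'undefined') {
  refreshFromDb(); // Hydrate from the local cache on startup
  // Re-read whenever the Service Worker signals that it persisted a change
  navigator.serviceWorker?.addEventListener('message', (event) => {
    if (event.data?.type === 'DB_UPDATED') refreshFromDb();
  });
}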

The Native Client: Jetpack Compose and Room

The Android client follows the exact same architectural pattern, but with native tooling. Room serves as the local database, Ktor Client handles the WebSocket connection, and a ViewModel exposes the data from Room as a StateFlow for the Compose UI to observe.

// build.gradle.kts (Module level)
dependencies {
    // Jetpack Compose
    implementation("androidx.compose.ui:ui:$compose_version")
    // ... other compose dependencies

    // Room for local persistence
    implementation("androidx.room:room-runtime:$room_version")
    kapt("androidx.room:room-compiler:$room_version")
    implementation("androidx.room:room-ktx:$room_version")

    // Ktor client for WebSockets
    implementation("io.ktor:ktor-client-core:$ktor_version")
    implementation("io.ktor:ktor-client-cio:$ktor_version")
    implementation("io.ktor:ktor-client-websockets:$ktor_version")
    implementation("io.ktor:ktor-client-content-negotiation:$ktor_version")
    implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
    
    // ViewModel & LiveData
    implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version")
    implementation("androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version")
}

The data layer consists of a Room Entity and Dao, and a Repository that orchestrates network and database operations.

// InventoryItem.kt (Room Entity)
@Entity(tableName = "inventory_items")
data class InventoryItem(
    @PrimaryKey val id: Int,
    val name: String,
    val quantity: Int
)

// InventoryDao.kt
@Dao
interface InventoryDao {
    @Query("SELECT * FROM inventory_items ORDER BY name ASC")
    fun getAll(): Flow<List<InventoryItem>>

    @Upsert // Requires Room 2.5.0+; inserts or updates by primary key
    suspend fun upsert(item: InventoryItem)
}

The InventoryRepository contains the logic to connect to the WebSocket and update the local database. A common mistake is to perform database operations on the main thread; all Room calls here are suspend functions, ensuring they run on a background thread.

// InventoryRepository.kt
class InventoryRepository(private val dao: InventoryDao) {

    val allItems: Flow<List<InventoryItem>> = dao.getAll()

    private val client = HttpClient(CIO) {
        install(WebSockets)
        install(ContentNegotiation) { json(Json) }
    }

    suspend fun connectAndListen() = withContext(Dispatchers.IO) {
        try {
            // 10.0.2.2 is the Android emulator's alias for the host machine's localhost
            client.webSocket(method = HttpMethod.Get, host = "10.0.2.2", port = 8080, path = "/ws/inventory") {
                for (frame in incoming) {
                    if (frame is Frame.Text) {
                        val text = frame.readText()
                        val action = Json.decodeFromString<UpdateAction>(text)
                        
                        // Update the local database. The UI will react automatically.
                        if (action.type == "UPSERT_ITEM") {
                            val item = InventoryItem(
                                id = action.payload.id,
                                name = action.payload.name,
                                quantity = action.payload.quantity
                            )
                            dao.upsert(item)
                        }
                    }
                }
            }
        } catch (e: Exception) {
            // Handle connection errors, schedule reconnection etc.
            Log.e("InventoryRepository", "WebSocket connection failed", e)
        }
    }
}
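
The catch block above only logs. A sketch of the reconnection the comment alludes to, written as an additional repository method with an illustrative backoff cap, might look like this:

// Hypothetical wrapper around connectAndListen() with exponential backoff.
// The caller (e.g. the ViewModel) would invoke this instead.
suspend fun connectWithRetry() {
    var attempt = 0
    while (true) {
        connectAndListen() // Returns once the connection drops or fails
        val backoffMs = (1_000L shl attempt).coerceAtMost(30_000L)
        delay(backoffMs) // kotlinx.coroutines.delay
        if (attempt < 5) attempt++
    }
}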

The ViewModel simply exposes the repository’s data stream, and the Composable UI collects this stream as state.

// InventoryViewModel.kt
class InventoryViewModel(application: Application) : AndroidViewModel(application) {
    private val repository: InventoryRepository
    val items: StateFlow<List<InventoryItem>>

    init {
        val db = InventoryDatabase.getDatabase(application)
        repository = InventoryRepository(db.inventoryDao())
        items = repository.allItems.stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
        // Start listening in the background
        viewModelScope.launch {
            repository.connectAndListen()
        }
    }
}

// InventoryScreen.kt (Composable)
@Composable
fun InventoryScreen(viewModel: InventoryViewModel) {
    val items by viewModel.items.collectAsState()

    LazyColumn {
        items(items) { item ->
            Text(text = "${item.name}: ${item.quantity}")
        }
    }
}

Automation: The GitHub Actions Pipeline

Managing the build, test, and deployment of three distinct artifacts (a Docker image, a static website, and an Android APK) manually is untenable. A GitHub Actions workflow automates this entire process.

# .github/workflows/ci-cd.yml
name: Build and Deploy Full Stack

on:
  push:
    branches: [ main ]

jobs:
  build-middleware:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'temurin'

      - name: Build with Gradle
        working-directory: ./middleware
        run: ./gradlew build shadowJar

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Log in to GitHub Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push Docker image
        uses: docker/build-push-action@v4
        with:
          context: ./middleware
          push: true
          tags: ghcr.io/${{ github.repository }}/inventory-middleware:latest

  build-web-client:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Node.js
        uses: actions/setup-node@v3
        with:
          node-version: '18'
      
      - name: Install dependencies and build
        working-directory: ./svelte-client
        run: |
          npm install
          npm run build

      - name: Upload artifact
        uses: actions/upload-artifact@v3
        with:
          name: svelte-dist
          path: ./svelte-client/build/

  build-android-client:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      
      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'temurin'
      
      - name: Grant execute permission for gradlew
        working-directory: ./android-client
        run: chmod +x gradlew
        
      - name: Build with Gradle
        working-directory: ./android-client
        run: ./gradlew assembleRelease
        
      - name: Upload APK
        uses: actions/upload-artifact@v3
        with:
          name: android-apk
          path: ./android-client/app/build/outputs/apk/release/app-release-unsigned.apk

This workflow defines three jobs that run in parallel. A real-world pipeline would add comprehensive unit and integration test stages, secret management for signing keys, and automated deployment steps to environments such as Kubernetes and a static web host. The key principle is a single, automated path from code commit to deployable artifact for every component of the system. As a starting point, a middleware test stage might look like the sketch below.
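
This sketch is illustrative only: the job name and Gradle task are assumptions, and build-middleware would gain needs: test-middleware so images are only published after tests pass.

  test-middleware:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'temurin'

      - name: Run middleware tests
        working-directory: ./middleware
        run: ./gradlew test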

The current design has several limitations that would need to be addressed before production. The middleware broadcasts every single change to every connected client, which is both inefficient and insecure; a topic-based subscription model (/ws/inventory/region-A) would be necessary, as sketched below. Offline conflict resolution is nonexistent: if two clients modify the same item while offline, the last one to reconnect wins. A more robust strategy, potentially using CRDTs or vector clocks, would be a significant but necessary evolution. Finally, the CDC pipeline is sensitive to database schema changes, so a formal schema evolution strategy, including versioning of the WebSocket API payloads, would be critical for long-term maintainability.
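
Here is a sketch of what region-scoped fan-out could look like in the Ktor layer, reusing the imports from Application.kt. The {region} path parameter, the per-region connection map, and the assumption that each event carries a region field are all illustrative, not part of the current build:

// Region-scoped WebSocket routing (illustrative sketch).
val connectionsByRegion = ConcurrentHashMap<String, MutableSet<WebSocketServerSession>>()

routing {
    webSocket("/ws/inventory/{region}") {
        val region = call.parameters["region"]
            ?: return@webSocket close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "region required"))
        val regionConnections = connectionsByRegion.computeIfAbsent(region) {
            Collections.newSetFromMap(ConcurrentHashMap())
        }
        regionConnections += this
        try {
            for (frame in incoming) { /* server-push only */ }
        } finally {
            regionConnections -= this
        }
    }
}

// The Kafka consumer would then send each event only to
// connectionsByRegion[event.region] rather than to every session.

Evolving toward that model, together with conflict resolution and payload versioning, is the natural next phase of this architecture.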

