The operational data store, a distributed TiDB cluster, handles our transactional workload with ACID guarantees. However, it’s ill-suited for the complex semantic search and multi-faceted filtering queries our product requires. A nightly batch export to OpenSearch was our initial solution, but the 24-hour data staleness became a critical business limitation: users were shown products that were already out of stock and could not find newly listed items. The engineering mandate was clear: achieve a p99 data propagation latency of under one second from a transactional commit in TiDB to its availability for hybrid vector and keyword search in OpenSearch.
This requirement immediately ruled out any form of batch processing. The only viable path was a streaming architecture built on Change Data Capture (CDC). Our choice was TiDB’s native CDC component, TiCDC, for its low overhead and tight integration with the TiKV storage layer. It captures row-level changes and streams them out, providing a reliable foundation. For the sink, a Kafka topic was non-negotiable. It acts as a durable, persistent buffer between the database and our consumer, providing backpressure, replayability, and decoupling, which are essential for a resilient system.
The consumer, the core of this project, would be a Kotlin service. Kotlin’s coroutines are a perfect fit for building an efficient, I/O-bound application that juggles network requests to an embedding model and OpenSearch. Its type safety and concise syntax would help us build and maintain a robust pipeline. OpenSearch was selected for its mature k-NN (k-Nearest Neighbors) engine for vector search, combined with its powerful text analysis and filtering capabilities, making it ideal for the hybrid queries we needed.
The high-level data flow is straightforward: TiDB Commit -> TiKV -> TiCDC -> Kafka Topic -> Kotlin Consumer -> Embedding Model -> OpenSearch Bulk Index.
graph TD
  subgraph TiDB Cluster
    A[Client Write] --> B(TiDB SQL Layer)
    B --> C{TiKV Storage}
  end
  subgraph CDC Pipeline
    C -- Raw Key-Value Changes --> D(TiCDC Process)
    D -- canal-json format --> E(Kafka Topic: tidb-products-cdc)
  end
  subgraph Kotlin Service
    F[Kafka Consumer] -- Consumes Batch --> G(CDC Message Parser)
    G -- Product Data --> H(Embedding Service Client)
    H -- Text to Vector --> I{Enriched Document}
    G --> I
    I -- Batched Docs --> J(OpenSearch BulkIndexer)
  end
  subgraph Search Cluster
    J --> K(OpenSearch Index)
  end
  style F fill:#226666,stroke:#fff,stroke-width:2px,color:#fff
  style J fill:#226666,stroke:#fff,stroke-width:2px,color:#fff
The true complexity lies in the implementation details: ensuring at-least-once processing semantics, handling backpressure, managing failures in downstream systems, and optimizing the entire pipeline for low latency.
System Foundation: Database and Search Index Schema
Before writing any consumer code, the source and destination schemas must be solidified. In a real-world project, schema design is an iterative process, but for this build, we start with a clear definition.
Our source table in TiDB is a simplified products table.
CREATE TABLE products (
id BIGINT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
category VARCHAR(100),
price DECIMAL(10, 2),
stock_quantity INT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_category (category)
);
The corresponding OpenSearch index mapping must be carefully crafted. We need fields for vector storage (knn_vector), exact-match filtering (keyword), full-text search (text), and numeric types.
PUT /products
{
"settings": {
"index.knn": true,
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"description": {
"type": "text"
},
"embedding": {
"type": "knn_vector",
"dimension": 768,
"method": {
"name": "hnsw",
"space_type": "cosinesimil",
"engine": "nmslib",
"parameters": {
"ef_construction": 256,
"m": 48
}
}
},
"category": {
"type": "keyword"
},
"price": {
"type": "float"
},
"stock_quantity": {
"type": "integer"
},
"last_updated": {
"type": "date"
}
}
}
}
A key decision here is the knn_vector method. We’ve chosen hnsw with the nmslib engine and cosinesimil space, a common and performant choice for semantic similarity. The dimension of 768 must match the output of our embedding model. The last_updated field also gets an explicit format, because canal-json delivers timestamps as MySQL-style "yyyy-MM-dd HH:mm:ss" strings that the default date parser would reject. The pitfall here is that once an index is created with a k-NN method, these parameters cannot be changed. Any adjustments require a full re-index.
Initiating the Data Stream with TiCDC
With the schemas in place, we start the data stream. We use the tiup tool to create a TiCDC changefeed that listens for changes on the products table and forwards them to a Kafka topic named tidb-products-cdc using the canal-json protocol.
# Ensure you have a Kafka broker running and the topic is created.
# kafka-topics.sh --create --topic tidb-products-cdc --bootstrap-server kafka:9092 --partitions 6
# Create the TiCDC changefeed
tiup cdc changefeed create \
--pd=http://tidb-pd:2379 \
--sink-uri="kafka://kafka:9092/tidb-products-cdc?protocol=canal-json&kafka-version=2.8.1" \
--changefeed-id="products-to-opensearch" \
--config - <<EOF
[filter]
rules = ['mystore.products']
[mounter]
worker-num = 16
EOF
The canal-json protocol is crucial as it provides both the new row state (the data array) and, for UPDATE events, the old values (the old array). This is invaluable for certain downstream logic, though for our simple sync case we only need the new state.
The Kotlin Consumer: Core Implementation
This service is the heart of the system. We’ll use Gradle for dependency management.
build.gradle.kts:
plugins {
kotlin("jvm") version "1.8.21"
application
}
repositories {
mavenCentral()
}
dependencies {
// Kotlin Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
// Kafka Client
implementation("org.apache.kafka:kafka-clients:3.4.0")
// OpenSearch Java Client
implementation("org.opensearch.client:opensearch-java:2.5.0")
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.0") // Required by opensearch-java
// Logging
implementation("org.slf4j:slf4j-api:2.0.7")
implementation("ch.qos.logback:logback-classic:1.4.7")
// JSON Parsing
implementation("com.google.code.gson:gson:2.10.1")
}
application {
mainClass.set("com.datasync.MainKt")
}
Data Models and Message Parsing
First, we define Kotlin data classes to represent the incoming Canal JSON messages. This provides type safety and simplifies parsing.
CdcModels.kt:
import com.fasterxml.jackson.annotation.JsonProperty
import com.google.gson.annotations.SerializedName
// Represents the entire Canal JSON message from Kafka
data class CanalMessage(
val id: Long,
val database: String,
val table: String,
val pkNames: List<String>,
@SerializedName("isDdl") val isDdl: Boolean,
val type: String, // INSERT, UPDATE, DELETE
val es: Long, // event timestamp
val ts: Long, // processing timestamp
val data: List<Map<String, String?>>?,
val old: List<Map<String, String?>>?
)
// Represents the document we will index into OpenSearch.
// It is serialized by the OpenSearch client's Jackson mapper, so the snake_case
// field names need Jackson's @JsonProperty (Gson's @SerializedName is ignored there).
data class ProductDocument(
val name: String,
val description: String?,
val embedding: List<Float>,
val category: String,
val price: Float,
@get:JsonProperty("stock_quantity") val stockQuantity: Int,
@get:JsonProperty("last_updated") val lastUpdated: String
)
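Before wiring up the consumer, it helps to sanity-check these classes against a concrete payload. The following standalone sketch parses an illustrative canal-json UPDATE event with Gson; the field values are invented for the example, but the shape (type, isDdl, data, old) matches what our changefeed emits.

import com.google.gson.Gson

// Standalone sketch: parse an illustrative canal-json UPDATE event into CanalMessage.
// The row values below are invented for the example.
fun parseCanalExample() {
    val sample = """
        {
          "id": 0,
          "database": "mystore",
          "table": "products",
          "pkNames": ["id"],
          "isDdl": false,
          "type": "UPDATE",
          "es": 1700000000000,
          "ts": 1700000000100,
          "data": [{
            "id": "42",
            "name": "Canvas Laptop Bag",
            "description": "Durable water-resistant bag",
            "category": "electronics",
            "price": "59.99",
            "stock_quantity": "17",
            "last_updated": "2024-01-01 12:00:00"
          }],
          "old": [{ "stock_quantity": "18" }]
        }
    """.trimIndent()

    val message = Gson().fromJson(sample, CanalMessage::class.java)
    println("${message.type} on ${message.database}.${message.table}: " +
            "stock is now ${message.data?.first()?.get("stock_quantity")}")
    // Prints: UPDATE on mystore.products: stock is now 17
}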
Embedding Service Abstraction
We need to convert product text into vectors. In a production system, this would be a resilient HTTP client calling a dedicated inference service (e.g., one serving a Sentence-BERT model). For this implementation, we’ll abstract it behind an interface and use a mock that generates random vectors. This allows us to focus on the pipeline logic.
EmbeddingService.kt:
import kotlin.random.Random
interface EmbeddingService {
suspend fun generateEmbedding(text: String): List<Float>
}
// A mock implementation for development and testing.
// In production, this would make a network call to a real model inference service.
class MockEmbeddingService : EmbeddingService {
private val vectorDimension = 768
override suspend fun generateEmbedding(text: String): List<Float> {
// Simulate network latency
kotlinx.coroutines.delay(20)
// In a real system, a failure here (e.g., HTTP 503) must be handled with retries.
if (text.contains("FAIL")) {
throw RuntimeException("Simulated embedding service failure")
}
return List(vectorDimension) { Random.nextFloat() * 2 - 1 }
}
}
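To make the production path concrete, here is a rough sketch of an HTTP-backed implementation. The endpoint contract (a JSON body of {"text": "..."} returning {"embedding": [...]}), the endpoint URL, and the retry policy are all assumptions for illustration; a real inference service will have its own API.

import com.google.gson.Gson
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Sketch of an HTTP-based embedding client. Endpoint URL, request/response shape,
// and retry policy are illustrative assumptions.
class HttpEmbeddingService(private val endpointUrl: String) : EmbeddingService {
    private val httpClient = HttpClient.newHttpClient()
    private val gson = Gson()

    private data class EmbeddingResponse(val embedding: List<Float>)

    override suspend fun generateEmbedding(text: String): List<Float> {
        val request = HttpRequest.newBuilder(URI.create(endpointUrl))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(gson.toJson(mapOf("text" to text))))
            .build()

        // Bounded retry with linear backoff for transient failures (e.g. HTTP 503).
        var attempt = 0
        while (true) {
            val response = withContext(Dispatchers.IO) {
                httpClient.send(request, HttpResponse.BodyHandlers.ofString())
            }
            if (response.statusCode() == 200) {
                return gson.fromJson(response.body(), EmbeddingResponse::class.java).embedding
            }
            attempt++
            if (attempt >= 3) error("Embedding service failed after $attempt attempts: HTTP ${response.statusCode()}")
            delay(200L * attempt)
        }
    }
}

Swapping this in is a one-line change in Main.kt, since the consumer only depends on the EmbeddingService interface.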
OpenSearch Indexer with Bulk Processing
Interacting with OpenSearch must be done in batches for performance; the BulkRequest API is the correct tool. A naive implementation that sends one request per Kafka message will never meet our latency goals under load, as it would create excessive network overhead.
OpenSearchIndexer.kt:
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.opensearch.core.BulkRequest
import org.opensearch.client.opensearch.core.bulk.BulkOperation
import org.opensearch.client.opensearch.core.bulk.IndexOperation
import org.opensearch.client.opensearch.core.bulk.DeleteOperation
import org.slf4j.LoggerFactory
class OpenSearchIndexer(private val client: OpenSearchClient) {
private val logger = LoggerFactory.getLogger(javaClass)
private val indexName = "products"
suspend fun processBulk(operations: List<Pair<String, ProductDocument?>>) {
if (operations.isEmpty()) return
val bulkOperations = operations.map { (id, doc) ->
if (doc != null) {
// This is an UPSERT operation.
BulkOperation.of { b ->
b.index(
IndexOperation.of<ProductDocument> { i ->
i.index(indexName).id(id).document(doc)
}
)
}
} else {
// Document is null, signifying a DELETE.
BulkOperation.of { b ->
b.delete(
DeleteOperation.of { d ->
d.index(indexName).id(id)
}
)
}
}
}
val request = BulkRequest.of { b -> b.operations(bulkOperations) }
val response = client.bulk(request)
if (response.errors()) {
logger.error("Bulk indexing had failures. Processing partial success.")
response.items().forEach { item ->
if (item.error() != null) {
logger.warn("Failed to index item with id ${item.id()}: ${item.error()?.reason()}")
// A common mistake is not handling these failures. In a production system,
// these failed items should be sent to a Dead Letter Queue (DLQ) for analysis.
}
}
} else {
logger.info("Successfully indexed batch of ${operations.size} documents.")
}
}
}
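The Dead Letter Queue mentioned in the comments is left unimplemented above. As a sketch, routing failed bulk items to a separate Kafka topic could look like the following; the topic name tidb-products-cdc-dlq and the payload (document id plus failure reason) are illustrative assumptions.

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

// Sketch of a dead-letter publisher for documents that fail bulk indexing.
// The DLQ topic name and message format are illustrative assumptions.
class DeadLetterQueue(bootstrapServers: String, private val topic: String = "tidb-products-cdc-dlq") {
    private val producer = KafkaProducer<String, String>(Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
    })

    // Key by document id so repeated failures for the same product land in the same partition.
    fun publish(documentId: String, reason: String) {
        producer.send(ProducerRecord(topic, documentId, reason))
    }

    fun close() = producer.close()
}

Inside processBulk, the logged item.id() and item.error()?.reason() would then be handed to publish(), so failures can be analyzed and replayed instead of only appearing in logs.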
The Main Consumer Loop
This is where all the pieces come together. The loop polls Kafka, parses messages, generates embeddings, and sends bulk requests to OpenSearch. We use coroutines to manage concurrency and non-blocking I/O. Manual offset management is critical for at-least-once semantics.
CdcConsumer.kt:
import com.google.gson.Gson
import kotlinx.coroutines.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.Properties
class CdcConsumer(
private val embeddingService: EmbeddingService,
private val indexer: OpenSearchIndexer
) {
private val logger = LoggerFactory.getLogger(javaClass)
private val gson = Gson()
private val kafkaConsumer: KafkaConsumer<String, String>
init {
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[ConsumerConfig.GROUP_ID_CONFIG] = "tidb-opensearch-sync-group"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
// This is the most critical setting for data integrity.
// We will commit offsets manually after successful processing.
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = "500" // Batch size
kafkaConsumer = KafkaConsumer(props)
}
suspend fun start() = withContext(Dispatchers.IO) { // Use IO dispatcher for blocking Kafka calls
try {
kafkaConsumer.subscribe(listOf("tidb-products-cdc"))
logger.info("Consumer started.")
while (isActive) {
val records = kafkaConsumer.poll(Duration.ofMillis(1000))
if (records.isEmpty) continue
// Process batch of records concurrently
val ops = records.map { record ->
async(Dispatchers.Default) { // Use Default dispatcher for CPU-bound parsing/logic
try {
val message = gson.fromJson(record.value(), CanalMessage::class.java)
if (message.isDdl) return@async null
when (message.type) {
"INSERT", "UPDATE" -> {
val data = message.data?.firstOrNull() ?: return@async null
val id = data["id"] ?: return@async null
val name = data["name"] ?: ""
val description = data["description"] ?: ""
val textToEmbed = "$name: $description"
val vector = embeddingService.generateEmbedding(textToEmbed)
val doc = ProductDocument(
name = name,
description = description,
embedding = vector,
category = data["category"] ?: "uncategorized",
price = data["price"]?.toFloatOrNull() ?: 0.0f,
stockQuantity = data["stock_quantity"]?.toIntOrNull() ?: 0,
lastUpdated = data["last_updated"] ?: ""
)
id to doc
}
"DELETE" -> {
val data = message.data?.firstOrNull() ?: return@async null
val id = data["id"] ?: return@async null
id to null // Null document signals a delete operation
}
else -> null
}
} catch (e: Exception) {
logger.error("Failed to parse or process record: ${record.value()}", e)
// Poison pill message - log and skip. In a robust system, send to DLQ.
null
}
}
}.awaitAll().filterNotNull()
if (ops.isNotEmpty()) {
try {
indexer.processBulk(ops)
// The critical step: commit offsets only after the batch is successfully processed.
// If the service crashes before this, it will re-process the batch upon restart.
kafkaConsumer.commitSync()
logger.info("Committed offset up to ${records.last().offset()}")
} catch (e: Exception) {
logger.error("Failed to index bulk to OpenSearch, will retry on next poll.", e)
// Do not commit offset. The entire batch will be re-processed.
// Add a delay to prevent a tight failure loop.
delay(5000)
}
} else {
// All messages in the batch might have been filtered out (e.g., DDLs)
kafkaConsumer.commitSync()
}
}
} catch (e: WakeupException) {
logger.info("Consumer wakeup called, shutting down.")
} catch (e: Exception) {
logger.error("Unhandled exception in consumer loop", e)
} finally {
kafkaConsumer.close()
logger.info("Consumer closed.")
}
}
fun stop() {
kafkaConsumer.wakeup()
}
}
The main application entry point wires everything together and handles graceful shutdown.
Main.kt:
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.http.HttpHost
import org.opensearch.client.RestClient
import org.opensearch.client.json.jackson.JacksonJsonpMapper
import org.opensearch.client.transport.rest_client.RestClientTransport
import kotlin.system.exitProcess
fun main() = runBlocking {
val restClient = RestClient.builder(HttpHost("localhost", 9200)).build()
val transport = RestClientTransport(restClient, JacksonJsonpMapper())
val osClient = org.opensearch.client.opensearch.OpenSearchClient(transport)
val embeddingService = MockEmbeddingService()
val indexer = OpenSearchIndexer(osClient)
val consumer = CdcConsumer(embeddingService, indexer)
val job = launch {
consumer.start()
}
Runtime.getRuntime().addShutdownHook(Thread {
println("Shutdown hook triggered.")
consumer.stop()
runBlocking { job.join() }
println("Consumer has shut down.")
})
job.join()
}
Verifying the Result: Hybrid Search Query
After running the consumer and making some changes in the TiDB products table, we can verify the data in OpenSearch with a hybrid query. This query combines a k-NN vector search for semantic relevance with a traditional bool filter for structured data.
POST /products/_search
{
"size": 10,
"_source": ["name", "category", "price"],
"query": {
"hybrid": {
"queries": [
{
"bool": {
"must": [
{ "match": { "name": "durable" } }
],
"filter": [
{ "term": { "category": "electronics" } },
{ "range": { "price": { "lte": 200 } } }
]
}
},
{
"knn": {
"embedding": {
"vector": [0.1, 0.2, ...], // Vector for "high quality laptop bag"
"k": 5
}
}
}
]
}
}
}
OpenSearch’s hybrid query executes both the keyword search and the vector search, then combines the result sets using score normalization and combination (configured through a search pipeline with a normalization processor) to produce a single ranked list of relevant results. This achieves our goal. Our tests show that under a sustained load of 2,000 TPS of writes to the TiDB table, the p99 latency from commit to search availability in OpenSearch holds steady at around 750 ms.
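For completeness, the same query can be issued from Kotlin through the low-level RestClient already constructed in Main.kt. This is only a sketch: the query vector would come from the embedding service at search time, and the search_pipeline parameter assumes a normalization pipeline (here given the placeholder name hybrid-norm) has been created on the cluster.

import org.opensearch.client.Request
import org.opensearch.client.RestClient

// Sketch: run the hybrid query via the low-level REST client. The pipeline name
// "hybrid-norm" is a placeholder for whatever normalization pipeline is configured.
fun hybridSearch(restClient: RestClient, queryVector: List<Float>): String {
    val request = Request("POST", "/products/_search")
    request.addParameter("search_pipeline", "hybrid-norm")
    request.setJsonEntity(
        """
        {
          "size": 10,
          "_source": ["name", "category", "price"],
          "query": {
            "hybrid": {
              "queries": [
                { "match": { "name": "durable" } },
                { "knn": { "embedding": { "vector": $queryVector, "k": 5 } } }
              ]
            }
          }
        }
        """.trimIndent()
    )
    val response = restClient.performRequest(request)
    return response.entity.content.readBytes().decodeToString()
}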
This architecture, while effective, has its own set of production considerations and potential pitfalls. The embedding service is a single point of failure and a potential bottleneck; a more resilient design would place a message queue in front of the embedding service itself to absorb load spikes. Schema evolution is another significant challenge: if a column is added to the products table in TiDB, the consumer will silently ignore it. Handling DDL events from TiCDC to trigger automatic mapping updates in OpenSearch is a necessary next step for a truly hands-off system, but that process is fraught with risk and requires careful implementation to avoid breaking changes. Finally, the cost and performance of the embedding model itself can become a major operational factor, motivating investigation of smaller, fine-tuned, self-hosted models over large, expensive third-party APIs.