The initial system was a straightforward monolithic service responsible for handling image uploads, processing them, and indexing the results for similarity search. This design failed spectacularly once traffic exceeded a few dozen concurrent requests. Each incoming HTTP request would tie up a thread, perform file I/O, execute CPU-intensive OpenCV operations, and then make a network call to the vector database. Latency skyrocketed, the CPU thrashed, and the service became a bottleneck. The core technical pain point was the tight coupling of low-latency network I/O with high-latency, unpredictable computational work.
A redesign was necessary, centered on decoupling and asynchronous batch processing. The concept was to split the system into two specialized microservices: a lightweight, high-performance ingress gateway and a robust, scalable processing worker. The gateway would be responsible only for accepting requests at high velocity and queuing them. The worker would consume these tasks in efficient batches, maximizing resource utilization. This polyglot approach allows for using the best tool for each job.
For the ingress gateway, Go with the Fiber framework was chosen. Its performance, low memory footprint, and concurrency model based on goroutines make it ideal for handling thousands of simultaneous, short-lived network connections without breaking a sweat. Its job is simple: accept an image, validate it, and pass it on.
For the processing worker, Kotlin on the JVM was the pragmatic choice. While the JVM has a higher memory overhead, its ecosystem is unparalleled for this kind of task. Mature and stable libraries for OpenCV, a feature-rich official gRPC client for Qdrant, and powerful concurrency primitives make it a solid foundation for complex, long-running computations. Containerizing this service would be handled by Jib, completely eliminating the need for Dockerfiles and the associated maintenance overhead, while producing optimized, reproducible container images directly from the build tool.
Qdrant was retained as the vector database due to its performance and, critically for this new architecture, its first-class support for batch upsert operations, which is the cornerstone of the entire optimization strategy. Communication between the Go gateway and the Kotlin worker would be handled via gRPC for a strongly-typed, efficient, and language-agnostic contract.
The final architecture looks like this:
sequenceDiagram
    participant Client
    participant Go-Fiber Gateway
    participant Kotlin-OpenCV Worker
    participant Qdrant DB
    Client->>+Go-Fiber Gateway: POST /upload (Image Data)
    Go-Fiber Gateway-->>-Client: 202 Accepted (Request Queued)
    Go-Fiber Gateway->>+Kotlin-OpenCV Worker: gRPC ProcessImages (Batch of Images)
    Kotlin-OpenCV Worker-->>-Go-Fiber Gateway: gRPC Ack
    Note right of Kotlin-OpenCV Worker: 1. Decode Image Bytes<br/>2. OpenCV Preprocessing<br/>3. Generate Vector Embedding
    Kotlin-OpenCV Worker->>+Qdrant DB: Batch Upsert Points (Vectors)
    Qdrant DB-->>-Kotlin-OpenCV Worker: Upsert Confirmation
The Go-Fiber Ingress Gateway
The Go service is intentionally minimal. It exposes a single endpoint for multipart form data uploads. A crucial design decision here is that it does not process images synchronously. Instead, it places them into an in-memory buffered channel, which acts as a temporary queue. A background goroutine collects items from this channel into batches, either when a batch reaches a certain size or a timeout is exceeded. This prevents a single slow downstream call from blocking all incoming requests.
First, the gRPC contract must be defined in a proto file. This is the source of truth for communication between the two services.
ingestion.proto:
syntax = "proto3";
package ingestion;
option go_package = "ingestion/ingestionpb";
option java_multiple_files = true;
option java_package = "io.system_architect.ingestion.grpc";
option java_outer_classname = "IngestionProto";
service IngestionService {
rpc ProcessImages(ProcessImagesRequest) returns (ProcessImagesResponse);
}
message ImagePayload {
string client_assigned_id = 1;
bytes image_data = 2;
string content_type = 3;
}
message ProcessImagesRequest {
repeated ImagePayload images = 1;
}
message IngestionStatus {
string client_assigned_id = 1;
bool success = 2;
string message = 3;
}
message ProcessImagesResponse {
repeated IngestionStatus statuses = 1;
}
With the contract defined, we generate the Go code using protoc.
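Assuming the protoc-gen-go and protoc-gen-go-grpc plugins are installed, the generation step looks roughly like this (output paths adjusted to the project layout):
protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    ingestion.proto
This produces the ingestionpb package that main.go imports below.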
The core of the Go service is the batching mechanism.
main.go:
package main
import (
"context"
"ingestion/ingestionpb"
"io"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
maxBatchSize = 64
maxBatchTimeout = 2 * time.Second
)
type imageRequest struct {
id string
data []byte
contentType string
}
var (
requestChannel chan imageRequest
grpcClient ingestionpb.IngestionServiceClient
)
func main() {
// A buffered channel to act as our in-memory queue.
// In a real-world project, this would be replaced by Kafka or RabbitMQ.
requestChannel = make(chan imageRequest, 1024)
// Setup gRPC client connection to the Kotlin worker.
// The address comes from the environment so it can be configured in docker-compose.
workerAddr := os.Getenv("KOTLIN_WORKER_ADDRESS")
if workerAddr == "" {
workerAddr = "kotlin-worker:50051"
}
conn, err := grpc.Dial(workerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect to gRPC server: %v", err)
}
defer conn.Close()
grpcClient = ingestionpb.NewIngestionServiceClient(conn)
// Start the background worker goroutine for batching
var wg sync.WaitGroup
wg.Add(1)
go batchProcessor(&wg)
// Setup Fiber web server
app := fiber.New(fiber.Config{
BodyLimit: 10 * 1024 * 1024, // 10MB file limit
})
app.Post("/upload", handleUpload)
go func() {
if err := app.Listen(":3000"); err != nil {
log.Fatalf("Fiber server failed to start: %v", err)
}
}()
// Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
// Stop accepting new HTTP requests first; otherwise an in-flight handler could
// send on the closed channel below and panic.
if err := app.Shutdown(); err != nil {
log.Printf("Server shutdown failed: %v", err)
}
close(requestChannel) // Signal the batch processor to finish
wg.Wait()             // Wait for the processor to flush remaining items
log.Println("Server gracefully stopped")
}
func handleUpload(c *fiber.Ctx) error {
file, err := c.FormFile("image")
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "image part is missing"})
}
fileBuffer, err := file.Open()
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "cannot open file"})
}
defer fileBuffer.Close()
// io.ReadAll is used because a single Read call is not guaranteed to fill the buffer.
buf, err := io.ReadAll(fileBuffer)
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "cannot read file into buffer"})
}
req := imageRequest{
id: uuid.New().String(),
data: buf,
contentType: file.Header.Get("Content-Type"),
}
// Non-blocking send to the channel.
// If the channel is full, we immediately reject the request.
// This is a form of backpressure.
select {
case requestChannel <- req:
return c.Status(fiber.StatusAccepted).JSON(fiber.Map{"id": req.id, "status": "queued"})
default:
log.Println("Request channel is full. Rejecting request.")
return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "server is busy, please try again later"})
}
}
// This is the core logic for batch processing.
func batchProcessor(wg *sync.WaitGroup) {
defer wg.Done()
batch := make([]imageRequest, 0, maxBatchSize)
ticker := time.NewTicker(maxBatchTimeout)
defer ticker.Stop()
for {
select {
case req, ok := <-requestChannel:
if !ok {
// Channel closed, flush any remaining items and exit.
if len(batch) > 0 {
log.Printf("Channel closed. Flushing final batch of %d images.\n", len(batch))
sendBatch(batch)
}
return
}
batch = append(batch, req)
if len(batch) >= maxBatchSize {
log.Printf("Batch size reached. Sending %d images.\n", len(batch))
sendBatch(batch)
batch = make([]imageRequest, 0, maxBatchSize)
ticker.Reset(maxBatchTimeout) // Reset the timer after sending
}
case <-ticker.C:
// Timeout reached, send whatever is in the batch.
if len(batch) > 0 {
log.Printf("Batch timeout. Sending %d images.\n", len(batch))
sendBatch(batch)
batch = make([]imageRequest, 0, maxBatchSize)
}
}
}
}
func sendBatch(batch []imageRequest) {
payloads := make([]*ingestionpb.ImagePayload, len(batch))
for i, req := range batch {
payloads[i] = &ingestionpb.ImagePayload{
ClientAssignedId: req.id,
ImageData: req.data,
ContentType: req.contentType,
}
}
request := &ingestionpb.ProcessImagesRequest{Images: payloads}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := grpcClient.ProcessImages(ctx, request)
if err != nil {
// A common mistake is to not handle gRPC errors properly.
// This could indicate the worker is down or overwhelmed.
log.Printf("Failed to send batch to Kotlin worker: %v\n", err)
// Here you would add retry logic or push to a dead-letter queue.
} else {
log.Printf("Successfully sent batch of %d images to worker.\n", len(batch))
}
}
This Go service is now a highly efficient, non-blocking shock absorber for the system.
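With the gateway running locally, a quick way to exercise the endpoint is a multipart upload with curl (assuming a sample image on disk and the default port 3000):
curl -F "image=@sample.jpg" http://localhost:3000/upload
A 202 response carrying the generated id and a "queued" status confirms the request landed in the channel; indexing happens asynchronously once the batch is flushed to the worker.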
The Kotlin Processing Worker
The Kotlin worker is where the heavy lifting happens. It needs to implement the gRPC service, manage OpenCV native libraries, process images in parallel, and perform batch inserts into Qdrant.
The project is set up with Gradle. The build.gradle.kts file is critical, especially for managing gRPC code generation and packaging with Jib.
build.gradle.kts:
import com.google.protobuf.gradle.*
plugins {
kotlin("jvm") version "1.9.20"
id("com.google.protobuf") version "0.9.4"
id("com.github.johnrengelman.shadow") version "8.1.1"
id("com.google.cloud.tools.jib") version "3.4.0"
}
// ... repositories and dependencies ...
dependencies {
// Kotlin & Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
// Provides .await() extensions for the ListenableFuture values returned by the Qdrant client
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-guava:1.7.3")
// gRPC
implementation("io.grpc:grpc-netty-shaded:1.59.0")
implementation("io.grpc:grpc-protobuf:1.59.0")
implementation("io.grpc:grpc-stub:1.59.0")
// Kotlin coroutine stubs and the proto DSL builders used by the service implementation
implementation("io.grpc:grpc-kotlin-stub:1.4.1")
implementation("com.google.protobuf:protobuf-kotlin:3.24.4")
implementation(kotlin("stdlib-jdk8"))
compileOnly("org.apache.tomcat:annotations-api:6.0.53")
// Qdrant Client
implementation("io.qdrant:client:1.7.0")
// OpenCV
implementation("org.openpnp:opencv:4.8.1-0")
// Logging
implementation("io.github.oshai:kotlin-logging-jvm:5.1.0") // matches the io.github.oshai imports used by the worker
implementation("ch.qos.logback:logback-classic:1.4.11")
}
protobuf {
protoc { artifact = "com.google.protobuf:protoc:3.24.4" }
plugins {
id("grpc") { artifact = "io.grpc:protoc-gen-grpc-java:1.59.0" }
id("grpckt") { artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1:jdk8@jar" }
}
generateProtoTasks {
ofSourceSet("main").forEach {
it.plugins {
id("grpc") {}
id("grpckt") {}
}
it.builtins {
id("kotlin") {}
}
}
}
}
// Jib configuration
jib {
from.image = "eclipse-temurin:17-jre-focal"
to.image = "polyglot-worker:latest"
container.mainClass = "io.system_architect.worker.WorkerServerKt"
// A CRITICAL step for OpenCV: copy native libraries into the container's library path
extraDirectories.paths.add(file("src/main/resources/natives"))
container.jvmFlags = listOf("-Djava.library.path=/app/resources")
}
A significant pitfall in using OpenCV on the JVM inside a container is managing its native libraries (.so files on Linux). Jib makes this manageable. The native libraries must be extracted from the OpenCV JAR and placed in a directory (src/main/resources/natives here). The jib configuration then copies this directory into the container and, crucially, adds it to the java.library.path via a JVM flag. Without this, the application will crash with an UnsatisfiedLinkError at runtime.
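One way to stage those files is a small helper task that unpacks them from the openpnp artifact. This is a sketch only; the exact paths and file names of the natives inside the JAR are version- and platform-specific and should be verified before relying on it:
// Illustrative only: copy the native libraries bundled in the openpnp OpenCV JAR
// into src/main/resources/natives so Jib ships them with the image.
tasks.register<Copy>("extractOpenCvNatives") {
    val opencvJar = configurations.runtimeClasspath.get()
        .files.first { it.name.startsWith("opencv") }
    from(zipTree(opencvJar)) {
        include("**/*.so")       // adjust for the target platform (e.g. *.dylib, *.dll)
        eachFile { path = name } // flatten the directory structure
    }
    includeEmptyDirs = false
    into(layout.projectDirectory.dir("src/main/resources/natives"))
}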
The main worker server implementation uses Kotlin Coroutines to process the images within a batch concurrently, maximizing CPU usage.
WorkerServer.kt:
package io.system_architect.worker
import io.github.oshai.kotlinlogging.KotlinLogging
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.qdrant.client.QdrantClient
import io.qdrant.client.QdrantGrpcClient
import io.qdrant.client.grpc.Collections.Distance
import io.qdrant.client.grpc.Collections.VectorParams
import kotlinx.coroutines.runBlocking
import nu.pattern.OpenCV
import java.util.concurrent.TimeUnit
private val logger = KotlinLogging.logger {}
class WorkerServer(private val port: Int) {
private val qdrantClient: QdrantClient
val server: Server
init {
// Load OpenCV native library
try {
OpenCV.loadLocally()
logger.info { "OpenCV native library loaded successfully." }
} catch (e: Exception) {
logger.error(e) { "FATAL: Failed to load OpenCV native library." }
throw e
}
// Initialize Qdrant client
val qdrantHost = System.getenv("QDRANT_HOST") ?: "localhost"
qdrantClient = QdrantClient(
QdrantGrpcClient.newBuilder(qdrantHost, 6334, false).build()
)
// Ensure collection exists
runBlocking {
val collectionName = "visual_search_vectors"
val collections = qdrantClient.listCollectionsAsync().await()
if (!collections.contains(collectionName)) {
logger.info { "Collection '$collectionName' not found. Creating..." }
qdrantClient.createCollectionAsync(
collectionName,
VectorParams.newBuilder().setDistance(Distance.Cosine).setSize(512L).build()
).await()
}
}
server = NettyServerBuilder.forPort(port)
.addService(IngestionServiceImpl(qdrantClient))
.build()
}
fun start() {
server.start()
logger.info { "Server started, listening on $port" }
Runtime.getRuntime().addShutdownHook(Thread {
logger.info { "*** shutting down gRPC server since JVM is shutting down" }
this@WorkerServer.stop()
logger.info { "*** server shut down" }
})
}
private fun stop() {
qdrantClient.close()
server.shutdown().awaitTermination(30, TimeUnit.SECONDS)
}
fun blockUntilShutdown() {
server.awaitTermination()
}
}
fun main() {
val port = 50051
val server = WorkerServer(port)
server.start()
server.blockUntilShutdown()
}
The gRPC service implementation is where the core logic resides. It receives a batch of images, processes them, and then performs a single bulk upsert to Qdrant.
IngestionServiceImpl.kt:
package io.system_architect.worker
import com.google.protobuf.ByteString
import io.github.oshai.kotlinlogging.KotlinLogging
import io.qdrant.client.QdrantClient
import io.qdrant.client.PointIdFactory
import io.qdrant.client.PointStructFactory
import io.qdrant.client.grpc.Points.UpdateStatus
import io.system_architect.ingestion.grpc.ImagePayload
import io.system_architect.ingestion.grpc.IngestionServiceGrpcKt
import io.system_architect.ingestion.grpc.ProcessImagesRequest
import io.system_architect.ingestion.grpc.ProcessImagesResponse
import io.system_architect.ingestion.grpc.ingestionStatus
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import org.opencv.core.Mat
import org.opencv.core.MatOfByte
import org.opencv.core.Size
import org.opencv.imgcodecs.Imgcodecs
import org.opencv.imgproc.Imgproc
import java.util.UUID
private val logger = KotlinLogging.logger {}
const val COLLECTION_NAME = "visual_search_vectors"
const val VECTOR_DIMENSION = 512
class IngestionServiceImpl(private val qdrantClient: QdrantClient) : IngestionServiceGrpcKt.IngestionServiceCoroutineImplBase() {
override suspend fun processImages(request: ProcessImagesRequest): ProcessImagesResponse {
val imageCount = request.imagesList.size
logger.info { "Received a batch of $imageCount images to process." }
if (imageCount == 0) {
return ProcessImagesResponse.newBuilder().build()
}
val processingResults = coroutineScope {
request.imagesList.map { payload ->
async(Dispatchers.Default) { // Use Default dispatcher for CPU-bound work
processSingleImage(payload)
}
}.awaitAll()
}
val validPoints = processingResults.filterIsInstance<ImageProcessingResult.Success>()
val failedResults = processingResults.filterIsInstance<ImageProcessingResult.Failure>()
if (validPoints.isNotEmpty()) {
val pointStructs = validPoints.map {
PointStructFactory.newPointStruct(
PointIdFactory.id(UUID.fromString(it.id)),
it.vector,
// In a real project, add metadata payload here
)
}
try {
// The most critical optimization: batch upsert to Qdrant
val updateResult = qdrantClient.upsertPointsAsync(COLLECTION_NAME, pointStructs, null, true).await()
if (updateResult.status != UpdateStatus.Completed) {
logger.error { "Qdrant batch upsert failed with status: ${updateResult.status}" }
// Handle partial failure if needed
}
} catch (e: Exception) {
logger.error(e) { "Exception during Qdrant batch upsert." }
// Here, we fail the entire batch. A more robust implementation might retry.
}
}
val statuses = validPoints.map { ingestionStatus { clientAssignedId = it.originalId; success = true } } +
failedResults.map { ingestionStatus { clientAssignedId = it.originalId; success = false; message = it.reason } }
return ProcessImagesResponse.newBuilder().addAllStatuses(statuses).build()
}
private fun processSingleImage(payload: ImagePayload): ImageProcessingResult {
return try {
val mat = Imgcodecs.imdecode(MatOfByte(*payload.imageData.toByteArray()), Imgcodecs.IMREAD_COLOR)
if (mat.empty()) {
return ImageProcessingResult.Failure(payload.clientAssignedId, "Failed to decode image.")
}
// Simple OpenCV preprocessing: resize and convert to a standard format
val resized = Mat()
Imgproc.resize(mat, resized, Size(224.0, 224.0), 0.0, 0.0, Imgproc.INTER_AREA)
// In a real project, this is where you'd call a deep learning model (e.g., ONNX, PyTorch)
// to get a meaningful vector embedding. For this example, we simulate it.
val vector = generateSimulatedVector(resized)
// Release native memory managed by OpenCV
mat.release()
resized.release()
ImageProcessingResult.Success(payload.clientAssignedId, UUID.randomUUID().toString(), vector)
} catch (e: Exception) {
logger.warn(e) { "Error processing image ID ${payload.clientAssignedId}" }
ImageProcessingResult.Failure(payload.clientAssignedId, e.message ?: "Unknown processing error")
}
}
private fun generateSimulatedVector(image: Mat): List<Float> {
// This is a placeholder for a real ML model inference.
// We generate a vector based on simple image properties to have a deterministic output.
val vector = FloatArray(VECTOR_DIMENSION)
val mean = org.opencv.core.Core.mean(image)
vector[0] = mean.`val`[0].toFloat() / 255.0f // Blue channel
vector[1] = mean.`val`[1].toFloat() / 255.0f // Green channel
vector[2] = mean.`val`[2].toFloat() / 255.0f // Red channel
// Fill the rest with some values to match dimension
for (i in 3 until VECTOR_DIMENSION) {
vector[i] = (vector[0] + vector[1] + vector[2]) / 3.0f * (i % 10) / 10.0f
}
return vector.toList()
}
}
sealed class ImageProcessingResult {
data class Success(val originalId: String, val id: String, val vector: List<Float>) : ImageProcessingResult()
data class Failure(val originalId: String, val reason: String) : ImageProcessingResult()
}
The key takeaways from the Kotlin implementation are:
- Parallel Processing: async(Dispatchers.Default) is used to parallelize the CPU-bound OpenCV work across available cores within a single batch.
- Batch Database Operation: All successfully generated vectors are inserted into Qdrant with a single upsertPointsAsync call. A common mistake is to loop and insert one by one, which creates immense network and database overhead.
- Resource Management: Explicitly calling mat.release() is important for managing the off-heap memory used by OpenCV's native code; a helper that guarantees release even on failure is sketched after this list.
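One gap worth noting in processSingleImage: if the resize or the embedding step throws, the Mat objects allocated before the failure are never released. A small, hypothetical helper in the spirit of Kotlin's use makes the release unconditional:
import org.opencv.core.Mat

// Illustrative helper (not part of OpenCV): run a block against a Mat and
// release its native memory even if the block throws.
inline fun <T> Mat.useAndRelease(block: (Mat) -> T): T =
    try {
        block(this)
    } finally {
        release()
    }
processSingleImage could then wrap both the decoded and the resized matrices in useAndRelease and keep the try/catch for reporting failures only.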
Deployment and Execution
To run this polyglot system, Docker Compose provides a simple way to orchestrate the services.
docker-compose.yml:
version: '3.8'
services:
qdrant:
image: qdrant/qdrant:v1.7.4
ports:
- "6333:6333"
- "6334:6334"
volumes:
- ./qdrant_storage:/qdrant/storage
go-gateway:
build:
context: ./go-gateway # Assuming Go code is in this directory
dockerfile: Dockerfile
ports:
- "3000:3000"
depends_on:
- kotlin-worker
environment:
- KOTLIN_WORKER_ADDRESS=kotlin-worker:50051
kotlin-worker:
# This image is built by Jib, so we just reference it.
# We would run './gradlew jibDockerBuild' in the kotlin-worker directory first.
image: polyglot-worker:latest
ports:
- "50051:50051"
depends_on:
- qdrant
environment:
- QDRANT_HOST=qdrant
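Bringing the stack up is then a two-step process (assuming the directory layout matches the compose file): build the worker image with Jib, then start everything with Compose:
cd kotlin-worker && ./gradlew jibDockerBuild && cd ..
docker compose up --build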
This architecture successfully decouples the system components, allowing each to scale independently and perform its task optimally. The Go gateway can handle a massive influx of requests, while the Kotlin workers can be scaled horizontally based on the processing queue depth, ensuring efficient use of expensive compute resources. The batching strategy transforms thousands of small, inefficient operations into a few large, highly efficient ones.
The current implementation uses a simple in-memory channel in the Go gateway for queuing, which is a single point of failure and doesn’t persist requests if the gateway crashes. A production-grade system would replace this with a durable message broker like Kafka or RabbitMQ, providing better resilience, load leveling, and backpressure capabilities. Furthermore, the error handling for batch processing could be more granular. A failure in a single image currently doesn’t prevent the rest of the batch from being indexed, but the client is not notified of partial successes. Implementing a more sophisticated callback mechanism or a separate status-checking endpoint would be a necessary next step. Finally, the simulated vector generation is a major simplification; integrating a real ML model would introduce significant complexity around model loading, versioning, and potential GPU utilization within the Kotlin worker.
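As a sketch of what that durable-queue swap could look like on the gateway side, the channel send in handleUpload would become a produce call. This example assumes the segmentio/kafka-go client, a broker reachable at kafka:9092, and a topic named image-ingestion, none of which exist in the current code:
package main

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// Hypothetical producer living alongside main.go: handleUpload would call
// publishImage instead of sending on requestChannel, and a consumer in front of
// the Kotlin worker would read and batch messages from the topic.
var imageWriter = &kafka.Writer{
	Addr:     kafka.TCP("kafka:9092"), // assumed broker address
	Topic:    "image-ingestion",       // assumed topic name
	Balancer: &kafka.LeastBytes{},
}

func publishImage(ctx context.Context, req imageRequest) error {
	return imageWriter.WriteMessages(ctx, kafka.Message{
		Key:   []byte(req.id),
		Value: req.data,
		Headers: []kafka.Header{
			{Key: "content-type", Value: []byte(req.contentType)},
		},
	})
}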