Implementing a Polyglot Image Vectoring Pipeline with Go-Fiber, Kotlin, and Qdrant


The initial system was a straightforward monolithic service responsible for handling image uploads, processing them, and indexing for similarity search. This design failed spectacularly once traffic exceeded a few dozen concurrent requests. Each incoming HTTP request would lock a thread, perform file I/O, execute CPU-intensive OpenCV operations, and then make a network call to the vector database. Latency skyrocketed, the CPU thrashed, and the service became a bottleneck. The core technical pain point was the tight coupling of low-latency network I/O with high-latency, unpredictable computational work.

A redesign was necessary, centered on decoupling and asynchronous batch processing. The concept was to split the system into two specialized microservices: a lightweight, high-performance ingress gateway and a robust, scalable processing worker. The gateway would be responsible only for accepting requests at high velocity and queuing them. The worker would consume these tasks in efficient batches, maximizing resource utilization. This polyglot approach allows for using the best tool for each job.

For the ingress gateway, Go with the Fiber framework was chosen. Its performance, low memory footprint, and concurrency model based on goroutines make it ideal for handling thousands of simultaneous, short-lived network connections without breaking a sweat. Its job is simple: accept an image, validate it, and pass it on.

For the processing worker, Kotlin on the JVM was the pragmatic choice. While the JVM has a higher memory overhead, its ecosystem is unparalleled for this kind of task. Mature and stable libraries for OpenCV, a feature-rich official gRPC client for Qdrant, and powerful concurrency primitives make it a solid foundation for complex, long-running computations. Containerizing this service would be handled by Jib, completely eliminating the need for Dockerfiles and the associated maintenance overhead, while producing optimized, reproducible container images directly from the build tool.

Qdrant was retained as the vector database due to its performance and, critically for this new architecture, its first-class support for batch upsert operations, which is the cornerstone of the entire optimization strategy. Communication between the Go gateway and the Kotlin worker would be handled via gRPC for a strongly-typed, efficient, and language-agnostic contract.

The final architecture looks like this:

sequenceDiagram
    participant Client
    participant Go-Fiber Gateway
    participant Kotlin-OpenCV Worker
    participant Qdrant DB

    Client->>+Go-Fiber Gateway: POST /upload (Image Data)
    Go-Fiber Gateway-->>-Client: 202 Accepted (Request Queued)
    Go-Fiber Gateway->>+Kotlin-OpenCV Worker: gRPC ProcessImages (Batch of Images)
    Kotlin-OpenCV Worker-->>-Go-Fiber Gateway: gRPC Ack
    Note right of Kotlin-OpenCV Worker: 1. Decode Image Bytes
2. OpenCV Preprocessing
3. Generate Vector Embedding Kotlin-OpenCV Worker->>+Qdrant DB: Batch Upsert Points (Vectors) Qdrant DB-->>-Kotlin-OpenCV Worker: Upsert Confirmation

The Go-Fiber Ingress Gateway

The Go service is intentionally minimal. It exposes a single endpoint for multipart form data uploads. A crucial design decision here is that it does not process images synchronously. Instead, it places them into an in-memory buffered channel, which acts as a temporary queue. A background goroutine collects items from this channel into batches, either when a batch reaches a certain size or a timeout is exceeded. This prevents a single slow downstream call from blocking all incoming requests.

First, the gRPC contract must be defined in a proto file. This is the source of truth for communication between the two services.

ingestion.proto:

syntax = "proto3";

package ingestion;

option go_package = "ingestion/ingestionpb";
option java_multiple_files = true;
option java_package = "io.system_architect.ingestion.grpc";
option java_outer_classname = "IngestionProto";

service IngestionService {
  rpc ProcessImages(ProcessImagesRequest) returns (ProcessImagesResponse);
}

message ImagePayload {
  string client_assigned_id = 1;
  bytes image_data = 2;
  string content_type = 3;
}

message ProcessImagesRequest {
  repeated ImagePayload images = 1;
}

message IngestionStatus {
  string client_assigned_id = 1;
  bool success = 2;
  string message = 3;
}

message ProcessImagesResponse {
  repeated IngestionStatus statuses = 1;
}

With the contract defined, we generate the Go code using protoc.

The core of the Go service is the batching mechanism.

main.go:

package main

import (
	"context"
	"fmt"
	"ingestion/ingestionpb"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const (
	maxBatchSize    = 64
	maxBatchTimeout = 2 * time.Second
)

type imageRequest struct {
	id          string
	data        []byte
	contentType string
}

var (
	requestChannel chan imageRequest
	grpcClient     ingestionpb.IngestionServiceClient
)

func main() {
	// A buffered channel to act as our in-memory queue.
	// In a real-world project, this would be replaced by Kafka or RabbitMQ.
	requestChannel = make(chan imageRequest, 1024)

	// Setup gRPC client connection to the Kotlin worker
	conn, err := grpc.Dial("kotlin-worker:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect to gRPC server: %v", err)
	}
	defer conn.Close()
	grpcClient = ingestionpb.NewIngestionServiceClient(conn)

	// Start the background worker goroutine for batching
	var wg sync.WaitGroup
	wg.Add(1)
	go batchProcessor(&wg)

	// Setup Fiber web server
	app := fiber.New(fiber.Config{
		BodyLimit: 10 * 1024 * 1024, // 10MB file limit
	})

	app.Post("/upload", handleUpload)

	go func() {
		if err := app.Listen(":3000"); err != nil {
			log.Fatalf("Fiber server failed to start: %v", err)
		}
	}()

	// Graceful shutdown
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	log.Println("Shutting down server...")
	close(requestChannel) // Signal the batch processor to finish
	wg.Wait()             // Wait for the processor to flush remaining items
	if err := app.Shutdown(); err != nil {
		log.Fatalf("Server shutdown failed: %v", err)
	}
	log.Println("Server gracefully stopped")
}

func handleUpload(c *fiber.Ctx) error {
	file, err := c.FormFile("image")
	if err != nil {
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "image part is missing"})
	}

	fileBuffer, err := file.Open()
	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "cannot open file"})
	}
	defer fileBuffer.Close()

	buf := make([]byte, file.Size)
	_, err = fileBuffer.Read(buf)
	if err != nil {
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "cannot read file into buffer"})
	}

	req := imageRequest{
		id:          uuid.New().String(),
		data:        buf,
		contentType: file.Header.Get("Content-Type"),
	}

	// Non-blocking send to the channel.
	// If the channel is full, we immediately reject the request.
	// This is a form of backpressure.
	select {
	case requestChannel <- req:
		return c.Status(fiber.StatusAccepted).JSON(fiber.Map{"id": req.id, "status": "queued"})
	default:
		log.Println("Request channel is full. Rejecting request.")
		return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{"error": "server is busy, please try again later"})
	}
}

// This is the core logic for batch processing.
func batchProcessor(wg *sync.WaitGroup) {
	defer wg.Done()
	batch := make([]imageRequest, 0, maxBatchSize)
	ticker := time.NewTicker(maxBatchTimeout)

	for {
		select {
		case req, ok := <-requestChannel:
			if !ok {
				// Channel closed, flush any remaining items and exit.
				if len(batch) > 0 {
					log.Printf("Channel closed. Flushing final batch of %d images.\n", len(batch))
					sendBatch(batch)
				}
				return
			}

			batch = append(batch, req)
			if len(batch) >= maxBatchSize {
				log.Printf("Batch size reached. Sending %d images.\n", len(batch))
				sendBatch(batch)
				batch = make([]imageRequest, 0, maxBatchSize)
				ticker.Reset(maxBatchTimeout) // Reset the timer after sending
			}
		case <-ticker.C:
			// Timeout reached, send whatever is in the batch.
			if len(batch) > 0 {
				log.Printf("Batch timeout. Sending %d images.\n", len(batch))
				sendBatch(batch)
				batch = make([]imageRequest, 0, maxBatchSize)
			}
		}
	}
}

func sendBatch(batch []imageRequest) {
	payloads := make([]*ingestionpb.ImagePayload, len(batch))
	for i, req := range batch {
		payloads[i] = &ingestionpb.ImagePayload{
			ClientAssignedId: req.id,
			ImageData:        req.data,
			ContentType:      req.contentType,
		}
	}

	request := &ingestionpb.ProcessImagesRequest{Images: payloads}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	_, err := grpcClient.ProcessImages(ctx, request)
	if err != nil {
		// A common mistake is to not handle gRPC errors properly.
		// This could indicate the worker is down or overwhelmed.
		log.Printf("Failed to send batch to Kotlin worker: %v\n", err)
		// Here you would add retry logic or push to a dead-letter queue.
	} else {
		log.Printf("Successfully sent batch of %d images to worker.\n", len(batch))
	}
}

This Go service is now a highly efficient, non-blocking shock absorber for the system.

The Kotlin Processing Worker

The Kotlin worker is where the heavy lifting happens. It needs to implement the gRPC service, manage OpenCV native libraries, process images in parallel, and perform batch inserts into Qdrant.

The project is set up with Gradle. The build.gradle.kts file is critical, especially for managing gRPC code generation and packaging with Jib.

build.gradle.kts:

import com.google.protobuf.gradle.*

plugins {
    kotlin("jvm") version "1.9.20"
    id("com.google.protobuf") version "0.9.4"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    id("com.google.cloud.tools.jib") version "3.4.0"
}

// ... repositories and dependencies ...

dependencies {
    // Kotlin & Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    // gRPC
    implementation("io.grpc:grpc-netty-shaded:1.59.0")
    implementation("io.grpc:grpc-protobuf:1.59.0")
    implementation("io.grpc:grpc-stub:1.59.0")
    implementation(kotlin("stdlib-jdk8"))
    compileOnly("org.apache.tomcat:annotations-api:6.0.53")

    // Qdrant Client
    implementation("io.qdrant:client:1.7.0")

    // OpenCV
    implementation("org.openpnp:opencv:4.8.1-0")

    // Logging
    implementation("io.github.microutils:kotlin-logging-jvm:3.0.5")
    implementation("ch.qos.logback:logback-classic:1.4.11")
}

protobuf {
    protoc { artifact = "com.google.protobuf:protoc:3.24.4" }
    plugins {
        id("grpc") { artifact = "io.grpc:protoc-gen-grpc-java:1.59.0" }
        id("grpckt") { artifact = "io.grpc:protoc-gen-grpc-kotlin:1.4.1" }
    }
    generateProtoTasks {
        ofSourceSet("main").forEach {
            it.plugins {
                id("grpc") {}
                id("grpckt") {}
            }
            it.builtins {
                id("kotlin") {}
            }
        }
    }
}

// Jib configuration
jib {
    from.image = "eclipse-temurin:17-jre-focal"
    to.image = "polyglot-worker:latest"
    container.mainClass = "io.system_architect.worker.WorkerServerKt"
    // A CRITICAL step for OpenCV: copy native libraries into the container's library path
    extraDirectories.paths.add(file("src/main/resources/natives"))
    container.jvmFlags = listOf("-Djava.library.path=/app/resources")
}

A significant pitfall in using OpenCV on the JVM inside a container is managing its native libraries (.so files on Linux). Jib makes this manageable. The native libraries must be extracted from the OpenCV JAR and placed in a directory (src/main/resources/natives here). The jib configuration then copies this directory into the container and, crucially, adds it to the java.library.path via a JVM flag. Without this, the application will crash with an UnsatisfiedLinkError at runtime.

The main worker server implementation uses Kotlin Coroutines to process the images within a batch concurrently, maximizing CPU usage.

WorkerServer.kt:

package io.system_architect.worker

import io.github.oshai.kotlinlogging.KotlinLogging
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.qdrant.client.QdrantClient
import io.qdrant.client.QdrantGrpcClient
import io.qdrant.client.grpc.Points.*
import kotlinx.coroutines.runBlocking
import nu.pattern.OpenCV
import java.util.concurrent.TimeUnit

private val logger = KotlinLogging.logger {}

class WorkerServer(private val port: Int) {
    private val qdrantClient: QdrantClient
    val server: Server

    init {
        // Load OpenCV native library
        try {
            OpenCV.loadLocally()
            logger.info { "OpenCV native library loaded successfully." }
        } catch (e: Exception) {
            logger.error(e) { "FATAL: Failed to load OpenCV native library." }
            throw e
        }
        
        // Initialize Qdrant client
        val qdrantHost = System.getenv("QDRANT_HOST") ?: "localhost"
        qdrantClient = QdrantClient(
            QdrantGrpcClient.newBuilder(qdrantHost, 6334, false).build()
        )

        // Ensure collection exists
        runBlocking {
            val collectionName = "visual_search_vectors"
            val collections = qdrantClient.listCollectionsAsync().await()
            if (!collections.contains(collectionName)) {
                logger.info { "Collection '$collectionName' not found. Creating..." }
                qdrantClient.createCollectionAsync(
                    collectionName,
                    VectorParams.newBuilder().setDistance(Distance.Cosine).setSize(512).build()
                ).await()
            }
        }
        
        server = NettyServerBuilder.forPort(port)
            .addService(IngestionServiceImpl(qdrantClient))
            .build()
    }

    fun start() {
        server.start()
        logger.info { "Server started, listening on $port" }
        Runtime.getRuntime().addShutdownHook(Thread {
            logger.info { "*** shutting down gRPC server since JVM is shutting down" }
            this@WorkerServer.stop()
            logger.info { "*** server shut down" }
        })
    }

    private fun stop() {
        qdrantClient.close()
        server.shutdown().awaitTermination(30, TimeUnit.SECONDS)
    }

    fun blockUntilShutdown() {
        server.awaitTermination()
    }
}

fun main() {
    val port = 50051
    val server = WorkerServer(port)
    server.start()
    server.blockUntilShutdown()
}

The gRPC service implementation is where the core logic resides. It receives a batch of images, processes them, and then performs a single bulk upsert to Qdrant.

IngestionServiceImpl.kt:

package io.system_architect.worker

import com.google.protobuf.ByteString
import io.github.oshai.kotlinlogging.KotlinLogging
import io.qdrant.client.QdrantClient
import io.qdrant.client.PointIdFactory
import io.qdrant.client.PointStructFactory
import io.qdrant.client.grpc.Points.UpdateStatus
import io.system_architect.ingestion.grpc.ImagePayload
import io.system_architect.ingestion.grpc.IngestionServiceGrpcKt
import io.system_architect.ingestion.grpc.ProcessImagesRequest
import io.system_architect.ingestion.grpc.ProcessImagesResponse
import io.system_architect.ingestion.grpc.ingestionStatus
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import org.opencv.core.Mat
import org.opencv.core.MatOfByte
import org.opencv.core.Size
import org.opencv.imgcodecs.Imgcodecs
import org.opencv.imgproc.Imgproc
import java.util.UUID

private val logger = KotlinLogging.logger {}
const val COLLECTION_NAME = "visual_search_vectors"
const val VECTOR_DIMENSION = 512

class IngestionServiceImpl(private val qdrantClient: QdrantClient) : IngestionServiceGrpcKt.IngestionServiceCoroutineImplBase() {

    override suspend fun processImages(request: ProcessImagesRequest): ProcessImagesResponse {
        val imageCount = request.imagesList.size
        logger.info { "Received a batch of $imageCount images to process." }

        if (imageCount == 0) {
            return ProcessImagesResponse.newBuilder().build()
        }

        val processingResults = coroutineScope {
            request.imagesList.map { payload ->
                async(Dispatchers.Default) { // Use Default dispatcher for CPU-bound work
                    processSingleImage(payload)
                }
            }.awaitAll()
        }

        val validPoints = processingResults.filterIsInstance<ImageProcessingResult.Success>()
        val failedResults = processingResults.filterIsInstance<ImageProcessingResult.Failure>()

        if (validPoints.isNotEmpty()) {
            val pointStructs = validPoints.map {
                PointStructFactory.newPointStruct(
                    PointIdFactory.id(UUID.fromString(it.id)),
                    it.vector,
                    // In a real project, add metadata payload here
                )
            }

            try {
                // The most critical optimization: batch upsert to Qdrant
                val updateResult = qdrantClient.upsertPointsAsync(COLLECTION_NAME, pointStructs, null, true).await()
                if (updateResult.status != UpdateStatus.Completed) {
                    logger.error { "Qdrant batch upsert failed with status: ${updateResult.status}" }
                    // Handle partial failure if needed
                }
            } catch (e: Exception) {
                logger.error(e) { "Exception during Qdrant batch upsert." }
                // Here, we fail the entire batch. A more robust implementation might retry.
            }
        }

        val statuses = validPoints.map { ingestionStatus { clientAssignedId = it.originalId; success = true } } +
                       failedResults.map { ingestionStatus { clientAssignedId = it.originalId; success = false; message = it.reason } }

        return ProcessImagesResponse.newBuilder().addAllStatuses(statuses).build()
    }

    private fun processSingleImage(payload: ImagePayload): ImageProcessingResult {
        return try {
            val mat = Imgcodecs.imdecode(MatOfByte(*payload.imageData.toByteArray()), Imgcodecs.IMREAD_COLOR)
            if (mat.empty()) {
                return ImageProcessingResult.Failure(payload.clientAssignedId, "Failed to decode image.")
            }

            // Simple OpenCV preprocessing: resize and convert to a standard format
            val resized = Mat()
            Imgproc.resize(mat, resized, Size(224.0, 224.0), 0.0, 0.0, Imgproc.INTER_AREA)

            // In a real project, this is where you'd call a deep learning model (e.g., ONNX, PyTorch)
            // to get a meaningful vector embedding. For this example, we simulate it.
            val vector = generateSimulatedVector(resized)
            
            // Release native memory managed by OpenCV
            mat.release()
            resized.release()

            ImageProcessingResult.Success(payload.clientAssignedId, UUID.randomUUID().toString(), vector)
        } catch (e: Exception) {
            logger.warn(e) { "Error processing image ID ${payload.clientAssignedId}" }
            ImageProcessingResult.Failure(payload.clientAssignedId, e.message ?: "Unknown processing error")
        }
    }

    private fun generateSimulatedVector(image: Mat): List<Float> {
        // This is a placeholder for a real ML model inference.
        // We generate a vector based on simple image properties to have a deterministic output.
        val vector = FloatArray(VECTOR_DIMENSION)
        val mean = org.opencv.core.Core.mean(image)
        vector[0] = mean.`val`[0].toFloat() / 255.0f // Blue channel
        vector[1] = mean.`val`[1].toFloat() / 255.0f // Green channel
        vector[2] = mean.`val`[2].toFloat() / 255.0f // Red channel
        // Fill the rest with some values to match dimension
        for (i in 3 until VECTOR_DIMENSION) {
            vector[i] = (vector[0] + vector[1] + vector[2]) / 3.0f * (i % 10) / 10.0f
        }
        return vector.toList()
    }
}

sealed class ImageProcessingResult {
    data class Success(val originalId: String, val id: String, val vector: List<Float>) : ImageProcessingResult()
    data class Failure(val originalId: String, val reason: String) : ImageProcessingResult()
}

The key takeaways from the Kotlin implementation are:

  1. Parallel Processing: async(Dispatchers.Default) is used to parallelize the CPU-bound OpenCV work across available cores for a single batch.
  2. Batch Database Operation: All successfully generated vectors are inserted into Qdrant with a single upsertPointsAsync call. A common mistake is to loop and insert one by one, which creates immense network and database overhead.
  3. Resource Management: Explicitly calling mat.release() is important for managing off-heap memory used by OpenCV’s native code.

Deployment and Execution

To run this polyglot system, Docker Compose provides a simple way to orchestrate the services.

docker-compose.yml:

version: '3.8'

services:
  qdrant:
    image: qdrant/qdrant:v1.7.4
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - ./qdrant_storage:/qdrant/storage

  go-gateway:
    build:
      context: ./go-gateway # Assuming Go code is in this directory
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    depends_on:
      - kotlin-worker
    environment:
      - KOTLIN_WORKER_ADDRESS=kotlin-worker:50051

  kotlin-worker:
    # This image is built by Jib, so we just reference it.
    # We would run './gradlew jibDockerBuild' in the kotlin-worker directory first.
    image: polyglot-worker:latest
    ports:
      - "50051:50051"
    depends_on:
      - qdrant
    environment:
      - QDRANT_HOST=qdrant

This architecture successfully decouples the system components, allowing each to scale independently and perform its task optimally. The Go gateway can handle a massive influx of requests, while the Kotlin workers can be scaled horizontally based on the processing queue depth, ensuring efficient use of expensive compute resources. The batching strategy transforms thousands of small, inefficient operations into a few large, highly efficient ones.

The current implementation uses a simple in-memory channel in the Go gateway for queuing, which is a single point of failure and doesn’t persist requests if the gateway crashes. A production-grade system would replace this with a durable message broker like Kafka or RabbitMQ, providing better resilience, load leveling, and backpressure capabilities. Furthermore, the error handling for batch processing could be more granular. A failure in a single image currently doesn’t prevent the rest of the batch from being indexed, but the client is not notified of partial successes. Implementing a more sophisticated callback mechanism or a separate status-checking endpoint would be a necessary next step. Finally, the simulated vector generation is a major simplification; integrating a real ML model would introduce significant complexity around model loading, versioning, and potential GPU utilization within the Kotlin worker.


  TOC