The requirement seemed simple: add a new data analysis feature to our primary Kotlin-based application. Users needed to upload small CSV datasets and perform a series of non-trivial statistical computations—rolling window calculations, outlier detection, and time-series decomposition. The problem was that our entire stack was JVM-based. While libraries for this exist in the Java/Kotlin ecosystem, the de facto standard for this kind of work lives in Python with Pandas, NumPy, and SciPy. The sheer velocity and expressiveness of a Python-based solution were too compelling to ignore.
Our initial, naive approach was to stand up a simple Flask REST API for the Python component. The Ktor backend would make an HTTP call to it. This idea was discarded within an hour. In our production environment, services are ephemeral, containerized, and deployed across a dynamic set of hosts. Hardcoding IPs and ports was a non-starter. Service discovery was the first hurdle. The second, more critical one, was security. Exposing an internal service via unencrypted HTTP, even within a VPC, is a practice we’ve worked hard to eliminate. It creates a soft internal network that’s vulnerable to lateral movement. This seemingly small feature was forcing us to confront the complexities of building a secure, resilient, polyglot microservice architecture.
Our revised concept settled on a more robust architecture. The communication protocol between the Ktor backend and the new Python analytics service would be gRPC. This provides performance benefits over JSON/REST and, more importantly, a strongly-typed contract via Protocol Buffers, which is invaluable in a polyglot setup. For service discovery and security, we decided to leverage a service mesh. Instead of baking discovery and mTLS logic into each application, we would offload it to sidecar proxies managed by HashiCorp’s Consul. Consul Connect would provide automatic mTLS between registered services, ensuring all traffic is encrypted and authenticated, and it would handle dynamic service discovery transparently. The Ktor service wouldn’t need to know the IP address of the Python service; it would simply talk to its local sidecar, and Consul would route the request securely. The frontend would remain a Nuxt.js single-page application, communicating with the Ktor service, which now acts as a Backend-for-Frontend (BFF) and an orchestrator.
The Service Contract: Defining Communication with Protocol Buffers
The first step in any multi-service architecture is to define the contract. With gRPC, this is done using a .proto
file. This contract is the source of truth for both the client (Ktor) and the server (Python). A common pitfall is to design overly complex messages. In a real-world project, it’s better to start simple and explicit. We need to send raw CSV data and specify which analysis to perform. The service should return the processed data, likely as a JSON string for easy rendering on the frontend.
proto/analytics.proto
:
syntax = "proto3";
package analytics;
// The service definition for our Python-based data processor.
service PandasAnalytics {
// Executes a predefined analysis on the provided CSV data.
rpc Analyze (AnalysisRequest) returns (AnalysisResponse) {}
}
// The request message contains the raw CSV data as a string
// and an identifier for the type of analysis to run.
message AnalysisRequest {
string csv_data = 1;
AnalysisType analysis_type = 2;
}
// The response contains the result, typically a JSON representation
// of the resulting DataFrame, or an error message.
message AnalysisResponse {
string result_json = 1;
bool success = 2;
string error_message = 3;
}
// Enum to define the types of analysis we support.
// This allows us to extend functionality without changing the RPC method signature.
enum AnalysisType {
UNKNOWN = 0;
ROLLING_AVERAGE_7_DAY = 1;
OUTLIER_DETECTION_IQR = 2;
}
This contract is clear. The AnalysisRequest
carries the raw data and an enum indicating the desired operation. The AnalysisResponse
returns a JSON payload. This design decouples the heavy data format (Pandas DataFrame) from the transport layer, making it easy for any client to consume.
The Python Analytics Service: Wrapping Pandas in gRPC
With the contract defined, we build the Python service. This service is a gRPC server that listens for requests, deserializes the data, uses Pandas to perform the computation, and sends back the result.
The core of the implementation is handling the business logic. A common mistake is to mix transport logic with analysis logic. We’ll separate them into a handler class.
analytics_service/server.py
:
import grpc
import pandas as pd
import io
import json
from concurrent import futures
import logging
import time
# Import generated gRPC files
import analytics_pb2
import analytics_pb2_grpc
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class AnalysisHandler:
"""
Contains the core data processing logic using Pandas.
This separation makes the code testable independent of gRPC.
"""
def _load_dataframe(self, csv_data: str) -> pd.DataFrame:
"""Loads a CSV string into a Pandas DataFrame."""
if not csv_data:
raise ValueError("Input CSV data cannot be empty.")
# Using io.StringIO to treat the string as a file
df = pd.read_csv(io.StringIO(csv_data))
if 'date' not in df.columns:
raise ValueError("CSV must contain a 'date' column.")
# Ensure date column is parsed correctly, a common production issue.
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
return df
def perform_rolling_average(self, df: pd.DataFrame) -> str:
"""Calculates a 7-day rolling average on the 'value' column."""
if 'value' not in df.columns:
raise ValueError("CSV must contain a 'value' column for rolling average.")
df['rolling_avg_7d'] = df['value'].rolling(window=7).mean()
# fillna is important to avoid JSON serialization issues with NaN
return df.fillna(0).to_json(orient='split', date_format='iso')
def perform_outlier_detection(self, df: pd.DataFrame) -> str:
"""Detects outliers using the Interquartile Range (IQR) method."""
if 'value' not in df.columns:
raise ValueError("CSV must contain a 'value' column for outlier detection.")
Q1 = df['value'].quantile(0.25)
Q3 = df['value'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
df['is_outlier'] = (df['value'] < lower_bound) | (df['value'] > upper_bound)
return df.to_json(orient='split', date_format='iso')
class PandasAnalyticsServicer(analytics_pb2_grpc.PandasAnalyticsServicer):
"""
The gRPC servicer class that ties the protocol to the handler.
"""
def __init__(self):
self.handler = AnalysisHandler()
def Analyze(self, request, context):
"""The main RPC endpoint implementation."""
try:
logging.info(f"Received analysis request for type: {request.analysis_type}")
df = self.handler._load_dataframe(request.csv_data)
result_json = ""
if request.analysis_type == analytics_pb2.AnalysisType.ROLLING_AVERAGE_7_DAY:
result_json = self.handler.perform_rolling_average(df)
elif request.analysis_type == analytics_pb2.AnalysisType.OUTLIER_DETECTION_IQR:
result_json = self.handler.perform_outlier_detection(df)
else:
raise ValueError(f"Unsupported analysis type: {request.analysis_type}")
return analytics_pb2.AnalysisResponse(
success=True,
result_json=result_json
)
except Exception as e:
# In a production system, you must not leak internal exceptions.
# Log the full error and return a generic message to the client.
logging.error(f"Analysis failed: {e}", exc_info=True)
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"An internal error occurred: {e}")
return analytics_pb2.AnalysisResponse(
success=False,
error_message=str(e)
)
def serve():
"""Starts the gRPC server."""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
analytics_pb2_grpc.add_PandasAnalyticsServicer_to_server(PandasAnalyticsServicer(), server)
# The server listens on port 50051. The Consul sidecar will connect to this.
server.add_insecure_port('[::]:50051')
logging.info("Starting Python Analytics gRPC server on port 50051...")
server.start()
server.wait_for_termination()
if __name__ == '__main__':
serve()
To containerize this, we need a Dockerfile
. It must install Python dependencies and the generated gRPC code.
analytics_service/Dockerfile
:
# Use a slim Python base image for a smaller footprint.
FROM python:3.9-slim
WORKDIR /app
# Install dependencies first to leverage Docker layer caching.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code.
COPY . .
# Generate gRPC code from the .proto file.
# This step is crucial and often missed in simple examples.
RUN python -m grpc_tools.protoc -I./proto --python_out=. --grpc_python_out=. ./proto/analytics.proto
# The command to run the gRPC server.
CMD ["python", "server.py"]
And the requirements.txt
:
grpcio
grpcio-tools
pandas
Consul Configuration: Weaving the Mesh
Now we configure Consul to manage these services. This involves creating service definition files that tell the local Consul agent about each service, what port it runs on, and—most importantly—that it wants to participate in the Connect service mesh.
For the Python analytics service:consul/analytics-service.json
:
{
"service": {
"name": "analytics-service",
"port": 50051,
"connect": {
"sidecar_service": {}
},
"checks": [
{
"grpc": "127.0.0.1:50051",
"interval": "10s",
"timeout": "1s"
}
]
}
}
This file defines a service named analytics-service
running on port 50051. The connect
stanza is key: it tells Consul to inject a sidecar proxy for this service. The health check uses gRPC, which is more reliable than a simple TCP check for verifying service health.
Similarly, for the Ktor API service:consul/api-service.json
:
{
"service": {
"name": "api-gateway",
"port": 8080,
"connect": {
"sidecar_service": {
"proxy": {
"upstreams": [
{
"destination_name": "analytics-service",
"local_bind_port": 20000
}
]
}
}
},
"checks": [
{
"http": "http://127.0.0.1:8080/health",
"interval": "10s"
}
]
}
}
The configuration for api-gateway
is more interesting. It also has a sidecar, but it defines an upstream
. This tells its local sidecar proxy: “Any service that wants to talk to analytics-service
should connect to me on my local port 20000
.” The proxy will then handle discovering the real analytics-service
, establishing an mTLS connection, and forwarding the traffic. This is the magic of the service mesh: our Ktor application will connect to localhost:20000
as if the Python service were running locally.
The Ktor API Gateway: Orchestration and Client Logic
The Ktor application acts as the bridge. It exposes a RESTful endpoint to the Nuxt.js frontend and communicates with the Python service via gRPC.
First, the build.gradle.kts
needs the correct dependencies for Ktor, gRPC, and Protobuf generation.
// build.gradle.kts (partial)
import com.google.protobuf.gradle.*
plugins {
// ...
id("com.google.protobuf") version "0.9.3"
}
// ...
dependencies {
// Ktor
implementation("io.ktor:ktor-server-core:$ktor_version")
implementation("io.ktor:ktor-server-netty:$ktor_version")
implementation("io.ktor:ktor-server-content-negotiation:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
implementation("io.ktor:ktor-server-cors:$ktor_version")
// gRPC
implementation("io.grpc:grpc-kotlin-stub:1.3.0")
implementation("io.grpc:grpc-protobuf:1.50.2")
implementation("io.grpc:grpc-netty:1.50.2")
// Logging
implementation("ch.qos.logback:logback-classic:$logback_version")
}
protobuf {
protoc { artifact = "com.google.protobuf:protoc:3.21.9" }
plugins {
id("grpc") { artifact = "io.grpc:protoc-gen-grpc-java:1.50.2" }
id("grpckt") { artifact = "io.grpc:protoc-gen-grpc-kotlin:1.3.0:jdk8@jar" }
}
generateProtoTasks {
all().forEach {
it.plugins {
id("grpc")
id("grpckt")
}
it.builtins {
id("kotlin")
}
}
}
}
Next, we implement the gRPC client and the Ktor routing.api_gateway/src/main/kotlin/com/example/Application.kt
:
package com.example
import analytics.Analytics
import analytics.PandasAnalyticsGrpcKt
import io.grpc.ManagedChannelBuilder
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.serialization.Serializable
import org.slf4j.LoggerFactory
fun main() {
embeddedServer(Netty, port = 8080, host = "0.0.0.0", module = Application::module).start(wait = true)
}
// A singleton gRPC client. Creating channels is expensive.
// In a real-world project, this would be managed by a dependency injection framework.
object AnalyticsClient {
private val logger = LoggerFactory.getLogger(javaClass)
// The host and port point to the LOCAL Consul Connect proxy, not the remote service.
// This is the most critical configuration detail.
private const val PROXY_HOST = "localhost"
private const val PROXY_PORT = 20000
private val channel = ManagedChannelBuilder.forAddress(PROXY_HOST, PROXY_PORT)
.usePlaintext() // The proxy handles TLS, so communication from app to proxy is plaintext.
.build()
private val stub = PandasAnalyticsGrpcKt.PandasAnalyticsCoroutineStub(channel)
init {
logger.info("AnalyticsClient initialized, connecting to gRPC proxy at $PROXY_HOST:$PROXY_PORT")
}
suspend fun analyze(csvData: String, type: Analytics.AnalysisType): Analytics.AnalysisResponse {
val request = Analytics.AnalysisRequest.newBuilder()
.setCsvData(csvData)
.setAnalysisType(type)
.build()
logger.info("Sending gRPC request to analytics-service...")
// Using a coroutine stub for non-blocking calls.
return stub.analyze(request)
}
}
@Serializable
data class ApiAnalysisRequest(val csvData: String, val analysisType: String)
@Serializable
data class ApiAnalysisResponse(val success: Boolean, val data: String?, val error: String?)
fun Application.module() {
val logger = LoggerFactory.getLogger(javaClass)
install(ContentNegotiation) {
json()
}
install(CORS) {
anyHost()
allowHeader("Content-Type")
}
routing {
get("/health") {
call.respondText("OK")
}
post("/analyze") {
try {
val request = call.receive<ApiAnalysisRequest>()
val analysisTypeEnum = when(request.analysisType) {
"ROLLING_AVERAGE" -> Analytics.AnalysisType.ROLLING_AVERAGE_7_DAY
"OUTLIER_DETECTION" -> Analytics.AnalysisType.OUTLIER_DETECTION_IQR
else -> Analytics.AnalysisType.UNRECOGNIZED
}
if (analysisTypeEnum == Analytics.AnalysisType.UNRECOGNIZED) {
call.respond(ApiAnalysisResponse(success = false, data = null, error = "Invalid analysis type"))
return@post
}
// Offload the blocking gRPC call to an I/O dispatcher.
val grpcResponse = withContext(Dispatchers.IO) {
AnalyticsClient.analyze(request.csvData, analysisTypeEnum)
}
if (grpcResponse.success) {
call.respond(ApiAnalysisResponse(success = true, data = grpcResponse.resultJson, error = null))
} else {
logger.warn("Analysis service returned an error: ${grpcResponse.errorMessage}")
call.respond(ApiAnalysisResponse(success = false, data = null, error = grpcResponse.errorMessage))
}
} catch (e: Exception) {
logger.error("Error in /analyze endpoint: ${e.message}", e)
call.respond(ApiAnalysisResponse(success = false, data = null, error = "Internal server error"))
}
}
}
}
The key piece of code here is the AnalyticsClient
. It connects to localhost:20000
, the port defined in consul/api-service.json
for the upstream dependency. It also specifies .usePlaintext()
because the communication from the Ktor app to its local sidecar does not need to be encrypted; the sidecar proxy itself terminates the local connection and initiates the secure mTLS tunnel to the downstream service’s sidecar. This is a common point of confusion when first using a service mesh.
The Nuxt.js Frontend: User Interaction
The frontend is a standard Nuxt.js application. Its only job is to provide a user interface to send data to the Ktor backend and display the result.
frontend/pages/index.vue
:
<template>
<div class="container">
<h1>Polyglot Data Analysis Service</h1>
<p>
Powered by Nuxt.js, Ktor, Python/Pandas, and Consul Connect
</p>
<div class="form-group">
<label for="analysisType">Analysis Type:</label>
<select v-model="analysisType" id="analysisType">
<option value="ROLLING_AVERAGE">7-Day Rolling Average</option>
<option value="OUTLIER_DETECTION">Outlier Detection (IQR)</option>
</select>
</div>
<div class="form-group">
<label for="csvData">CSV Data:</label>
<textarea v-model="csvData" id="csvData" rows="10" placeholder="date,value 2023-01-01,10 2023-01-02,12..."></textarea>
</div>
<button @click="submitAnalysis" :disabled="loading">
{{ loading ? 'Analyzing...' : 'Analyze' }}
</button>
<div v-if="error" class="error">
<strong>Error:</strong> {{ error }}
</div>
<div v-if="result" class="result">
<h2>Analysis Result</h2>
<pre><code>{{ formattedResult }}</code></pre>
</div>
</div>
</template>
<script setup>
import { ref, computed } from 'vue';
const analysisType = ref('ROLLING_AVERAGE');
const csvData = ref(`date,value
2023-01-01,10
2023-01-02,12
2023-01-03,11
2023-01-04,15
2023-01-05,25
2023-01-06,23
2023-01-07,22
2023-01-08,24
2023-01-09,100
2023-01-10,28
2023-01-11,30`);
const loading = ref(false);
const error = ref(null);
const result = ref(null);
const formattedResult = computed(() => {
if (!result.value) return '';
try {
return JSON.stringify(JSON.parse(result.value), null, 2);
} catch (e) {
return result.value;
}
});
async function submitAnalysis() {
loading.value = true;
error.value = null;
result.value = null;
try {
const response = await fetch('http://localhost:8080/analyze', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
csvData: csvData.value,
analysisType: analysisType.value
})
});
const responseData = await response.json();
if (!responseData.success) {
throw new Error(responseData.error || 'Analysis failed on the server.');
}
result.value = responseData.data;
} catch (e) {
error.value = e.message;
} finally {
loading.value = false;
}
}
</script>
<style>
/* Basic styling for readability */
.container { max-width: 800px; margin: 2rem auto; font-family: sans-serif; }
.form-group { margin-bottom: 1rem; }
label { display: block; margin-bottom: 0.5rem; }
textarea, select { width: 100%; padding: 0.5rem; font-size: 1rem; }
button { padding: 0.75rem 1.5rem; font-size: 1rem; cursor: pointer; }
.error { margin-top: 1rem; color: red; border: 1px solid red; padding: 1rem; }
.result { margin-top: 1rem; background-color: #f0f0f0; padding: 1rem; }
pre { white-space: pre-wrap; word-wrap: break-word; }
</style>
Orchestrating the Full Stack with Docker Compose
To run this entire stack locally for development, a docker-compose.yml
is indispensable. It will define the Consul server, our two services, and their corresponding sidecars. The trick is how to run the consul connect proxy
command alongside the main application process. A simple shell script or using the command
override in Docker Compose is a common pattern.
docker-compose.yml
:
version: '3.8'
services:
consul-server:
image: hashicorp/consul:1.13
container_name: consul-server
ports:
- "8500:8500"
- "8600:8600/udp"
command: "agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0"
analytics-service:
build: ./analytics_service
container_name: analytics-service
depends_on:
- consul-server
# This command is complex. It registers the service with Consul and then starts the sidecar proxy
# in the background before executing the main application.
command: >
sh -c "
apk add --no-cache curl &&
sleep 10 &&
curl --request PUT --data @/consul/analytics-service.json http://consul-server:8500/v1/agent/service/register &&
consul connect proxy -sidecar-for analytics-service -http-addr=http://consul-server:8500 &
exec python server.py
"
volumes:
- ./consul:/consul
api-gateway:
build: ./api_gateway
container_name: api-gateway
ports:
- "8080:8080"
depends_on:
- consul-server
command: >
sh -c "
apk add --no-cache curl &&
sleep 10 &&
curl --request PUT --data @/consul/api-service.json http://consul-server:8500/v1/agent/service/register &&
consul connect proxy -sidecar-for api-gateway -http-addr=http://consul-server:8500 &
exec java -jar /app/build/libs/api_gateway-0.0.1-all.jar
"
volumes:
- ./consul:/consul
# Note: The Nuxt.js frontend is typically run locally during development via `npm run dev`
# and would not be part of this backend docker-compose setup.
A pitfall in the docker-compose
setup is process management. The exec
command is crucial; it replaces the shell process with the application process, ensuring that signals like SIGTERM
are passed correctly when stopping the container. The initial sleep
is a pragmatic, albeit crude, way to ensure the Consul server is ready before services try to register. In production, a more robust solution like consul-template
or an init container would be used.
The final request flow is now fully realized and secured by the mesh:
sequenceDiagram participant User participant Nuxt.js Frontend participant Ktor API Gateway participant Ktor Sidecar participant Analytics Sidecar participant Python Analytics Service User->>Nuxt.js Frontend: Enters CSV and clicks "Analyze" Nuxt.js Frontend->>Ktor API Gateway: POST /analyze (HTTP) Ktor API Gateway->>Ktor Sidecar: gRPC call to localhost:20000 Ktor Sidecar->>Analytics Sidecar: Forward gRPC call (mTLS tunnel) Analytics Sidecar->>Python Analytics Service: Forward gRPC call to localhost:50051 Python Analytics Service-->>Analytics Sidecar: gRPC response Analytics Sidecar-->>Ktor Sidecar: gRPC response (mTLS tunnel) Ktor Sidecar-->>Ktor API Gateway: gRPC response Ktor API Gateway-->>Nuxt.js Frontend: HTTP Response with JSON Nuxt.js Frontend-->>User: Display formatted result
This architecture, while more complex to set up initially, pays significant dividends in a production environment. It provides secure-by-default communication, decouples services from network topology, and allows teams to use the best tool for the job—in this case, Pandas for data science and Ktor for scalable backend services—without compromising on security or operational sanity.
The current implementation, however, has its limits. The entire process is synchronous. A very large CSV file or a computationally intensive analysis could block the Ktor request thread and time out the user’s HTTP request. A more resilient evolution would involve an asynchronous job queue. The Ktor endpoint would accept the request, push a job to a message broker like RabbitMQ or Kafka, and immediately return a jobId
. The Python service would become a worker, consuming jobs from the queue. The Nuxt.js frontend would then poll an endpoint or use a WebSocket to get the status and result of the job, providing a much better user experience for long-running tasks. Furthermore, while mTLS secures the transport layer, it does not provide application-level authorization. Consul’s intentions can be used to define which services are allowed to talk to each other, adding another layer of defense-in-depth, which would be a logical next step in hardening this system.