Implementing an Observable Synthetic Monitoring Pipeline Using Knative Kotlin Services and On-Demand Matplotlib Visualization


The core problem wasn’t a lack of monitoring; it was a lack of actionable insight. Our primary customer-facing API was showing intermittent performance degradation. Average latency metrics looked fine, but our p99 latency would occasionally spike, triggering vague alerts that would resolve themselves before an engineer could even begin investigating. We were flying blind, reacting to ghosts in the machine. The existing dashboards were useless for diagnosing these transient, high-percentile issues. We needed a way to proactively and consistently probe the API’s performance profile, not just its average health. This led to the decision to build an internal synthetic monitoring tool, but with a few stringent requirements born from past scars:

  1. Cost-Effective Operation: The checks needed to run every few minutes. A constantly running service for this task was an unjustifiable expense.
  2. Rich, Visual Output: A simple pass/fail or a single latency number was insufficient. We needed to understand the distribution of response times to spot multi-modal behavior (e.g., fast cache hits vs. slow cache misses).
  3. Deep Observability: If a synthetic test run was slow, we had to know precisely where the time was spent. Was it our test runner, the network, or the target API itself? The system would be distributed by nature, making this a non-trivial requirement.
  4. Fully Automated Lifecycle: The entire build, test, and deploy process for the monitor itself had to be a zero-touch pipeline.
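
To make the second requirement concrete, here is a small sketch of why an average hides the problem. The cache-hit and cache-miss latency ranges and the 5% miss rate are invented for illustration, not measurements from our API.

```python
import random
import statistics


def simulate(n=1000, miss_rate=0.05, seed=7):
    """Generate hypothetical latencies: mostly fast cache hits, a few slow misses."""
    rng = random.Random(seed)
    samples = []
    for _ in range(n):
        if rng.random() < miss_rate:
            samples.append(rng.uniform(400, 600))  # assumed cache-miss range (ms)
        else:
            samples.append(rng.uniform(15, 25))    # assumed cache-hit range (ms)
    return samples


samples = simulate()
mean = statistics.mean(samples)
median = statistics.median(samples)
# quantiles(n=100) returns 99 cut points; index 98 is the 99th percentile.
p99 = statistics.quantiles(samples, n=100)[98]

print(f"mean={mean:.1f}ms median={median:.1f}ms p99={p99:.1f}ms")
```

The mean and median stay close to the fast mode while the p99 lands in the slow one, which is exactly the shape a histogram makes obvious and a single number does not.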

This set of constraints guided our technology selection. For cost-effective, event-driven execution, Knative was the obvious choice due to its scale-to-zero capability. The main test orchestration logic would be written in Kotlin, our team’s standard for robust backend services. To generate the required latency distribution plots, we’d leverage Matplotlib in a separate Python service, a pragmatic choice for its powerful and mature visualization capabilities. To solve the observability puzzle in this ephemeral, multi-service environment, SkyWalking was selected for its distributed tracing prowess, especially with JVM-based applications. Finally, CircleCI would serve as the backbone for automating the entire CI/CD workflow. This post is a log of how these disparate technologies were integrated to solve our specific engineering problem.

The Technical Pain Point: Designing the Core Components

The architecture breaks down into three main parts: a scheduler, a Kotlin-based test runner, and a Python-based report generator.

graph TD
    subgraph Kubernetes Cluster
        A[Knative Eventing: PingSource] -- Triggers every 5 mins --> B(Knative Service: Kotlin Test Runner);
        B -- HTTP POST with timing data --> C(Knative Service: Python Plot Generator);
        B -- Makes N requests --> D[Target API];
    end

    subgraph Observability
        B -- Traces --> E[SkyWalking OAP];
        C -- Traces --> E;
        D -- Traces --> E;
    end

    subgraph CI/CD
        F[Git Push] --> G[CircleCI Pipeline];
        G -- Builds & Pushes --> H[Container Registry];
        G -- kubectl apply --> A;
        G -- kubectl apply --> B;
        G -- kubectl apply --> C;
    end

The flow is initiated by a Knative PingSource (the successor to the older CronJobSource), which sends a CloudEvent to the Kotlin test runner service on a cron schedule. This service, scaled up from zero if necessary, fires a batch of concurrent requests against the target API and records the latency of each call. Once complete, it packages the raw timing data into a JSON payload and POSTs it to the Python service, which uses Matplotlib to render a latency histogram and returns the PNG bytes in the response. The entire sequence, from the Kotlin service’s invocation to its call to the Python service, is traced by SkyWalking.
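
For local testing it helps to reproduce the trigger by hand. The sketch below builds a binary-mode CloudEvent as an HTTP POST, which is how the event reaches the runner. The `Ce-Type` and `Ce-Source` values are what I understand a PingSource to emit and should be treated as assumptions; `build_trigger_request`, `send_trigger`, and the localhost URL are hypothetical names for illustration.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone


def build_trigger_request(url, namespace="default", source_name="synthetic-test-trigger"):
    """Build a binary-mode CloudEvent: attributes in Ce-* headers, data in the body."""
    body = json.dumps({"source": "manual-trigger"}).encode("utf-8")
    headers = {
        "Ce-Specversion": "1.0",
        "Ce-Id": str(uuid.uuid4()),
        "Ce-Type": "dev.knative.sources.ping",  # assumed PingSource event type
        "Ce-Source": f"/apis/v1/namespaces/{namespace}/pingsources/{source_name}",
        "Ce-Time": datetime.now(timezone.utc).isoformat(),
        "Content-Type": "application/json",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


def send_trigger(url="http://localhost:8080/"):
    """Send the event to a locally running test runner and return (status, body)."""
    with urllib.request.urlopen(build_trigger_request(url), timeout=120) as resp:
        return resp.status, resp.read().decode("utf-8")
```

Calling `send_trigger()` against a runner started on port 8080 exercises the same `post("/")` route the scheduler hits, without waiting for the next five-minute tick.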

Step 1: The Kotlin Test Runner - Code and Configuration

The first component is the heart of the system. We used Ktor as the HTTP server and client framework for its lightweight nature and excellent coroutine support, which is ideal for firing off concurrent requests.

The build.gradle.kts file is the foundation, pulling in the necessary dependencies. Crucially, it includes the SkyWalking agent toolkit.

// build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

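plugins {
    kotlin("jvm") version "1.9.20"
    // Required for @Serializable: generates the serializers used by Ktor's json() content negotiation
    kotlin("plugin.serialization") version "1.9.20"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    application
}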

group = "com.synthetic.monitor"
version = "1.0.0"

repositories {
    mavenCentral()
}

dependencies {
    // Ktor for HTTP server/client
    implementation("io.ktor:ktor-server-core-jvm:2.3.5")
    implementation("io.ktor:ktor-server-cio-jvm:2.3.5")
    implementation("io.ktor:ktor-client-cio:2.3.5")
    implementation("io.ktor:ktor-client-content-negotiation:2.3.5")
    implementation("io.ktor:ktor-serialization-kotlinx-json:2.3.5")

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.11")

    // SkyWalking APM
    implementation("org.apache.skywalking:apm-toolkit-trace:8.15.0")
    
    // Testing
    testImplementation(kotlin("test"))
}

application {
    mainClass.set("com.synthetic.monitor.ApplicationKt")
}

tasks.withType<KotlinCompile> {
    kotlinOptions.jvmTarget = "17"
}

// Create a fat JAR for easier packaging in Docker
tasks.shadowJar {
    archiveBaseName.set("synthetic-runner")
    archiveClassifier.set("")
    archiveVersion.set("")
}

The core service logic receives a request (triggered by the cron source), runs the test suite, and then calls the visualization service.

// src/main/kotlin/com/synthetic/monitor/Application.kt

package com.synthetic.monitor

import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.cio.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.serialization.Serializable
import org.apache.skywalking.apm.toolkit.trace.TraceContext
import org.slf4j.LoggerFactory
import kotlin.system.measureTimeMillis

@Serializable
data class LatencyData(val latencies: List<Long>)

@Serializable
data class VisualizationRequest(val traceId: String, val timingData: List<Long>)

val logger = LoggerFactory.getLogger("SyntheticRunner")
val VISUALIZATION_SERVICE_URL = System.getenv("VISUALIZATION_SERVICE_URL") ?: "http://localhost:8081/generate-plot"
val TARGET_API_ENDPOINT = System.getenv("TARGET_API_ENDPOINT") ?: "http://example.com"
const val REQUEST_COUNT = 100

fun main() {
    embeddedServer(CIO, port = 8080, host = "0.0.0.0", module = Application::module).start(wait = true)
}

fun Application.module() {
    val httpClient = HttpClient(CIO) {
        install(io.ktor.client.plugins.contentnegotiation.ContentNegotiation) {
            json()
        }
    }

    routing {
        post("/") {
            val eventType = call.request.header("Ce-Type")
            logger.info("Received trigger event of type: $eventType. Starting synthetic test run.")
            
            try {
                val timings = executeTestRun(httpClient)
                logger.info("Test run completed. Collected ${timings.size} data points. P99: ${percentile(timings, 99.0)}ms")

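                // Capture the SkyWalking trace ID so the plot and the logs can be matched to this trace.
                // This is correlation only; cross-service context propagation is handled by the agent, not by this field.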
                val traceId = TraceContext.traceId()
                
                val response: HttpResponse = httpClient.post(VISUALIZATION_SERVICE_URL) {
                    contentType(ContentType.Application.Json)
                    setBody(VisualizationRequest(traceId, timings))
                }
                
                if (response.status.isSuccess()) {
                    logger.info("Successfully requested visualization. Trace ID: $traceId")
                    call.respond(HttpStatusCode.OK, "Test run successful, visualization requested.")
                } else {
                    logger.error("Failed to request visualization. Status: ${response.status}. Body: ${response.bodyAsText()}")
                    call.respond(HttpStatusCode.InternalServerError, "Visualization service failed.")
                }

            } catch (e: Exception) {
                logger.error("Synthetic test run failed.", e)
                call.respond(HttpStatusCode.InternalServerError, "Test run failed: ${e.message}")
            }
        }
    }
}

suspend fun executeTestRun(client: HttpClient): List<Long> = coroutineScope {
    val deferreds = (1..REQUEST_COUNT).map { i ->
        async {
            var latency: Long
            try {
                latency = measureTimeMillis {
                    // In a real project, add more complex validation of the response
                    client.get(TARGET_API_ENDPOINT) {
                        headers {
                            append("X-Request-ID", "synthetic-run-${System.currentTimeMillis()}-$i")
                        }
                    }
                }
            } catch (e: Exception) {
                logger.warn("Request $i to $TARGET_API_ENDPOINT failed: ${e.message}")
                latency = -1 // Use a sentinel value for failed requests
            }
            latency
        }
    }
    deferreds.awaitAll().filter { it != -1L }
}

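// Nearest-rank percentile: the smallest sample with at least `percentile` percent of values at or below it.
fun percentile(latencies: List<Long>, percentile: Double): Long {
    if (latencies.isEmpty()) return 0
    val sorted = latencies.sorted()
    // ceil(p * n / 100) is the 1-based rank; subtract 1 for the 0-based index and clamp to valid bounds.
    val rank = kotlin.math.ceil(percentile * sorted.size / 100.0).toInt()
    return sorted[(rank - 1).coerceIn(0, sorted.lastIndex)]
}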
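
One detail worth checking when comparing the runner's logged P99 with the plot: the two services need not compute percentiles the same way. The sketch below contrasts the nearest-rank definition with linear interpolation between closest ranks, which is NumPy's default for `np.percentile`. The function names are hypothetical and the input is a toy data set.

```python
import math


def nearest_rank(values, pct):
    """Smallest sample such that at least pct percent of samples are <= it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))  # 1-based rank
    return ordered[rank - 1]


def linear_interpolated(values, pct):
    """Linear interpolation between the two closest ranks."""
    ordered = sorted(values)
    pos = (len(ordered) - 1) * pct / 100  # fractional 0-based position
    lower = math.floor(pos)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (pos - lower)


latencies = list(range(1, 101))  # toy data: 1ms .. 100ms
print(nearest_rank(latencies, 99))
print(f"{linear_interpolated(latencies, 99):.2f}")
```

On 100 evenly spaced samples the two differ only in the second decimal place, but with a long tail and few samples the gap can be large enough to cause confusion when a log line and a chart disagree.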

Problem: Instrumenting with SkyWalking in a Minimalist Container

A major hurdle was correctly instrumenting this service with SkyWalking. In a traditional VM, you’d just add a -javaagent flag. In a Docker container for Knative, especially when striving for small image sizes, this requires more care. The solution involves a multi-stage Dockerfile to download the agent and then copy it into the final, lean image.

# Dockerfile for Kotlin service

# Stage 1: Build the fat JAR
FROM gradle:8.4-jdk17-alpine AS builder
WORKDIR /home/gradle/project
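COPY build.gradle.kts settings.gradle.kts ./
COPY src ./src
# Use the Gradle bundled with the base image; the wrapper would also need gradle/wrapper/ copied in.
# --no-daemon avoids leaving a background daemon behind in a one-shot CI build.
RUN gradle shadowJar --no-daemon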

# Stage 2: Download SkyWalking Agent
FROM alpine:3.18 AS skywalking_agent
ARG SKYWALKING_AGENT_VERSION=8.15.0
WORKDIR /agent
RUN apk add --no-cache curl tar && \
    curl -L -o agent.tar.gz https://archive.apache.org/dist/skywalking/java-agent/${SKYWALKING_AGENT_VERSION}/apache-skywalking-java-agent-${SKYWALKING_AGENT_VERSION}.tgz && \
    tar -xzf agent.tar.gz && \
    mv skywalking-agent /opt/skywalking-agent && \
    rm agent.tar.gz

# Stage 3: Create the final image
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
COPY --from=builder /home/gradle/project/build/libs/synthetic-runner.jar .
COPY --from=skywalking_agent /opt/skywalking-agent /opt/skywalking-agent

# Environment variables are configured in the Knative service manifest
# SW_AGENT_NAME: Service name in SkyWalking UI
# SW_AGENT_COLLECTOR_BACKEND_SERVICES: Address of the SkyWalking OAP
# JAVA_TOOL_OPTIONS: This is how we activate the agent
ENV JAVA_TOOL_OPTIONS="-javaagent:/opt/skywalking-agent/skywalking-agent.jar"

EXPOSE 8080
CMD ["java", "-jar", "synthetic-runner.jar"]

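The key is the ENV JAVA_TOOL_OPTIONS line. The JVM reads this standard environment variable at startup and treats its contents as additional command-line options, so the agent is attached without hardcoding -javaagent in the CMD instruction. Because the Knative manifest can override the variable, the agent can also be disabled per environment without rebuilding the image.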

Step 2: The Python/Matplotlib Visualization Service

This service is simpler. It’s a Flask application that accepts a JSON payload of timings, generates a plot, and returns it.

requirements.txt:

Flask==3.0.0
matplotlib==3.8.1
numpy==1.26.1

The application code:

# app.py

import io
import logging
import os
import numpy as np
import matplotlib
from flask import Flask, request, jsonify, Response

# A critical pitfall in headless environments is Matplotlib trying to use a GUI backend.
# 'Agg' is a non-interactive backend that renders to a file/buffer.
matplotlib.use('Agg')
import matplotlib.pyplot as plt

app = Flask(__name__)

# Configure basic logging
logging.basicConfig(level=logging.INFO)

@app.route('/generate-plot', methods=['POST'])
def generate_plot():
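    # silent=True returns None for a missing or malformed JSON body instead of raising,
    # so the handler controls the error response itself.
    payload = request.get_json(silent=True)
    if not payload or 'timingData' not in payload:
        logging.error("Invalid request: missing timingData field")
        return jsonify({"error": "Invalid payload"}), 400
    
    timings = payload['timingData']
    trace_id = payload.get('traceId', 'N/A')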
    
    if not timings:
        logging.warning(f"Received empty timing data for traceId: {trace_id}")
        return jsonify({"error": "No timing data provided"}), 400

    logging.info(f"Generating plot for {len(timings)} data points. TraceId: {trace_id}")

    try:
        fig, ax = plt.subplots(figsize=(10, 6))
        
        p95 = np.percentile(timings, 95)
        p99 = np.percentile(timings, 99)
        avg = np.mean(timings)

        ax.hist(timings, bins=30, alpha=0.7, color='skyblue', edgecolor='black')
        ax.axvline(avg, color='green', linestyle='dashed', linewidth=2, label=f'Avg: {avg:.2f}ms')
        ax.axvline(p95, color='orange', linestyle='dashed', linewidth=2, label=f'p95: {p95:.2f}ms')
        ax.axvline(p99, color='red', linestyle='dashed', linewidth=2, label=f'p99: {p99:.2f}ms')
        
        ax.set_title(f'API Latency Distribution (Trace: {trace_id})')
        ax.set_xlabel('Latency (ms)')
        ax.set_ylabel('Frequency')
        ax.legend()
        ax.grid(True, which='both', linestyle='--', linewidth=0.5)
        
        # Save plot to a memory buffer instead of a file
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        plt.close(fig) # Essential to release memory
        buf.seek(0)
        
        return Response(buf.getvalue(), mimetype='image/png')

    except Exception as e:
        logging.exception(f"Failed to generate plot for traceId: {trace_id}")
        return jsonify({"error": str(e)}), 500

if __name__ == '__main__':
    port = int(os.environ.get("PORT", 8081))
    app.run(debug=False, host='0.0.0.0', port=port)

The Dockerfile for this service is straightforward.

# Dockerfile for Python service
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
ENV PORT=8080
EXPOSE 8080
CMD ["python", "app.py"]

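Knative routes traffic to port 8080 by default and injects a PORT environment variable into the container at runtime. The ENV PORT=8080 line in the Dockerfile keeps local docker runs consistent with that; without it, app.py would fall back to 8081.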
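
A quick smoke check for the endpoint can be sketched with the standard library alone. It posts a payload in the same shape the Kotlin runner sends (`traceId`, `timingData`) and verifies that the response really is a PNG. The helper names, the default trace ID, and the output path are hypothetical.

```python
import json
import urllib.request

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"  # first eight bytes of every PNG file


def build_payload(timings, trace_id="local-smoke-test"):
    """Serialize timings into the JSON body the /generate-plot route expects."""
    return json.dumps({"traceId": trace_id, "timingData": list(timings)}).encode("utf-8")


def is_png(data):
    """Return True if the bytes start with the PNG file signature."""
    return data.startswith(PNG_SIGNATURE)


def fetch_plot(url, timings, out_path="latency.png"):
    """POST timings to the plot service and save the returned image."""
    req = urllib.request.Request(
        url,
        data=build_payload(timings),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        image = resp.read()
    if not is_png(image):
        raise ValueError("response is not a PNG image")
    with open(out_path, "wb") as fh:
        fh.write(image)
    return out_path
```

With the container running locally, `fetch_plot("http://localhost:8080/generate-plot", [12, 15, 14, 230, 13])` should leave a histogram in `latency.png`.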

Step 3: Knative Deployment Manifests

With the container images defined, we need the Knative manifests to deploy them.

knative-services.yaml:

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: synthetic-runner
spec:
  template:
    metadata:
      annotations:
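        # Keep the pod for 60s after traffic drops before scaling down (the default is 0s).
        # A shorter delay saves cost; a longer one avoids back-to-back cold starts.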
        autoscaling.knative.dev/scale-down-delay: "60s"
    spec:
      containers:
        - image: your-registry/synthetic-runner:latest # This will be replaced by CircleCI
          ports:
            - containerPort: 8080
          env:
            - name: VISUALIZATION_SERVICE_URL
              value: "http://visualization-generator.default.svc.cluster.local/generate-plot"
            - name: TARGET_API_ENDPOINT
              value: "http://your-target-api.default.svc.cluster.local"
            # SkyWalking Configuration
            - name: SW_AGENT_NAME
              value: "synthetic-runner-kotlin"
            - name: SW_AGENT_COLLECTOR_BACKEND_SERVICES
              value: "skywalking-oap.skywalking.svc.cluster.local:11800"
---
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: visualization-generator
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/scale-down-delay: "60s"
    spec:
      containers:
        - image: your-registry/visualization-generator:latest
          ports:
            - containerPort: 8080
---
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: synthetic-test-trigger
spec:
  schedule: "*/5 * * * *" # Every 5 minutes
  contentType: "application/json"
  data: '{"source": "PingSource"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: synthetic-runner

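This single file defines both services and the PingSource (which replaced CronJobSource in current Knative releases) that triggers the entire workflow. A key detail is using the cluster-internal DNS name (service-name.namespace.svc.cluster.local) for service-to-service communication, which keeps the runner-to-plotter call inside the cluster.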
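
The cost argument for scale-to-zero comes down to simple arithmetic, sketched below. The 20-second run time and 60-second idle tail (autoscaler window plus any configured scale-down delay) are assumed inputs, not measurements, and `daily_active_seconds` is a hypothetical helper.

```python
def daily_active_seconds(interval_minutes, run_seconds, idle_tail_seconds):
    """Estimate how long a pod is up per day when triggered on a fixed interval."""
    runs_per_day = 24 * 60 // interval_minutes
    # A pod cannot be "up" for longer than the interval between triggers.
    per_run = min(run_seconds + idle_tail_seconds, interval_minutes * 60)
    return runs_per_day * per_run


active = daily_active_seconds(interval_minutes=5, run_seconds=20, idle_tail_seconds=60)
print(f"{active} s/day, {active / 86400:.1%} of the day")  # 23040 s/day, 26.7% of the day
```

Under these assumptions the idle tail accounts for three quarters of the billed time, so the scale-down settings matter more than the speed of the test run itself.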

Step 4: The CircleCI Automation Pipeline

The final piece is the CI/CD pipeline to automate the build and deployment. In a real-world project, managing credentials securely is paramount. We use CircleCI contexts to store Kubernetes configuration.

.circleci/config.yml:

version: 2.1

orbs:
  docker: circleci/docker@x.y.z # pin to a specific orb release
  kubernetes: circleci/kubernetes@x.y.z # pin to a specific orb release

jobs:
  build-and-push-kotlin:
    docker:
      - image: cimg/openjdk:17.0
    parameters:
      image_tag:
        type: string
        default: ${CIRCLE_SHA1}
    steps:
      - checkout
      - setup_remote_docker:
          version: 20.10.18
      - docker/check
      - docker/build:
          image: your-registry/synthetic-runner
          dockerfile: ./kotlin-service/Dockerfile
          tag: << parameters.image_tag >>
      - docker/push:
          image: your-registry/synthetic-runner
          tag: << parameters.image_tag >>

  build-and-push-python:
    docker:
      - image: cimg/python:3.11
    parameters:
      image_tag:
        type: string
        default: ${CIRCLE_SHA1}
    steps:
      - checkout
      - setup_remote_docker:
          version: 20.10.18
      - docker/check
      - docker/build:
          image: your-registry/visualization-generator
          dockerfile: ./python-service/Dockerfile
          tag: << parameters.image_tag >>
      - docker/push:
          image: your-registry/visualization-generator
          tag: << parameters.image_tag >>

  deploy-to-knative:
    docker:
      - image: cimg/base:2023.09
    parameters:
      image_tag:
        type: string
        default: ${CIRCLE_SHA1}
    steps:
      - checkout
      - kubernetes/install
      - run:
          name: "Deploy Services"
          # The use of 'sed' is a pragmatic way to update image tags in CI.
          # For more complex scenarios, Kustomize or Helm would be a better choice.
          command: |
            sed -i "s|image: your-registry/synthetic-runner:latest|image: your-registry/synthetic-runner:<< parameters.image_tag >>|g" knative-services.yaml
            sed -i "s|image: your-registry/visualization-generator:latest|image: your-registry/visualization-generator:<< parameters.image_tag >>|g" knative-services.yaml
            
            echo "Applying Knative manifests..."
            cat knative-services.yaml
            kubectl apply -f knative-services.yaml

workflows:
  build-and-deploy:
    jobs:
      - build-and-push-kotlin:
          context: my-docker-registry-creds
          filters:
            branches:
              only: main
      - build-and-push-python:
          context: my-docker-registry-creds
          filters:
            branches:
              only: main
      - deploy-to-knative:
          context: my-k8s-cluster-creds
          requires:
            - build-and-push-kotlin
            - build-and-push-python
          filters:
            branches:
              only: main

With this pipeline, any push to the main branch automatically builds both container images, tags them with the Git SHA, pushes them to the registry, and updates the Knative services. This closed the loop on our fourth requirement: a fully automated lifecycle.
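
One weakness of the sed approach is that it succeeds silently when the pattern no longer matches, for example after someone edits the manifest's image line, and the pipeline then redeploys `:latest`. A stricter version of the same substitution, sketched here with a hypothetical `pin_image_tag` helper, fails loudly instead.

```python
import re


def pin_image_tag(manifest, image, tag):
    """Replace '<image>:latest' with '<image>:<tag>', failing if nothing matched."""
    pattern = re.compile(rf"(image:\s*{re.escape(image)}):latest\b")
    updated, count = pattern.subn(lambda m: f"{m.group(1)}:{tag}", manifest)
    if count == 0:
        raise ValueError(f"no ':latest' reference to {image} found in manifest")
    return updated


manifest = "        - image: your-registry/synthetic-runner:latest # replaced by CI\n"
print(pin_image_tag(manifest, "your-registry/synthetic-runner", "abc123"), end="")
```

The same guard can be had in shell by grepping for the new tag after the sed call; Kustomize or Helm remove the need for text substitution altogether.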

The system is now operational. When a performance degradation occurs, we no longer stare at a flat line on a dashboard. Instead, we can pull up the trace in SkyWalking and see the exact latency breakdown of a synthetic run. We can also view the Matplotlib-generated histogram, which often reveals the problem instantly—a bimodal distribution might point to a failing node in a cache cluster, for example.

This solution, however, is not without its limitations. The current process for handling the generated image is to return it in an HTTP response, which is ephemeral. A robust implementation would require persisting these plots to an object store like S3, introducing another dependency and potential point of failure. Furthermore, while the Kotlin service is well-instrumented by SkyWalking’s Java agent, achieving the same level of automatic context propagation and detailed tracing in the Python service requires the integration of a Python-specific agent, a step that adds complexity to the build and runtime. The current trigger is a simple cron schedule; a more sophisticated approach would be to trigger these synthetic runs via a webhook immediately following a new deployment, effectively turning it into a powerful, automated performance smoke test.
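
As a starting point for the persistence gap, the sketch below derives a stable, date-partitioned key for each plot and writes it to a local directory as a stand-in for an object store. The key layout and the function names are assumptions for illustration; swapping the filesystem write for an S3 upload is left out deliberately.

```python
from datetime import datetime, timezone
from pathlib import Path


def plot_key(trace_id, taken_at):
    """Build a date-partitioned object key; unsafe characters in the ID become '_'."""
    safe_id = "".join(c if c.isalnum() or c in "-_." else "_" for c in trace_id)
    return f"plots/{taken_at:%Y/%m/%d}/{taken_at:%H%M%S}-{safe_id}.png"


def persist_plot(root, trace_id, image, taken_at=None):
    """Write PNG bytes under root using plot_key; returns the path written."""
    taken_at = taken_at or datetime.now(timezone.utc)
    target = Path(root) / plot_key(trace_id, taken_at)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(image)
    return target
```

Keying by date and trace ID means a plot can be found either from an alert timestamp or from the trace shown in SkyWalking.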

