Implementing a Low-Latency Feature Serving API in Go for a Kubeflow-Driven MLOps Platform


Our production models were struggling with feature-serving latency. The existing batch system, which loaded features into a relational database nightly, introduced a 24-hour lag, rendering real-time personalization and fraud detection ineffective. We were facing a clear architectural bottleneck: our powerful Kubeflow pipelines were generating valuable, timely features, but the serving layer couldn’t deliver them to the online inference services fast enough. The mandate was to build a new system capable of serving features computed by our Kubeflow pipelines with a p99 latency under 10 milliseconds.

The initial whiteboard concept was a decoupled, multi-stage architecture. Kubeflow pipelines would remain the source of truth for feature computation. Instead of writing to a slow, transactional database, they would publish feature sets to a high-throughput, intermediate message bus. A dedicated ingestion service would then consume these messages and populate a low-latency online database. Finally, a high-performance API would sit in front of this online store, serving feature vectors to our inference models.

This design immediately forced several technology selection decisions. For the online feature store, a relational database was out. We needed raw key-value speed. Redis was the obvious first choice, given its sub-millisecond latency and the perfect fit of its Hash data structure for storing feature vectors against an entity ID. For the serving API, Python, while dominant in our ML stack, felt wrong for this particular component. The Python GIL, framework overhead, and memory consumption were all concerns for a service where every microsecond counted. We chose Go for its exceptional concurrency support, minimal runtime overhead, and statically-linked binaries that are trivial to containerize. Within the Go ecosystem, Go-Fiber stood out. Its API is heavily inspired by Express.js, which lowered the barrier to entry for our team, and its performance, built on top of the Fasthttp engine, is among the best in the field.
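
To make the Redis choice concrete: the layout we converged on is one Hash per entity, keyed by entity ID, with one field per feature. Below is a quick illustration with go-redis; the key name and values are invented for the example.

// Illustration only: one Hash per entity, keyed as features:<entityID>.
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// One HSET per entity. Redis stores every field value as a string, so
	// numeric and boolean features come back as strings and are parsed by the caller.
	if err := rdb.HSet(ctx, "features:42", map[string]interface{}{
		"purchase_frequency": "0.31",
		"is_high_value":      "1",
		"items_in_cart":      "3",
	}).Err(); err != nil {
		panic(err)
	}

	// The entire feature vector is a single HGETALL round trip.
	vec, err := rdb.HGetAll(ctx, "features:42").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(vec) // map[is_high_value:1 items_in_cart:3 purchase_frequency:0.31]
}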

The final, unexpected component was a small control plane service. We quickly realized we needed a way to manage feature definitions, schemas, and metadata. This was a simple CRUD application, and for speed of development, we opted for a TypeScript/Node.js service. This decision introduced Vitest into the stack, a modern testing framework that provided the speed and tooling needed to ensure this critical but smaller service was reliable.

The Kubeflow to Online Store Data Bridge

The first step was modifying our Kubeflow pipelines. We couldn’t just df.to_sql() anymore. The new contract required the feature engineering component to serialize the computed features into a well-defined format and publish them. We chose Protobuf for schema enforcement and efficiency. The pipeline component now ends by publishing messages to a Kafka topic.

Here’s a simplified Kubeflow component written in Python that demonstrates this. It calculates user features and pushes them to a Kafka topic named feature-updates.

# kfp_components/feature_producer.py
from kfp.dsl import component, Input, Dataset


@component(
    base_image='python:3.9-slim',
    packages_to_install=['pandas==2.0.3', 'kafka-python==2.0.2']
)
def produce_user_features(
    user_data_path: Input[Dataset],
    kafka_bootstrap_servers: str,
    kafka_topic: str,
):
    """
    Computes features and publishes them to a Kafka topic.
    In a real-world scenario, this would use Protobuf, but we use JSON for simplicity here.
    """
    # Imports live inside the function body because KFP serializes a lightweight
    # component as a standalone script; module-level imports would not be shipped.
    import json
    import logging

    import pandas as pd
    from kafka import KafkaProducer

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    producer = KafkaProducer(
        bootstrap_servers=kafka_bootstrap_servers.split(','),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        retries=5,
        acks='all'  # Ensure the message is received by all in-sync replicas
    )

    df = pd.read_csv(user_data_path.path)

    # Dummy feature engineering
    df['purchase_frequency'] = df['total_purchases'] / df['days_since_join']
    df['is_high_value'] = df['total_spent'] > 1000

    features_to_publish = [
        'user_id',
        'purchase_frequency',
        'is_high_value',
        'items_in_cart',
    ]

    df_final = df[features_to_publish].set_index('user_id')

    logger.info(f"Publishing {len(df_final)} feature sets to topic '{kafka_topic}'...")

    for user_id, features in df_final.iterrows():
        payload = features.to_dict()
        # The key is crucial for partitioning and ensuring locality in Kafka
        key = str(user_id).encode('utf-8')
        try:
            producer.send(kafka_topic, key=key, value=payload)
        except Exception as e:
            logger.error(f"Failed to send message for user {user_id}: {e}")

    # Block until all async messages are sent
    producer.flush()
    producer.close()
    logger.info("All feature sets published successfully.")

With features flowing into Kafka, the next piece was the Go ingestion service. This is a background worker, not a user-facing API. Its only job is to consume from Kafka as fast as possible and update Redis. A common mistake is to do this synchronously; a real-world project requires a concurrent worker pool to keep up with high-volume topics.

// cmd/ingestor/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
	"github.com/redis/go-redis/v9"
)

const (
	workerCount = 10 // Number of concurrent workers processing messages
	topic       = "feature-updates"
	groupID     = "feature-ingestor-group"
)

// newLogger creates a structured logger.
func newLogger() *slog.Logger {
	return slog.New(slog.NewJSONHandler(os.Stdout, nil))
}

// FeatureSet represents the structure of the incoming message.
type FeatureSet map[string]interface{}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
	ready   chan bool
	redisDB *redis.Client
	logger  *slog.Logger
	wg      *sync.WaitGroup
	msgChan chan *sarama.ConsumerMessage
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` method must block until the session is over to prevent
	// the consumer group from rebalancing prematurely.
	for message := range claim.Messages() {
		c.logger.Info("Message claimed", "value", string(message.Value), "partition", message.Partition, "offset", message.Offset)
		select {
		case c.msgChan <- message:
			// Marking here, before the worker has written to Redis, is effectively
			// at-most-once delivery; see the sketch after this listing for a variant
			// that marks the offset only after a successful write.
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			// The session is ending (shutdown or rebalance); stop handing off work
			// so we never block on a full channel after the workers have exited.
			return nil
		}
	}
	return nil
}

// worker processes messages from the msgChan
func (c *Consumer) worker(ctx context.Context, id int) {
	defer c.wg.Done()
	for {
		select {
		case msg := <-c.msgChan:
			userID := string(msg.Key)
			if userID == "" {
				c.logger.Warn("Received message with empty key, skipping.")
				continue
			}

			var features FeatureSet
			if err := json.Unmarshal(msg.Value, &features); err != nil {
				c.logger.Error("Failed to unmarshal feature set", "error", err, "userID", userID)
				continue
			}

			// In Redis, we store features for a user in a Hash.
			// The key is `features:<userID>`.
			redisKey := fmt.Sprintf("features:%s", userID)

			// Using a pipeline is more efficient for multiple commands
			pipe := c.redisDB.Pipeline()
			pipe.HSet(ctx, redisKey, features)
			// Set an expiration to manage stale features
			pipe.Expire(ctx, redisKey, 24*time.Hour)
			
			if _, err := pipe.Exec(ctx); err != nil {
				c.logger.Error("Failed to write features to Redis", "error", err, "userID", userID)
			} else {
				c.logger.Info("Successfully ingested features", "userID", userID, "workerID", id)
			}
		case <-ctx.Done():
			c.logger.Info("Worker shutting down", "workerID", id)
			return
		}
	}
}

func main() {
	logger := newLogger()
	kafkaBrokers := os.Getenv("KAFKA_BROKERS")
	if kafkaBrokers == "" {
		kafkaBrokers = "localhost:9092"
	}
	redisAddr := os.Getenv("REDIS_ADDR")
    if redisAddr == "" {
        redisAddr = "localhost:6379"
    }

	config := sarama.NewConfig()
	config.Version = sarama.V2_8_1_0 // Specify your Kafka version
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()

	redisClient := redis.NewClient(&redis.Options{
		Addr:     redisAddr,
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	if _, err := redisClient.Ping(context.Background()).Result(); err != nil {
		logger.Error("Failed to connect to Redis", "error", err)
		os.Exit(1)
	}

	consumer := &Consumer{
		ready:   make(chan bool),
		redisDB: redisClient,
		logger:  logger,
		wg:      &sync.WaitGroup{},
		msgChan: make(chan *sarama.ConsumerMessage, 256), // Buffered channel
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), groupID, config)
	if err != nil {
		logger.Error("Error creating consumer group client", "error", err)
		os.Exit(1)
	}

	// Start workers
	for i := 0; i < workerCount; i++ {
		consumer.wg.Add(1)
		go consumer.worker(ctx, i+1)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			if err := client.Consume(ctx, []string{topic}, consumer); err != nil {
				logger.Error("Error from consumer", "error", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Await till the consumer has been set up
	logger.Info("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		logger.Info("terminating: context cancelled")
	case <-sigterm:
		logger.Info("terminating: via signal")
	}
	cancel()
	wg.Wait()
	consumer.wg.Wait() // Wait for all workers to finish processing
	close(consumer.msgChan)

	if err = client.Close(); err != nil {
		logger.Error("Error closing client", "error", err)
	}
}

This ingestion service covers the essentials: a consumer group for scalability and fault tolerance, a worker pool for concurrent processing, and a shutdown path that drains cleanly on SIGTERM. One caveat worth being explicit about: offsets are marked as soon as a message is handed to the worker pool, so a crash between hand-off and the Redis write can silently drop that update. For at-least-once semantics, the offset should only be marked after the write succeeds.
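
Here is a minimal sketch of that at-least-once variant, assuming we are willing to pass the consumer-group session to the worker alongside each message. The claimedMessage type and processAndMark helper are illustrative, not part of the service above.

// At-least-once sketch: acknowledge a message only after the Redis write succeeds.
package ingestor

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"github.com/IBM/sarama"
	"github.com/redis/go-redis/v9"
)

// claimedMessage pairs a Kafka message with the session that claimed it,
// so the worker can commit the offset itself.
type claimedMessage struct {
	msg     *sarama.ConsumerMessage
	session sarama.ConsumerGroupSession
}

func processAndMark(ctx context.Context, rdb *redis.Client, log *slog.Logger, cm claimedMessage) {
	var features map[string]interface{}
	if err := json.Unmarshal(cm.msg.Value, &features); err != nil {
		// A malformed payload will never succeed; mark it so we don't loop on it forever.
		log.Error("unmarshal failed, skipping message", "error", err)
		cm.session.MarkMessage(cm.msg, "")
		return
	}

	key := fmt.Sprintf("features:%s", string(cm.msg.Key))
	pipe := rdb.Pipeline()
	pipe.HSet(ctx, key, features)
	pipe.Expire(ctx, key, 24*time.Hour)

	if _, err := pipe.Exec(ctx); err != nil {
		// Leave the offset unmarked: after a restart or rebalance the message is redelivered.
		log.Error("redis write failed, offset left unmarked", "error", err, "key", key)
		return
	}
	cm.session.MarkMessage(cm.msg, "") // acknowledge only after a durable write
}

The trade-off is the occasional duplicate write after a crash, which is harmless here because re-applying the same HSET for a feature set is idempotent.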

The Low-Latency Go-Fiber Serving API

This is the core, performance-critical component. It needs to be lean and fast. Go-Fiber excels here.

// cmd/server/main.go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/logger"
	"github.com/gofiber/fiber/v2/middleware/recover"
	"github.com/redis/go-redis/v9"
)

// FeatureStore is our repository for accessing features.
type FeatureStore struct {
	db  *redis.Client
	log *slog.Logger
}

// NewFeatureStore creates a new instance of the FeatureStore.
func NewFeatureStore(client *redis.Client, log *slog.Logger) *FeatureStore {
	return &FeatureStore{db: client, log: log}
}

// GetFeaturesForUser retrieves a feature vector for a given user ID.
func (fs *FeatureStore) GetFeaturesForUser(ctx context.Context, userID string) (map[string]string, error) {
	redisKey := fmt.Sprintf("features:%s", userID)
	
	// HGetAll is a single network round trip to get all fields and values in a hash.
	// Note that HGetAll never returns redis.Nil: a missing key yields an empty map,
	// so the caller detects a cache miss by checking the map length, and any non-nil
	// error here means a real infrastructure problem rather than "not found".
	features, err := fs.db.HGetAll(ctx, redisKey).Result()
	if err != nil {
		fs.log.Error("Failed to get features from Redis", "error", err, "userID", userID)
		return nil, err
	}
	return features, nil
}

// createFiberApp configures and returns a new Fiber app.
func createFiberApp(store *FeatureStore, log *slog.Logger) *fiber.App {
	app := fiber.New(fiber.Config{
		// A real-world project must have a stringent ReadTimeout to prevent slow-loris attacks.
		ReadTimeout: 5 * time.Second,
	})

	// Use middlewares for production-grade services
	app.Use(recover.New())
	app.Use(logger.New(logger.Config{
		Format:     "[${time}] ${status} - ${latency} ${method} ${path}\n",
		TimeFormat: "2006-01-02 15:04:05",
	}))

	// Health check endpoint
	app.Get("/health", func(c *fiber.Ctx) error {
		return c.Status(fiber.StatusOK).JSON(fiber.Map{"status": "ok"})
	})
	
	api := app.Group("/api/v1")
	api.Get("/features/user/:id", func(c *fiber.Ctx) error {
		userID := c.Params("id")

		// It's critical to use the request context and add a timeout.
		// This prevents a slow Redis from holding up the Go application indefinitely.
		ctx, cancel := context.WithTimeout(c.Context(), 50*time.Millisecond)
		defer cancel()

		features, err := store.GetFeaturesForUser(ctx, userID)
		if err != nil {
			// This indicates a server-side problem (e.g., Redis is down)
			return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
				"error": "Could not retrieve features",
			})
		}

		if len(features) == 0 {
			// This is a client-side problem (user not found), not a server error.
			// The distinction is important for monitoring and alerting.
			return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
				"error": fmt.Sprintf("No features found for user %s", userID),
			})
		}
		
		return c.Status(fiber.StatusOK).JSON(features)
	})

	return app
}

func main() {
	log := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	redisAddr := os.Getenv("REDIS_ADDR")
	if redisAddr == "" {
		redisAddr = "localhost:6379"
	}
	
	// Production-grade applications should configure connection pooling properly.
	redisClient := redis.NewClient(&redis.Options{
		Addr:     redisAddr,
		PoolSize: 100, // Set pool size based on expected concurrent requests.
		MinIdleConns: 10,
	})

	if _, err := redisClient.Ping(context.Background()).Result(); err != nil {
		log.Error("Failed to connect to Redis", "error", err)
		os.Exit(1)
	}

	featureStore := NewFeatureStore(redisClient, log)
	app := createFiberApp(featureStore, log)

	port := os.Getenv("PORT")
	if port == "" {
		port = "3000"
	}

	log.Info(fmt.Sprintf("Feature serving API starting on port %s", port))
	if err := app.Listen(fmt.Sprintf(":%s", port)); err != nil {
		log.Error("Failed to start server", "error", err)
	}
}
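
One thing this main() leaves out is graceful shutdown. In Kubernetes, a rolling update sends SIGTERM while requests may still be in flight, so it pays to drain connections before exiting. Below is a minimal, standalone sketch using Fiber's Shutdown; the structure is an assumption, not part of the service above.

// Graceful shutdown sketch: trap SIGTERM/SIGINT and let Fiber drain in-flight requests.
package main

import (
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/gofiber/fiber/v2"
)

func main() {
	log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	app := fiber.New() // stand-in for the fully configured app above

	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
		<-sig
		log.Info("shutdown signal received, draining in-flight requests")
		if err := app.Shutdown(); err != nil {
			log.Error("error during shutdown", "error", err)
		}
	}()

	// Listen blocks until Shutdown is called; a nil error means a clean drain.
	if err := app.Listen(":3000"); err != nil {
		log.Error("server stopped with error", "error", err)
	}
	log.Info("server exited cleanly")
}

Pairing this with a terminationGracePeriodSeconds that comfortably exceeds the drain time keeps rolling updates invisible to callers.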

Containerizing this is straightforward.

# Dockerfile
# Stage 1: Build the application
FROM golang:1.21-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .

# Build the binary with optimizations.
# CGO_ENABLED=0 is critical for a small, static binary.
# -ldflags="-s -w" strips debugging information, reducing binary size.
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /feature-server ./cmd/server

# Stage 2: Create the final, minimal image
FROM alpine:latest

# We need ca-certificates for any potential HTTPS calls.
RUN apk --no-cache add ca-certificates

WORKDIR /root/

# Copy only the compiled binary from the builder stage.
COPY --from=builder /feature-server .

# Expose the port the app runs on.
EXPOSE 3000

# Run the binary.
CMD ["./feature-server"]

Testing the Control Plane with Vitest

The control plane, our Node.js/TypeScript service, manages metadata about features (e.g., name, data type, description, status). It writes this metadata to a PostgreSQL database. A key requirement is that the feature serving API should only serve features that are marked as ACTIVE in the control plane’s database. This prevents serving half-calculated or deprecated features.
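
On the serving side, the most direct way to honor that rule would be an in-memory allow-list of ACTIVE feature names that the Go API refreshes from the control plane and applies to every Redis result. The sketch below shows the idea; the control-plane endpoint, the types, and the refresh mechanics are assumptions, not something the current implementation does.

// ActiveFeatureCache sketch: filter served features against control-plane state.
package serving

import (
	"context"
	"encoding/json"
	"net/http"
	"sync"
)

// ActiveFeatureCache holds the set of feature names currently marked ACTIVE.
// It is intended to be refreshed periodically in the background.
type ActiveFeatureCache struct {
	mu     sync.RWMutex
	active map[string]struct{}
}

// Refresh pulls the active feature names from a (hypothetical) control-plane endpoint.
func (c *ActiveFeatureCache) Refresh(ctx context.Context, controlPlaneURL string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, controlPlaneURL+"/api/features/active", nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	var names []string
	if err := json.NewDecoder(resp.Body).Decode(&names); err != nil {
		return err
	}

	next := make(map[string]struct{}, len(names))
	for _, n := range names {
		next[n] = struct{}{}
	}

	c.mu.Lock()
	c.active = next
	c.mu.Unlock()
	return nil
}

// Filter drops any field of the Redis hash that is not currently ACTIVE.
func (c *ActiveFeatureCache) Filter(features map[string]string) map[string]string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	out := make(map[string]string, len(features))
	for name, value := range features {
		if _, ok := c.active[name]; ok {
			out[name] = value
		}
	}
	return out
}

A push-based alternative, discussed in the closing notes, would invert this: the control plane publishes status changes and the serving fleet updates its set on receipt.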

This creates a testing challenge: how do we verify the interaction between the control plane and the serving API? We used Vitest to write an integration test. The test mocks the PostgreSQL database using an in-memory substitute and runs against a live instance of the Go API, spun up via Docker Compose in our CI environment.

// services/control-plane/src/featureDefinition.service.test.ts
import { describe, it, expect, beforeEach, vi } from 'vitest';
import axios from 'axios';
import { FeatureDefinitionService } from './featureDefinition.service';
import { PrismaMock } from './testing/prismaMock';

// This is a mock of our Prisma client for interacting with PostgreSQL
const prismaMock = new PrismaMock();

// Assume the Go feature server is running at this address in the test environment
const FEATURE_SERVER_URL = process.env.TEST_FEATURE_SERVER_URL || 'http://localhost:3000';

describe('FeatureDefinitionService Integration', () => {
  let service: FeatureDefinitionService;

  beforeEach(() => {
    vi.clearAllMocks();
    // Instantiate the service with the mocked database client
    service = new FeatureDefinitionService(prismaMock as any);
  });

  it('should not be able to fetch a feature that is not marked as ACTIVE', async () => {
    const userId = '12345';
    const featureName = 'is_high_value';
    const redisKey = `features:${userId}`;

    // Step 1: Mock the database to return a feature definition that is PENDING
    prismaMock.featureDefinition.findUnique.mockResolvedValue({
      id: 1,
      name: featureName,
      description: 'A test feature',
      status: 'PENDING', // CRITICAL: The feature is not active
      createdAt: new Date(),
      updatedAt: new Date(),
    });

    // Step 2: Use the service to check if a feature is servable.
    // In a real implementation, the Go service might call this control plane
    // or have this state replicated to it. For this test, we verify our own logic.
    const isServable = await service.isFeatureServable(featureName);
    expect(isServable).toBe(false);

    // Step 3: Verify that the Go API returns a 404, even if data exists in Redis.
    // This is the cross-service integration part of the test.
    // We assume the test setup has prepopulated Redis for this user.
    // The Go API would need logic to check against the control plane state,
    // for now we just test the feature is not found.
    const responsePromise = axios.get(`${FEATURE_SERVER_URL}/api/v1/features/user/${userId}`);
    
    // We expect this to fail with a 404 Not Found error.
    await expect(responsePromise).rejects.toThrow('Request failed with status code 404');
  });

  it('should be able to fetch a feature that is marked as ACTIVE', async () => {
    const userId = '98765';
    const featureName = 'purchase_frequency';
    
    // Step 1: Mock the DB to return an ACTIVE feature.
    prismaMock.featureDefinition.findUnique.mockResolvedValue({
      id: 2,
      name: featureName,
      description: 'Another test feature',
      status: 'ACTIVE',
      createdAt: new Date(),
      updatedAt: new Date(),
    });

    const isServable = await service.isFeatureServable(featureName);
    expect(isServable).toBe(true);

    // Step 2: Make a live call to the Go API.
    // This assumes our test setup has populated Redis with:
    // HSET features:98765 purchase_frequency 0.8
    const response = await axios.get(`${FEATURE_SERVER_URL}/api/v1/features/user/${userId}`);
    
    expect(response.status).toBe(200);
    expect(response.data).toHaveProperty(featureName);
    expect(response.data[featureName]).toBe('0.8'); // Redis stores values as strings
  });
});

This test provides significant value. It verifies the business logic in the control plane (isFeatureServable) and confirms the expected behavior of the downstream Go service under different metadata states, preventing regressions that could lead to incorrect features being served in production.

Final Architecture and Deployment

The complete system can be visualized as a clear data flow from offline computation to online serving.

graph TD
    subgraph Kubeflow Pipelines on Kubernetes
        A[Data Source: S3/BigQuery] --> B(Feature Engineering Python Component);
        B --> C{Kafka Topic: feature-updates};
    end

    subgraph Real-Time Ingestion on Kubernetes
        C --> D[Go Ingestion Service];
        D -- HSET/EXPIRE --> E(Redis Online Store);
    end

    subgraph Control Plane
        F[PostgreSQL Metadata DB] <--> G(Node.js/TS Control Plane API);
    end

    subgraph Online Serving on Kubernetes
        E --> H[Go-Fiber Serving API];
        G -- Feature Status --> H;
    end

    subgraph Consumers
        I[ML Model Inference Service] -- "GET /features/user/:id" --> H;
        J[Web App/Frontend] --> G;
        K[Data Scientist CLI] --> G;
    end

    style B fill:#b2dfdb,stroke:#333,stroke-width:2px
    style D fill:#c5cae9,stroke:#333,stroke-width:2px
    style H fill:#c5cae9,stroke:#333,stroke-width:2px
    style G fill:#d1c4e9,stroke:#333,stroke-width:2px

Deploying the Go API to Kubernetes requires a standard Deployment and Service manifest. A critical aspect of a production deployment is readiness and liveness probes.

# k8s/feature-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: feature-server
  labels:
    app: feature-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: feature-server
  template:
    metadata:
      labels:
        app: feature-server
    spec:
      containers:
      - name: server
        image: your-repo/feature-server:v1.0.2
        ports:
        - containerPort: 3000
        env:
        - name: REDIS_ADDR
          value: "redis-master.redis.svc.cluster.local:6379"
        - name: PORT
          value: "3000"
        resources:
          requests:
            cpu: "250m"
            memory: "64Mi"
          limits:
            cpu: "500m"
            memory: "128Mi"
        # Liveness probe checks if the app is running
        livenessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 10
        # Readiness probe checks if the app is ready to accept traffic
        # In a real app, this might ping Redis to ensure connectivity.
        readinessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: feature-server-svc
spec:
  selector:
    app: feature-server
  ports:
  - protocol: TCP
    port: 80
    targetPort: 3000
  type: ClusterIP
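
The readinessProbe above points at /health, which only proves the process is alive. As the manifest comment suggests, a stricter readiness check would also verify Redis connectivity so a pod that cannot reach the store is pulled out of rotation. Here is a minimal, standalone sketch assuming a hypothetical /ready route; the probe path in the manifest would change accordingly.

// Readiness sketch: fail /ready when Redis is unreachable.
package main

import (
	"context"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	app := fiber.New()

	// /ready fails when Redis is unreachable, so Kubernetes stops routing
	// traffic to a pod that cannot actually serve features.
	app.Get("/ready", func(c *fiber.Ctx) error {
		ctx, cancel := context.WithTimeout(c.Context(), 200*time.Millisecond)
		defer cancel()
		if err := rdb.Ping(ctx).Err(); err != nil {
			return c.Status(fiber.StatusServiceUnavailable).
				JSON(fiber.Map{"status": "redis unreachable"})
		}
		return c.JSON(fiber.Map{"status": "ready"})
	})

	_ = app.Listen(":3000")
}

The liveness probe should stay on /health: restarting pods because Redis blips would only make an outage worse.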

This architecture, while composed of several moving parts, successfully meets the low-latency serving requirement. The separation of concerns is key: Kubeflow for heavy computation, Kafka for durable transport, Go for high-performance I/O, and Redis for fast lookups.

The current Redis implementation uses a single primary node, which presents a single point of failure. The next iteration requires a move to a high-availability setup with Redis Sentinel for failover or Redis Cluster for sharding and increased fault tolerance. Furthermore, the feature status check between the Go API and the control plane is currently implicit; implementing a direct, cached lookup or a push-based update mechanism from the control plane to the serving fleet would make this coupling more robust. Finally, while Vitest provides excellent unit and integration testing for the control plane, our end-to-end testing, which validates the full flow from a Kubeflow pipeline run to an API response, remains a manual process. Automating this entire validation loop is the next major objective for improving the platform’s reliability.
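
For the high-availability work, go-redis already ships a Sentinel-aware failover client, so much of the change is in how the client is constructed. A sketch under that assumption follows; the master name and Sentinel addresses are placeholders.

// Sentinel sketch: NewFailoverClient discovers the current master via Sentinel
// and reconnects transparently after a failover.
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewFailoverClient(&redis.FailoverOptions{
		MasterName: "feature-store", // as registered with Sentinel
		SentinelAddrs: []string{
			"redis-sentinel-0.redis.svc.cluster.local:26379",
			"redis-sentinel-1.redis.svc.cluster.local:26379",
			"redis-sentinel-2.redis.svc.cluster.local:26379",
		},
		PoolSize:     100,
		MinIdleConns: 10,
	})

	fmt.Println(rdb.Ping(context.Background()).Err())
}

Because every read and write touches a single features:<id> key, either Sentinel or Redis Cluster slots in without changing the access pattern.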

