Orchestrating a Real-Time Feature Pipeline from Nuxt.js to Kubeflow via a JavaScript Gateway and NoSQL Store


The technical pain point was feature staleness. Our personalization models, served on a robust Kubeflow cluster, were consistently underperforming in live A/B tests against even simple heuristic-based systems. The root cause was clear: the models operated on user features derived from a batch ETL process that ran twice a day. For a user’s active session, the model was effectively flying blind, making predictions based on data that was, on average, 6 hours old. We needed to get real-time user behavior—clicks, scrolls, searches happening right now—from our Nuxt.js single-page application into a state accessible by our Python-based inference services with a p99 latency of under 150ms.

Our initial concept was to have the Nuxt.js frontend send events directly to a new REST endpoint on the existing Flask-based inference service. This was quickly dismissed. Coupling the high-volume, spiky traffic of user interaction events with the mission-critical, latency-sensitive model serving workload is an operational nightmare in production. A flood of user events could degrade prediction latency or, worse, get the inference pods OOM-killed. It would also have forced our frontend JavaScript developers to coordinate deployments and debug issues within a Python environment they were unfamiliar with. The solution required a decoupled, purpose-built ingestion layer.

The architecture we settled on separated concerns cleanly. The Nuxt.js frontend would be responsible for collecting and batching user events. A new, highly available Node.js service would act as an ingestion gateway, responsible for validating, transforming, and persisting these events. A NoSQL database, MongoDB in our case, would serve as the low-latency feature store. Finally, the existing Kubeflow KServe (formerly KFServing) inference pods would be modified to query this NoSQL store at prediction time. This design places a well-defined boundary between the JavaScript web stack and the Python MLOps stack, using the database as the contract between them.

The entire data flow can be visualized as follows:

sequenceDiagram
    participant Client as Nuxt.js Client
    participant Gateway as Node.js Gateway (Fastify)
    participant DB as MongoDB Feature Store
    participant Model as KServe Pod (Python)

    Client->>+Gateway: POST /v1/ingest (Batch of UI Events)
    Note right of Gateway: Validate, Transform, Aggregate
    Gateway->>+DB: updateOne({userId: X}, {$set, $inc, ...})
    DB-->>-Gateway: Ack
    Gateway-->>-Client: 202 Accepted

    %% Some time later %%

    Model->>+DB: findOne({userId: X})
    DB-->>-Model: User Feature Document
    Note left of Model: Preprocess features & Predict

The Nuxt.js Event Collector

On the client side, the priority was to collect rich behavioral data without impacting the user experience. Sending a network request for every single interaction (e.g., every mouse movement) is not feasible. We implemented a batching mechanism within a Nuxt.js composable. This service buffers events in memory and flushes them to the gateway API periodically or when the user navigates away.

Here is the core of the composables/useEventTracker.js module. For the final flush it uses the Beacon API (navigator.sendBeacon), which hands the request to the browser without delaying navigation.

// composables/useEventTracker.js

import { ref, onUnmounted } from 'vue';

const INGEST_API_ENDPOINT = '/api/ingest'; // Proxied by Nuxt
const BATCH_SIZE_THRESHOLD = 15;
const FLUSH_INTERVAL_MS = 10000;

const eventQueue = ref([]);
let flushTimeoutId = null;

export function useEventTracker() {
  const trackEvent = (eventType, payload) => {
    const event = {
      type: eventType,
      payload,
      timestamp: new Date().toISOString(),
      clientId: getClientId(), // A function to get a stable client identifier
    };
    eventQueue.value.push(event);

    if (eventQueue.value.length >= BATCH_SIZE_THRESHOLD) {
      flushQueue();
    }
  };

  const flushQueue = async () => {
    // The periodic timer is created with setInterval and must keep running
    // between flushes, so it is deliberately not cleared here.

    if (eventQueue.value.length === 0) {
      return;
    }

    const eventsToSend = [...eventQueue.value];
    eventQueue.value = []; // Clear queue immediately

    try {
      // In a real-world project, use a robust fetch wrapper with retries and error logging.
      await fetch(INGEST_API_ENDPOINT, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ events: eventsToSend }),
        keepalive: true, // Important for requests during page transitions
      });
    } catch (error) {
      console.error('Failed to send tracking events:', error);
      // A common mistake is not handling failed flushes.
      // For critical events, consider adding them back to the queue or persisting to localStorage.
      eventQueue.value.unshift(...eventsToSend);
    }
  };
  
  // Set up periodic flushing
  if (process.client && !flushTimeoutId) {
    flushTimeoutId = setInterval(flushQueue, FLUSH_INTERVAL_MS);
  }

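  // Ensure any remaining events are sent when the consuming component is unmounted.
  // Note: onUnmounted does not fire when the tab is closed or reloaded; covering that
  // case requires a `pagehide` or `visibilitychange` listener.
  onUnmounted(() => {
    if (flushTimeoutId) {
      clearInterval(flushTimeoutId);
      flushTimeoutId = null; // Allow the next consumer to restart the timer
    }

    // Use sendBeacon for reliability during unload. It's fire-and-forget.
    if (eventQueue.value.length > 0) {
      const blob = new Blob([JSON.stringify({ events: eventQueue.value })], { type: 'application/json' });
      navigator.sendBeacon(INGEST_API_ENDPOINT, blob);
      eventQueue.value = [];
    }
  });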

  return { trackEvent };
}

function getClientId() {
    // Implementation for getting/setting a unique client ID in a cookie or localStorage
    let id = localStorage.getItem('app_client_id');
    if (!id) {
        id = `cid-${Date.now()}-${Math.random().toString(36).substring(2)}`;
        localStorage.setItem('app_client_id', id);
    }
    return id;
}
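
The catch block above puts a failed batch back on the queue, which means the buffer can grow without limit while the gateway is unreachable. One way to bound it is sketched below. This is an illustration under our own assumptions rather than part of the original module: requeueWithCap and MAX_QUEUE_SIZE are hypothetical names, and the cap of 200 is arbitrary.

```javascript
// Hypothetical helper; the name and the cap value are assumptions.
const MAX_QUEUE_SIZE = 200;

/**
 * Returns a new queue with the failed batch placed back at the front,
 * trimmed to the newest `maxSize` events so the buffer stays bounded.
 */
function requeueWithCap(queue, failedBatch, maxSize = MAX_QUEUE_SIZE) {
  // Failed events are older than anything queued since, so they go first.
  const merged = [...failedBatch, ...queue];
  // When over the cap, drop from the front (oldest) and keep the tail (newest).
  return merged.length > maxSize ? merged.slice(merged.length - maxSize) : merged;
}

// Possible usage inside the catch block of flushQueue, replacing the plain unshift:
//   eventQueue.value = requeueWithCap(eventQueue.value, eventsToSend);
```

Dropping the oldest events first is a judgment call: for session features, recent behavior is usually worth more than behavior from several failed flushes ago.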

This composable is then used within our Vue components like this:

<!-- components/ProductCard.vue -->
<template>
  <div @click="handleClick">
    <!-- ... product details ... -->
  </div>
</template>

<script setup>
const { trackEvent } = useEventTracker();
const props = defineProps({
  productId: String,
});

function handleClick() {
  trackEvent('product_click', { productId: props.productId, component: 'ProductCard' });
}
</script>

The NoSQL Feature Store Schema

Before building the gateway, we had to define the shape of the data in MongoDB. A critical design decision for performance is to structure the document so it can be updated frequently with atomic operations, minimizing read-modify-write cycles on the application layer.

We designed a userFeatures collection with one document per user. The pitfall here is letting documents grow unbounded, for example, by storing every single event in an array. This is a classic anti-pattern. Instead, we store aggregated or recent state.

// Example document in the `userFeatures` collection
{
  "_id": "user-12345", // We use the application's user ID as the document key
  "lastSeen": "2023-10-27T10:00:00.000Z",
  "session": {
    "sessionId": "session-abcde",
    "startTime": "2023-10-27T09:55:00.000Z",
    "pageviews": 15,
    "lastInteractedProductId": "prod-67890"
  },
  "counters": {
    "clicks_product_card": 5,
    "searches": 2
  },
  "recency": {
    "last_product_view_ts": "2023-10-27T09:58:12.000Z"
  },
  "sets": {
    "viewedCategories": [ "electronics", "cameras", "lenses" ],
    "addedToCartProductIds": [ "prod-abcde" ]
  }
}

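This structure is highly efficient for updates. New pageviews increment session.pageviews with $inc. Product clicks increment counters.clicks_product_card and set session.lastInteractedProductId. Recency timestamps are advanced with $max, so a late-arriving batch cannot move them backwards. Adding a new category to sets.viewedCategories uses the $addToSet operator to ensure uniqueness. Every read and write addresses the document by _id, so the default _id index is the only one the collection needs.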
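
If a feature genuinely needs a short history rather than a counter (say, the last few products a user touched), the array can still be kept bounded. The sketch below builds such an update fragment using MongoDB's $push with the $each and $slice modifiers. The field name recency.recentProductIds and the cap of 20 are hypothetical and do not appear in the schema above.

```javascript
// Hypothetical field name and cap; neither is part of the original schema.
const RECENT_PRODUCTS_FIELD = 'recency.recentProductIds';
const RECENT_PRODUCTS_CAP = 20;

/**
 * Builds an update fragment that appends product IDs and keeps only the
 * last `cap` entries. A negative $slice retains the tail of the array.
 * $slice is only valid alongside $each, which is why both are present.
 */
function buildBoundedPush(productIds, field = RECENT_PRODUCTS_FIELD, cap = RECENT_PRODUCTS_CAP) {
  if (!Array.isArray(productIds) || productIds.length === 0) {
    return null; // Nothing to push; the caller should skip the operator entirely
  }
  return {
    $push: {
      [field]: { $each: productIds, $slice: -cap },
    },
  };
}

const fragment = buildBoundedPush(['prod-1', 'prod-2']);
console.log(JSON.stringify(fragment, null, 2));
```

Unlike $addToSet, $push does not deduplicate, so this fits "most recent N" features rather than "distinct values seen" features.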

The Node.js Ingestion Gateway

This is the heart of the real-time pipeline. We chose Fastify over Express for its superior performance and built-in schema validation and logging capabilities. The service is designed to be stateless, horizontally scalable, and resilient.

Here’s the project structure:

ingestion-gateway/
├── src/
│   ├── plugins/
│   │   ├── mongo.js       # Fastify plugin for MongoDB connection
│   │   └── support.js     # Basic health check, etc.
│   ├── routes/
│   │   └── v1/
│   │       └── ingest.js  # The core ingestion route
│   ├── services/
│   │   └── featureUpdater.js # Logic to process events and build DB queries
│   └── app.js             # Main Fastify application setup
├── test/
│   └── services/
│       └── featureUpdater.test.js # Unit tests
├── Dockerfile
└── package.json

src/app.js - Main Application Setup
This file configures the server, registers plugins, and defines routes. We include @fastify/helmet for security headers and @fastify/rate-limit to prevent abuse.

// src/app.js
import Fastify from 'fastify';
import helmet from '@fastify/helmet';
import rateLimit from '@fastify/rate-limit';
import mongoPlugin from './plugins/mongo.js';
import ingestRoutes from './routes/v1/ingest.js';

const build = (opts = {}) => {
  const app = Fastify({
    logger: {
      level: process.env.LOG_LEVEL || 'info',
      transport: {
        target: 'pino-pretty',
        options: {
          translateTime: 'HH:MM:ss Z',
          ignore: 'pid,hostname',
        },
      },
    },
    ...opts,
  });

  // Register essential plugins
  app.register(helmet);
  app.register(rateLimit, {
    max: 100, // max requests per minute
    timeWindow: '1 minute',
  });
  
  // Register our custom plugins
  app.register(mongoPlugin, {
    uri: process.env.MONGO_URI,
    dbName: process.env.MONGO_DB_NAME || 'feature_store',
  });

  // Register routes
  app.register(ingestRoutes, { prefix: '/v1' });
  
  app.get('/health', (req, reply) => {
    reply.code(200).send({ status: 'ok' });
  });

  return app;
};

export default build;
// A separate server.js file would call build() and app.listen()

src/plugins/mongo.js - Database Connection
This plugin uses fastify-plugin to decorate the Fastify instance with a MongoDB client, so the connection is established once and reused across requests. If the initial connection fails, the plugin logs the error and exits the process.

// src/plugins/mongo.js
import fp from 'fastify-plugin';
import { MongoClient } from 'mongodb';

async function mongoConnector(fastify, options) {
  const { uri, dbName } = options;
  if (!uri) {
    throw new Error('MONGO_URI must be provided');
  }

  const client = new MongoClient(uri, {
    maxPoolSize: 50,
    minPoolSize: 5,
    waitQueueTimeoutMS: 5000,
  });

  try {
    await client.connect();
    fastify.log.info('MongoDB client connected successfully.');
    const db = client.db(dbName);

    fastify.decorate('mongo', { client, db });

    // An async hook must not also take a `done` callback; awaiting is enough.
    fastify.addHook('onClose', async (instance) => {
      await instance.mongo.client.close();
      instance.log.info('MongoDB client connection closed.');
    });
  } catch (err) {
    fastify.log.error(err, 'Failed to connect to MongoDB');
    // In a production environment, this should trigger a graceful shutdown
    process.exit(1);
  }
}

export default fp(mongoConnector);

src/services/featureUpdater.js - The Core Logic
This service is the most critical piece. It takes a batch of raw events and transforms them into a single, efficient MongoDB updateOne operation. Using atomic operators like $inc, $set, $max, and $addToSet is paramount for performance and avoiding race conditions.

// src/services/featureUpdater.js

/**
 * Processes a batch of events for a single user and generates a MongoDB update operation.
 * @param {string} userId - The user ID, used as the document's _id.
 * @param {Array<Object>} events - Array of event objects from the client.
 * @returns {Object|null} - The filter, update document, and options for updateOne,
 *   or null when userId is missing or the batch is empty.
 */
export function processEventsForUpdate(userId, events) {
  if (!userId || !events || events.length === 0) {
    return null;
  }
  
  const updateDoc = {
    $set: { lastSeen: new Date() },
    $inc: {},
    $max: {},
    $addToSet: {},
  };

  for (const event of events) {
    switch (event.type) {
      case 'page_view':
        updateDoc.$inc['session.pageviews'] = (updateDoc.$inc['session.pageviews'] || 0) + 1;
        break;
      
      case 'product_click':
        updateDoc.$inc['counters.clicks_product_card'] = (updateDoc.$inc['counters.clicks_product_card'] || 0) + 1;
        if (event.payload && event.payload.productId) {
          updateDoc.$set['session.lastInteractedProductId'] = event.payload.productId;
        }
        break;
      
      case 'viewed_category':
        if (event.payload && event.payload.category) {
            if (!updateDoc.$addToSet['sets.viewedCategories']) {
                updateDoc.$addToSet['sets.viewedCategories'] = { $each: [] };
            }
            updateDoc.$addToSet['sets.viewedCategories'].$each.push(event.payload.category);
        }
        break;

      // ... other event types
    }
    
    // Track latest timestamp across all events in the batch
    if (event.timestamp) {
      const eventDate = new Date(event.timestamp);
      if (!updateDoc.$max['recency.last_event_ts'] || eventDate > updateDoc.$max['recency.last_event_ts']) {
          updateDoc.$max['recency.last_event_ts'] = eventDate;
      }
    }
  }

  // Clean up empty operators to avoid MongoDB errors
  if (Object.keys(updateDoc.$inc).length === 0) delete updateDoc.$inc;
  if (Object.keys(updateDoc.$max).length === 0) delete updateDoc.$max;
  if (Object.keys(updateDoc.$addToSet).length === 0) delete updateDoc.$addToSet;

  return {
    filter: { _id: userId },
    update: updateDoc,
    options: { upsert: true }, // Create the document if it doesn't exist
  };
}

src/routes/v1/ingest.js - The API Route
This route ties everything together. It defines the JSON schema for validation, calls the featureUpdater service, and executes the database command.

// src/routes/v1/ingest.js
import { processEventsForUpdate } from '../../services/featureUpdater.js';

// Schema for request body validation
const ingestBodySchema = {
  type: 'object',
  required: ['events'],
  properties: {
    events: {
      type: 'array',
      minItems: 1,
      items: {
        type: 'object',
        required: ['type', 'timestamp', 'clientId'],
        properties: {
          type: { type: 'string' },
          timestamp: { type: 'string', format: 'date-time' },
          clientId: { type: 'string' },
          payload: { type: 'object' },
        },
      },
    },
  },
};

export default async function (fastify, opts) {
  fastify.post(
    '/ingest',
    { schema: { body: ingestBodySchema } },
    async (request, reply) => {
      // Fastify's validation handles malformed requests automatically.
      const { events } = request.body;
      
      // A real-world project would get the authenticated user ID.
      // For this example, we assume it's derived from a token or the first event's clientId.
      const userId = events[0].clientId; 

      const updateOperation = processEventsForUpdate(userId, events);
      if (!updateOperation) {
        return reply.code(400).send({ error: 'Invalid events payload' });
      }

      try {
        const collection = fastify.mongo.db.collection('userFeatures');
        await collection.updateOne(
          updateOperation.filter,
          updateOperation.update,
          updateOperation.options
        );
        
        // The write has already been acknowledged by MongoDB at this point.
        // We still return 202 Accepted because the client has nothing further to act on.
        reply.code(202).send({ status: 'accepted' });

      } catch (error) {
        request.log.error(error, 'Failed to update user features');
        // A common mistake is to return a 500 without a clear error message.
        reply.code(503).send({ error: 'Service Unavailable' });
      }
    }
  );
}
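
The route above attributes the whole batch to events[0].clientId. If a batch could ever contain events from more than one client, one possible refinement is to group by clientId and issue one update per group. The sketch below is an assumption about how that might look, not the gateway's actual code: groupEventsByClient and toBulkOperations are hypothetical helpers, and the update builder is passed in so the example stays self-contained.

```javascript
/**
 * Groups events by clientId, preserving arrival order within each group.
 */
function groupEventsByClient(events) {
  const groups = new Map();
  for (const event of events) {
    if (!groups.has(event.clientId)) groups.set(event.clientId, []);
    groups.get(event.clientId).push(event);
  }
  return groups;
}

/**
 * Turns a mixed batch into the operation list accepted by the MongoDB
 * driver's collection.bulkWrite(). `buildUpdate` is injected; in the gateway
 * it would be processEventsForUpdate.
 */
function toBulkOperations(events, buildUpdate) {
  const operations = [];
  for (const [clientId, clientEvents] of groupEventsByClient(events)) {
    const op = buildUpdate(clientId, clientEvents);
    if (!op) continue; // Skip groups that produce no update
    operations.push({
      updateOne: { filter: op.filter, update: op.update, upsert: true },
    });
  }
  return operations;
}

// Possible usage in the route handler:
//   const ops = toBulkOperations(events, processEventsForUpdate);
//   if (ops.length > 0) await collection.bulkWrite(ops, { ordered: false });
```

Passing ordered: false lets the server continue with the remaining updates if one fails, which suits independent per-user documents.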

Unit Testing the Logic
Testing the featureUpdater service in isolation is crucial. It ensures our MongoDB update documents are constructed correctly without needing a live database connection.

// test/services/featureUpdater.test.js
import test from 'tape';
import { processEventsForUpdate } from '../../src/services/featureUpdater.js';

test('featureUpdater service', (t) => {
  t.test('should correctly build an update operation for page views and clicks', (st) => {
    const userId = 'test-user-1';
    const events = [
      { type: 'page_view', timestamp: new Date().toISOString(), clientId: userId },
      { type: 'product_click', payload: { productId: 'prod-1' }, timestamp: new Date().toISOString(), clientId: userId },
    ];

    const result = processEventsForUpdate(userId, events);

    st.equal(result.filter._id, userId, 'filter should be by userId');
    st.equal(result.update.$inc['session.pageviews'], 1, 'should increment pageviews');
    st.equal(result.update.$inc['counters.clicks_product_card'], 1, 'should increment product clicks');
    st.equal(result.update.$set['session.lastInteractedProductId'], 'prod-1', 'should set last interacted product');
    st.ok(result.update.$set.lastSeen, 'should set lastSeen');
    st.ok(result.options.upsert, 'should have upsert option enabled');
    
    st.end();
  });

  // ... more tests for other event types and edge cases
});

Kubeflow KServe Integration

Finally, we adapt the Python inference service. The change is surprisingly minimal. Using pymongo, the predict method now performs a fast find_one query by _id to fetch the user’s feature document before running the model.

Here’s the relevant part of the model.py for a KServe InferenceService.

# model.py for KServe
import kserve
import pymongo
import os
import numpy as np
from typing import Dict

MONGO_URI = os.environ.get("MONGO_URI")
MONGO_DB_NAME = os.environ.get("MONGO_DB_NAME", "feature_store")

class MyPersonalizationModel(kserve.Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.model = None # This would be your loaded model (e.g., scikit-learn, xgboost)
        self.db_client = None
        self.db = None
        self.ready = False

    def load(self):
        # Load your ML model artifact here
        # self.model = load_model_from_file('model.bst')
        
        # In a real-world project, use proper connection error handling and retries.
        try:
            self.db_client = pymongo.MongoClient(MONGO_URI)
            self.db = self.db_client[MONGO_DB_NAME]
            # The ping command is cheap and verifies the connection.
            # (ismaster is deprecated on recent MongoDB versions.)
            self.db.command('ping')
            print("Successfully connected to MongoDB.")
        except Exception as e:
            print(f"Failed to connect to MongoDB: {e}")
            # The readiness probe will fail if self.ready is False
            raise e

        self.ready = True

    def _get_default_features(self):
        # Return a default feature vector for new users (the "cold start" problem)
        return {"session": {"pageviews": 0}, "counters": {}, "sets": {"viewedCategories": []}}

    def _preprocess(self, user_features: Dict) -> np.ndarray:
        # This is where you transform the raw JSON from Mongo into a numeric vector
        # for your model. For example:
        pageviews = user_features.get("session", {}).get("pageviews", 0)
        clicks = user_features.get("counters", {}).get("clicks_product_card", 0)
        
        # One-hot encode categories, etc.
        # ...
        
        feature_vector = np.array([[pageviews, clicks]])
        return feature_vector

    def predict(self, request: Dict) -> Dict:
        if not self.ready:
            raise RuntimeError("Model is not ready.")

        instances = request["instances"]
        predictions = []

        feature_collection = self.db.userFeatures

        for instance in instances:
            user_id = instance.get("userId")
            if not user_id:
                # Handle requests without a user ID by returning a default prediction
                # in the same shape as the demonstration output below.
                predictions.append({"pageviews_from_store": 0})
                continue

            # Fetch real-time features from MongoDB
            user_features = feature_collection.find_one({"_id": user_id})

            if not user_features:
                user_features = self._get_default_features()

            # Preprocess features and run inference
            processed_features = self._preprocess(user_features)
            # result = self.model.predict(processed_features)
            # predictions.append(result.tolist())
            
            # For demonstration, we'll just return the fetched pageview count
            predictions.append({"pageviews_from_store": user_features.get("session", {}).get("pageviews", 0)})

        return {"predictions": predictions}

The InferenceService YAML manifest would inject the MongoDB URI from a Kubernetes Secret, ensuring credentials are not hardcoded.

The final system achieved an end-to-end (Nuxt.js event to gateway to MongoDB) p99 latency of 45ms. The feature lookup portion of the model’s prediction latency was consistently under 10ms. This architecture successfully bridged the gap between our web and MLOps stacks, delivering the real-time data needed to make our personalization models effective.

This architecture, while effective, is not without its limitations. The feature engineering logic inside the Node.js gateway is currently limited to simple aggregations. For more complex, stateful feature calculations (e.g., session-windowed averages), a gateway that both ingests and computes would become a bottleneck. A more mature iteration would have the gateway act as a simple validator that forwards raw events to a dedicated stream processing system like Apache Flink or Kafka Streams for computation, which then populates the NoSQL store. Furthermore, the direct database dependency in the KServe pod, while performant, creates tight coupling. An intermediate caching layer or a dedicated feature-serving API could provide better isolation and resilience for the inference service in the future.

