Implementing a Real-Time Feature Pipeline with FastAPI, Cassandra, and PyTorch for Mobile Personalization


The technical pain point was latency. Our initial personalization engine ran server-side. A user action in the Flutter application triggered a request, the backend gathered features from a relational database, ran a PyTorch model, and returned a result. Round-trip times were consistently over 400ms, a noticeable drag on the user experience. The obvious solution was to move inference to the device using PyTorch Mobile. This solved the model execution latency but introduced a new, more complex problem: feature freshness. An on-device model is only as good as the features it’s fed. Stale features, packaged with the app or updated infrequently, lead to poor personalization. We needed a system that could update user feature vectors in near real-time and make them available to the mobile client with extremely low latency. This is the log of how we built that data backbone.

Initial Concept and Technology Selection

The architecture had to be write-heavy and read-fast. User interactions generate a constant stream of events—clicks, views, searches—that must be processed into features. The mobile app, on startup or key screen transitions, needs to pull the latest feature vector for a specific user. This read operation had a strict budget of under 50ms.

This immediately ruled out our existing PostgreSQL infrastructure. While excellent for transactional workloads, it was a poor fit for high-volume, per-user key-value lookups served alongside a heavy, continuous write load. We were designing a data pipeline, not a normalized business database.

Our proposed architecture looked like this:

graph TD
    subgraph Mobile Client
        A[Flutter App];
        B{Event Generation};
        C[PyTorch Mobile Model];
    end

    subgraph Data Sources
        E[Kafka/Pulsar];
    end

    subgraph Backend Infrastructure
        D[Event Stream Processor];
        F{Feature Ingestion API};
        G[Feature Retrieval API];
        H[(Cassandra)];
    end

    A -- 1. User Action --> B;
    B --> C;
    E --> D;
    D -- 2. Process & Compute Features (HTTP POST) --> F;
    F -- 3. Write Features --> H;
    G -- Query by user_id --> H;
    A -- 4. Fetch Latest Features (HTTP GET) --> G;
    G -- 5. Low-Latency Feature Vector --> A;

    style H fill:#f9f,stroke:#333,stroke-width:2px

Technology Selection Rationale:

  1. Database: Apache Cassandra. The choice was driven by the access pattern. We needed to fetch the features for a single user_id very quickly, and Cassandra's partition-key data model is designed for exactly this kind of lookup. Its masterless, peer-to-peer architecture provides the high availability and near-linear write scalability needed to absorb millions of feature updates. In a real-world project, you don't fight your database's design; you lean into it, and Cassandra's design matched our access pattern closely.

  2. API Framework: FastAPI. We needed a Python backend for seamless integration with the data science ecosystem and ML feature generation logic. The choice was between Flask, Django, and FastAPI. FastAPI won for two critical reasons: its native asyncio support is ideal for I/O-bound tasks like database queries, and its Pydantic integration provides out-of-the-box data validation and serialization, which drastically reduces boilerplate and runtime errors. For a high-performance microservice, it’s the pragmatic choice.

  3. Client: Flutter with PyTorch Mobile. Flutter gave us a single codebase for iOS and Android. PyTorch Mobile (via a platform channel or a Dart FFI wrapper) allowed us to execute the model directly on the user’s device, eliminating network latency for the inference step itself. This decision is what created the stringent requirement for the feature retrieval API.
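The asyncio argument for FastAPI holds only when the I/O actually yields to the event loop. The cassandra-driver's `Session.execute` call used later in this service blocks the calling thread, so calling it directly inside an `async def` endpoint would stall every other request. The standard-library sketch below illustrates the difference; `blocking_query` is a hypothetical stand-in for a synchronous driver call, and the 0.1 s delay is arbitrary.

```python
import asyncio
import time


def blocking_query(delay: float = 0.1) -> str:
    """Hypothetical stand-in for a synchronous driver call such as Session.execute()."""
    time.sleep(delay)  # blocks the calling thread, like a synchronous network read
    return "row"


async def call_directly(n: int) -> float:
    """Run n 'concurrent' handlers that call the blocking function inside a coroutine."""
    async def handler() -> str:
        return blocking_query()  # never yields, so the event loop is stuck here

    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


async def call_in_threads(n: int) -> float:
    """Run the same n calls via asyncio.to_thread (Python 3.9+), which offloads each
    call to a worker thread, roughly what FastAPI does for plain `def` endpoints."""
    start = time.perf_counter()
    await asyncio.gather(*(asyncio.to_thread(blocking_query) for _ in range(n)))
    return time.perf_counter() - start
```

With five calls, `asyncio.run(call_directly(5))` takes about five times the delay because the handlers run one after another, while `asyncio.run(call_in_threads(5))` finishes in roughly one delay. This is why a synchronous driver call belongs in a plain `def` endpoint (or a worker thread) rather than directly inside `async def`.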

Cassandra Data Modeling: The Foundation

A common mistake when moving from SQL to NoSQL is trying to replicate relational designs. With Cassandra, you design your tables around your queries, not your data’s relationships. Our primary query was: “For a given user_id, get the most recent feature vector.”

This led to the following data model.

-- cqlsh command to create the keyspace and table
-- In production, use NetworkTopologyStrategy with a replication factor of 3 or more per datacenter.
CREATE KEYSPACE IF NOT EXISTS feature_store
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

USE feature_store;

-- This table is optimized for our single query pattern.
-- We retrieve all columns for a single user_id.
CREATE TABLE IF NOT EXISTS user_features (
    user_id UUID,
    last_updated TIMESTAMP,
    -- A map is flexible for storing features of varying names and types.
    -- This avoids schema changes every time a new feature is added.
    features_float MAP<TEXT, FLOAT>,
    features_string MAP<TEXT, TEXT>,
    -- Time-to-live (TTL) can be set on writes to automatically expire old data.
    PRIMARY KEY (user_id)
);

Key Design Decisions:

  • Partition Key: user_id is the partition key, so all data for a single user lives in one partition, stored on one node and its replicas. A read is therefore a single-partition lookup served by one replica set, with no scatter-gather across the cluster. This is the most critical aspect of the design.
  • Flexible Schema with Maps: We used MAP types to store features. This is a deliberate trade-off. It provides immense flexibility: data scientists can add or remove features without requiring a database migration. The cost is a small serialization/deserialization overhead and the loss of a per-feature schema. Cassandra enforces the map's key and value types, but not which feature names exist or what type each individual feature should have. For our use case, flexibility was paramount.
  • No Clustering Columns: We initially considered using a last_updated timestamp as a clustering column to store a history of feature vectors. However, our requirement was only for the latest vector. Storing history would complicate the read logic (SELECT ... LIMIT 1) and bloat storage. The simpler design is to just overwrite the row for a given user_id with the latest data. This keeps reads extremely simple and fast.
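Because the maps carry no fixed schema, whatever feeds the model has to turn an unordered map into the fixed-order vector the model was trained on, while tolerating features that are missing or newly added. A minimal Python sketch of that projection follows, assuming the model expects a known, ordered list of float feature names; `MODEL_FEATURE_ORDER`, `to_model_input`, and the feature names are hypothetical. On the device the same logic would live in Dart before the input tensor is built.

```python
from __future__ import annotations

import math
from typing import Mapping, Optional, Sequence

# Hypothetical: the ordered feature names the on-device model was trained on.
MODEL_FEATURE_ORDER = ["avg_session_minutes", "clicks_7d", "purchase_rate"]


def to_model_input(
    features_float: Optional[Mapping[str, float]],
    feature_order: Sequence[str],
    default: float = 0.0,
) -> list[float]:
    """Project a schemaless feature map onto the fixed order a model expects.

    Features missing for this user fall back to `default`; features the model
    does not know about (for example, newly added ones) are ignored.
    """
    features = features_float or {}  # treat a missing (null) map as empty
    vector = []
    for name in feature_order:
        value = float(features.get(name, default))
        # CQL FLOAT accepts NaN and Infinity, so guard before they reach the model.
        vector.append(value if math.isfinite(value) else default)
    return vector
```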

The FastAPI Service: The Core Engine

The service has two responsibilities: ingesting feature updates and serving feature vectors. The implementation needed to be robust, maintainable, and production-ready.

Project Structure

A clean structure is not optional; it’s a requirement for maintainability.

feature-service/
├── app/
│   ├── __init__.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── endpoints/
│   │   │   ├── __init__.py
│   │   │   └── features.py   # API routes
│   │   └── models.py         # Pydantic models
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py         # Configuration management
│   │   └── logging_config.py # Logging setup
│   ├── db/
│   │   ├── __init__.py
│   │   └── cassandra.py      # Cassandra connection management
│   ├── repository/
│   │   ├── __init__.py
│   │   └── feature_repo.py   # Data access logic
│   └── main.py               # FastAPI app instance
├── tests/
│   └── ...                   # Unit and integration tests
└── requirements.txt

Cassandra Connection Management

You do not connect to a distributed database on every request. Connection setup is expensive. We needed a singleton pattern to manage the Cluster and Session objects from the cassandra-driver.

app/db/cassandra.py

import logging
from cassandra.cluster import Cluster, Session
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy, ConstantReconnectionPolicy
from cassandra.query import dict_factory

from app.core.config import settings

logger = logging.getLogger(__name__)

class CassandraManager:
    """
    Manages the lifecycle of the Cassandra connection and session.
    This is designed to be a singleton managed by FastAPI's dependency injection.
    """
    cluster: Cluster | None = None
    session: Session | None = None

    def connect(self):
        """
        Establishes the connection to the Cassandra cluster.
        In a real-world project, configuration values must come from a secure source.
        """
        if self.cluster:
            return

        logger.info("Initializing Cassandra connection...")
        try:
            # Production-grade settings:
            # - Use DCAwareRoundRobinPolicy for multi-DC deployments.
            # - TokenAwarePolicy is crucial for routing queries to the node that owns the data.
            # - A sane reconnection policy prevents the app from giving up after a transient network issue.
            auth_provider = PlainTextAuthProvider(
                username=settings.CASSANDRA_USER,
                password=settings.CASSANDRA_PASSWORD
            )
            self.cluster = Cluster(
                contact_points=settings.CASSANDRA_CONTACT_POINTS,
                port=settings.CASSANDRA_PORT,
                auth_provider=auth_provider,
                load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc=settings.CASSANDRA_LOCAL_DC)),
                reconnection_policy=ConstantReconnectionPolicy(delay=5, max_attempts=12) # Try to reconnect for 1 minute
            )
            self.session = self.cluster.connect(settings.CASSANDRA_KEYSPACE)
            # Set the row factory to dict_factory to get rows as dictionaries
            # which is easier to work with than tuples.
            self.session.row_factory = dict_factory
            logger.info("Cassandra connection established successfully.")
        except Exception as e:
            logger.critical(f"Failed to connect to Cassandra: {e}", exc_info=True)
            # This is a fatal error. The application cannot run without the database.
            raise

    def close(self):
        """
        Gracefully shuts down the cluster connection.
        """
        if self.cluster:
            logger.info("Closing Cassandra connection.")
            self.cluster.shutdown()
            self.cluster = None
            self.session = None

    def get_session(self) -> Session:
        """
        Provides the active session. Ensures connection is alive.
        """
        if not self.session:
            logger.error("Cassandra session not available. Attempting to reconnect.")
            self.connect()
        # connect() raises on failure, so self.session is guaranteed to be set here.
        return self.session

# Singleton instance
db_manager = CassandraManager()

# Dependency for FastAPI
def get_db_session() -> Session:
    return db_manager.get_session()

This manager is hooked into the FastAPI application lifecycle using startup and shutdown events in app/main.py.
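The article does not show app/main.py. One way to wire the manager in, sketched here under assumptions, is FastAPI's `lifespan` parameter, which recent FastAPI versions recommend over the older `@app.on_event("startup")` and `@app.on_event("shutdown")` hooks. To keep the example runnable without a cluster, `StubCassandraManager` is a hypothetical stand-in that only records calls, and `simulate_server_run` is a hypothetical helper that mimics one server start and stop; in the real service you would import `db_manager` from app.db.cassandra instead.

```python
import asyncio
from contextlib import asynccontextmanager


class StubCassandraManager:
    """Hypothetical stand-in for CassandraManager that only records lifecycle calls."""

    def __init__(self):
        self.events: list[str] = []

    def connect(self):
        self.events.append("connect")

    def close(self):
        self.events.append("close")


db_manager = StubCassandraManager()


@asynccontextmanager
async def lifespan(app):
    """Startup code runs before `yield`, shutdown code after it."""
    db_manager.connect()  # open the cluster session once, before serving requests
    try:
        yield
    finally:
        db_manager.close()  # runs on shutdown, even if the server exits with an error


def simulate_server_run() -> list[str]:
    """Enter and exit the lifespan once, as the server would at startup and shutdown."""
    async def serve():
        async with lifespan(app=None):
            pass  # requests would be handled here
    asyncio.run(serve())
    return db_manager.events
```

In main.py the function would be passed as `app = FastAPI(lifespan=lifespan)`. The `try`/`finally` ensures `close()` runs even when the server stops because of an error.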

Pydantic Models and Repository Layer

Pydantic models enforce a strict data contract for our API. The repository pattern isolates the data access logic, making the API endpoints cleaner and our code more testable.
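The testability claim is easiest to see with a test double. Below is a sketch of a hypothetical in-memory repository exposing the same two methods as FeatureRepository, so endpoint tests can run without Cassandra. `FeatureRecord` and `InMemoryFeatureRepository` are illustrative names, and a plain dataclass stands in for the Pydantic response model.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID


@dataclass
class FeatureRecord:
    """Hypothetical plain-data stand-in for the FeatureVectorResponse model."""
    user_id: UUID
    last_updated: datetime
    features_float: dict[str, float] = field(default_factory=dict)
    features_string: dict[str, str] = field(default_factory=dict)


class InMemoryFeatureRepository:
    """Test double exposing the same two methods as FeatureRepository."""

    def __init__(self):
        self._rows: dict[UUID, FeatureRecord] = {}

    def upsert_features(self, feature_data) -> None:
        # Mirror the Cassandra INSERT: the user's whole row is replaced.
        self._rows[feature_data.user_id] = FeatureRecord(
            user_id=feature_data.user_id,
            last_updated=datetime.now(timezone.utc),
            features_float=dict(feature_data.features_float),
            features_string=dict(feature_data.features_string),
        )

    def get_features_by_user_id(self, user_id: UUID) -> Optional[FeatureRecord]:
        return self._rows.get(user_id)
```

In a test, one shared instance can replace the real dependency through FastAPI's `app.dependency_overrides`, for example `app.dependency_overrides[get_feature_repo] = lambda: fake_repo`; sharing a single instance keeps stored rows visible across requests.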

app/api/models.py

from pydantic import BaseModel, Field
from datetime import datetime
from uuid import UUID

class FeatureIngestionRequest(BaseModel):
    user_id: UUID
    features_float: dict[str, float] = Field(default_factory=dict)
    features_string: dict[str, str] = Field(default_factory=dict)

class FeatureVectorResponse(BaseModel):
    user_id: UUID
    last_updated: datetime
    features_float: dict[str, float]
    features_string: dict[str, str]

    class Config:
        orm_mode = True # Pydantic v1 option (from_attributes in v2): lets the model read fields from object attributes

app/repository/feature_repo.py

import logging
from datetime import datetime, timezone
from uuid import UUID

from cassandra.cluster import Session
from cassandra.query import PreparedStatement

from app.api.models import FeatureIngestionRequest, FeatureVectorResponse

logger = logging.getLogger(__name__)

class FeatureRepository:
    """
    Handles all database operations for user features.
    """
    def __init__(self, session: Session):
        self.session = session
        self._prepare_statements()

    def _prepare_statements(self):
        """
        Prepare CQL statements once to improve performance.
        Cassandra parses each query once and reuses the parsed statement.
        Each prepare() call is a round trip to the cluster, so create this
        repository once and reuse it rather than building one per request.
        """
        self.insert_stmt: PreparedStatement = self.session.prepare(
            """
            INSERT INTO user_features (user_id, last_updated, features_float, features_string)
            VALUES (?, ?, ?, ?)
            """
        )
        # user_id is the full primary key, so at most one row can match; LIMIT is unnecessary.
        self.select_stmt: PreparedStatement = self.session.prepare(
            """
            SELECT user_id, last_updated, features_float, features_string
            FROM user_features
            WHERE user_id = ?
            """
        )

    def upsert_features(self, feature_data: FeatureIngestionRequest) -> None:
        """
        Inserts or updates a user's feature vector.
        The INSERT replaces both maps, so every write must carry the full feature set.
        """
        try:
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12).
            current_timestamp = datetime.now(timezone.utc)
            bound_stmt = self.insert_stmt.bind([
                feature_data.user_id,
                current_timestamp,
                feature_data.features_float,
                feature_data.features_string
            ])
            self.session.execute(bound_stmt)
            logger.debug(f"Upserted features for user_id: {feature_data.user_id}")
        except Exception as e:
            logger.error(f"Error upserting features for user {feature_data.user_id}: {e}", exc_info=True)
            # In a real system, you might push this to a dead-letter queue for retry.
            raise

    def get_features_by_user_id(self, user_id: UUID) -> FeatureVectorResponse | None:
        """
        Retrieves the latest feature vector for a given user_id.
        """
        try:
            bound_stmt = self.select_stmt.bind([user_id])
            row = self.session.execute(bound_stmt).one()
            if row:
                logger.debug(f"Features found for user_id: {user_id}")
                # Cassandra stores an empty collection as null, so the driver returns None for it.
                row["features_float"] = dict(row["features_float"] or {})
                row["features_string"] = dict(row["features_string"] or {})
                # The driver returns naive UTC datetimes; attach the zone so the JSON
                # timestamp carries an explicit offset for the client.
                row["last_updated"] = row["last_updated"].replace(tzinfo=timezone.utc)
                # Rows are plain dicts (dict_factory), so build the model from keyword arguments.
                return FeatureVectorResponse(**row)
            logger.warning(f"No features found for user_id: {user_id}")
            return None
        except Exception as e:
            logger.error(f"Error retrieving features for user {user_id}: {e}", exc_info=True)
            raise

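The use of prepared statements is non-negotiable in production. They protect against CQL injection, reduce CPU load on Cassandra nodes by avoiding repeated query parsing, and let TokenAwarePolicy send each request straight to a replica that owns the partition, because the driver can compute the routing key from the bound partition-key value.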
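Preparation only pays off when it happens once per session: the driver documentation advises preparing each query only once, since every `Session.prepare()` call goes to the cluster. A minimal sketch of memoizing a prepared statement per session with `functools.lru_cache` follows; `CountingSession` is a hypothetical stand-in for `cassandra.cluster.Session` that counts calls instead of talking to a cluster, and `prepared_select` is an illustrative helper name.

```python
from functools import lru_cache

SELECT_CQL = (
    "SELECT user_id, last_updated, features_float, features_string "
    "FROM user_features WHERE user_id = ?"
)


class CountingSession:
    """Hypothetical stand-in for cassandra.cluster.Session that counts prepare() calls."""

    def __init__(self):
        self.prepare_calls = 0

    def prepare(self, query: str):
        self.prepare_calls += 1  # a real session would send a PREPARE request here
        return ("prepared", query)


@lru_cache(maxsize=None)
def prepared_select(session):
    """Return the prepared SELECT for this session, preparing it only on first use.

    The cache key is the session object itself (hashed by identity), so each
    session pays for preparation once and every later call reuses the result.
    """
    return session.prepare(SELECT_CQL)
```

The cache keeps a reference to each session for the life of the process, which matches a singleton session like `db_manager` here.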

API Endpoints with Dependency Injection

FastAPI’s Depends system wires everything together beautifully.

app/api/endpoints/features.py

import logging
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from cassandra.cluster import Session

from app.api.models import FeatureIngestionRequest, FeatureVectorResponse
from app.db.cassandra import get_db_session
from app.repository.feature_repo import FeatureRepository

logger = logging.getLogger(__name__)
router = APIRouter()

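# FeatureRepository prepares its statements in __init__, which costs a round trip
# to Cassandra. Build it once per session and reuse it across requests.
_repo: FeatureRepository | None = None

def get_feature_repo(session: Session = Depends(get_db_session)) -> FeatureRepository:
    """Dependency returning a shared repository so statements are prepared only once."""
    global _repo
    if _repo is None or _repo.session is not session:
        _repo = FeatureRepository(session)
    return _repo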

@router.post(
    "/features",
    status_code=status.HTTP_202_ACCEPTED,
    summary="Ingest user feature vector"
)
def ingest_features(
    request_body: FeatureIngestionRequest,
    repo: FeatureRepository = Depends(get_feature_repo)
):
    """
    Endpoint to write feature data.
    Declared with plain `def` so FastAPI runs it in its thread pool; the blocking
    cassandra-driver call therefore does not stall the event loop.
    In a high-throughput scenario, this endpoint should be lightweight,
    validating the input and then possibly handing off to a background task or queue.
    """
    try:
        repo.upsert_features(request_body)
        return {"message": "Feature ingestion acknowledged."}
    except Exception:
        logger.exception(f"Ingestion failed for user {request_body.user_id}")
        # Return a generic message so internal details are not leaked to the caller.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to process feature ingestion."
        )

@router.get(
    "/features/{user_id}",
    response_model=FeatureVectorResponse,
    summary="Retrieve latest feature vector for a user"
)
def get_features(
    user_id: UUID,
    repo: FeatureRepository = Depends(get_feature_repo)
) -> FeatureVectorResponse:
    """
    Retrieves the latest feature vector for a specific user ID.
    This is the critical path for the mobile app, so it must be fast.
    Declared with plain `def` so FastAPI runs the blocking Cassandra read in its thread pool.
    """
    try:
        feature_vector = repo.get_features_by_user_id(user_id)
        if not feature_vector:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Features not found for user {user_id}"
            )
        return feature_vector
    except HTTPException:
        raise # Re-raise HTTPException to preserve status code and detail
    except Exception:
        logger.exception(f"Feature retrieval failed for user {user_id}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve features."
        )

This code is clean, testable, and robust. The dependency injection separates the web layer from the data layer, and the error handling provides meaningful feedback without exposing internal implementation details.
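To keep the endpoints as `async def` instead, the blocking `session.execute()` call has to be replaced with a non-blocking one. The driver's `Session.execute_async()` returns a `ResponseFuture` whose `add_callbacks(callback, errback)` fires on the driver's own I/O thread. A minimal sketch of bridging it to an awaitable asyncio future follows; `as_asyncio_future` is a hypothetical helper name, and `FakeResponseFuture` is a hypothetical stand-in so the example runs without a cluster.

```python
import asyncio
import threading


def as_asyncio_future(response_future, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
    """Wrap a callback-based future so it can be awaited on `loop`.

    Intended for cassandra-driver's ResponseFuture, whose callbacks run on the
    driver's I/O thread, so results are handed over with call_soon_threadsafe.
    """
    fut = loop.create_future()

    def set_result(rows):
        if not fut.done():  # the awaiting request may have been cancelled meanwhile
            fut.set_result(rows)

    def set_exception(exc):
        if not fut.done():
            fut.set_exception(exc)

    response_future.add_callbacks(
        lambda rows: loop.call_soon_threadsafe(set_result, rows),
        lambda exc: loop.call_soon_threadsafe(set_exception, exc),
    )
    return fut


class FakeResponseFuture:
    """Hypothetical stand-in for ResponseFuture that completes on another thread."""

    def __init__(self, result=None, error=None):
        self.result = result
        self.error = error

    def add_callbacks(self, callback, errback):
        def fire():
            if self.error is not None:
                errback(self.error)
            else:
                callback(self.result)

        threading.Thread(target=fire).start()
```

Inside an endpoint this would read roughly `rows = await as_asyncio_future(session.execute_async(bound_stmt), asyncio.get_running_loop())`. With paging, the callback receives only the first page of rows, which is enough for this single-row lookup.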

The Flutter Client: Consuming the Features

The client-side implementation is more straightforward but requires care in handling network state and errors.

Dart Service for API communication

// lib/services/feature_service.dart
import 'dart:async'; // TimeoutException
import 'dart:convert';
import 'dart:io';

import 'package:http/http.dart' as http;

// Data model to match the backend response
class FeatureVector {
  final String userId;
  final DateTime lastUpdated;
  final Map<String, double> featuresFloat;
  final Map<String, String> featuresString;

  FeatureVector({
    required this.userId,
    required this.lastUpdated,
    required this.featuresFloat,
    required this.featuresString,
  });

  factory FeatureVector.fromJson(Map<String, dynamic> json) {
    return FeatureVector(
      userId: json['user_id'],
      lastUpdated: DateTime.parse(json['last_updated']),
      featuresFloat: (json['features_float'] as Map<String, dynamic>)
          .map((key, value) => MapEntry(key, (value as num).toDouble())),
      featuresString: (json['features_string'] as Map<String, dynamic>)
          .map((key, value) => MapEntry(key, value as String)),
    );
  }
}

class ApiServiceException implements Exception {
  final String message;
  final int? statusCode;
  ApiServiceException(this.message, {this.statusCode});

  @override
  String toString() => 'ApiServiceException: $message (Status Code: $statusCode)';
}


class FeatureService {
  // In a real app, this would come from a configuration file.
  final String _baseUrl = "http://10.0.2.2:8000"; // Android emulator localhost

  Future<FeatureVector> getFeatures(String userId) async {
    final uri = Uri.parse('$_baseUrl/features/$userId');
    
    try {
      final response = await http.get(
        uri,
        headers: {'Accept': 'application/json'},
      ).timeout(const Duration(seconds: 3)); // A strict timeout is crucial for mobile UX.

      if (response.statusCode == 200) {
        final Map<String, dynamic> data = json.decode(response.body);
        return FeatureVector.fromJson(data);
      } else if (response.statusCode == 404) {
        throw ApiServiceException('User features not found', statusCode: 404);
      } else {
        // Handle other server-side errors
        throw ApiServiceException(
            'Failed to load features: Server error', 
            statusCode: response.statusCode,
        );
      }
    } on SocketException {
      // Handle network errors (no internet, etc.)
      throw ApiServiceException('Network error: Please check your connection.');
    } on TimeoutException {
      throw ApiServiceException('Request timed out. Please try again.');
    } catch (e) {
      // Re-throw any unhandled or specific exceptions.
      rethrow;
    }
  }
}

This Dart code provides a clean, typed interface to the backend. It includes essential production considerations like timeouts and distinguishes between different failure modes (network error, not found, server error), which allows the UI layer to react appropriately—for example, by showing a specific error message or falling back to default behavior. This FeatureVector object would then be passed to the on-device PyTorch model to generate a personalized experience.

Lingering Issues and Future Iterations

This architecture successfully met our goal of providing fresh features to an on-device model with low latency. However, it has limitations and room for improvement. The current feature ingestion endpoint is synchronous and relies on a direct HTTP POST. In a system with a very high volume of events, this endpoint would become a bottleneck. A better approach would be to have the Event Stream Processor publish computed feature vectors to a message queue like Kafka instead of POSTing them, with a fleet of consumers asynchronously processing batches of updates and writing them to Cassandra. This decouples the systems and improves resilience.
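With a queue in between, each consumer batch can be coalesced before it touches Cassandra: because every INSERT overwrites the user's row, only the newest update per user in a batch needs to be written. A minimal sketch, assuming each message carries a user_id and an event time; `FeatureUpdate` and `coalesce_latest` are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterable


@dataclass
class FeatureUpdate:
    """Hypothetical shape of one message on a feature-update topic."""
    user_id: str
    event_time: float  # epoch seconds when the features were computed
    features_float: dict[str, float] = field(default_factory=dict)


def coalesce_latest(batch: Iterable[FeatureUpdate]) -> dict[str, FeatureUpdate]:
    """Keep only the newest update per user from one consumed batch.

    Each Cassandra INSERT overwrites the user's whole row, so writing older
    updates for the same user in the same batch would be wasted work.
    """
    latest: dict[str, FeatureUpdate] = {}
    for update in batch:
        current = latest.get(update.user_id)
        # On equal timestamps the later message in the batch wins.
        if current is None or update.event_time >= current.event_time:
            latest[update.user_id] = update
    return latest
```

Writing each surviving update with `USING TIMESTAMP` set from its event time (CQL expects microseconds) would additionally let Cassandra's last-write-wins reconciliation discard updates that arrive late across batches.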

Furthermore, the FastAPI application itself is a single point of failure. It must be containerized and deployed behind a load balancer with multiple replicas to ensure high availability, matching the resilience of the Cassandra cluster it depends on. We also omitted authentication and authorization on the API endpoints, which would be a mandatory addition, likely using OAuth2 JWT Bearer tokens. Finally, for users in different geographic regions, maintaining low latency globally would require deploying the retrieval API in each region, backed by a multi-datacenter Cassandra cluster read at LOCAL_QUORUM consistency.
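LOCAL_QUORUM requires a majority of the replicas in the local datacenter, floor(RF / 2) + 1, where RF is that datacenter's replication factor. A small Python sketch of the arithmetic shows what each replication factor buys; the function names are illustrative.

```python
def quorum(replication_factor: int) -> int:
    """Replicas that must respond for QUORUM or LOCAL_QUORUM: floor(RF / 2) + 1.

    For LOCAL_QUORUM, pass the replication factor of the local datacenter.
    """
    if replication_factor < 1:
        raise ValueError("replication_factor must be at least 1")
    return replication_factor // 2 + 1


def tolerated_replica_failures(replication_factor: int) -> int:
    """Replicas per datacenter that can be down while LOCAL_QUORUM still succeeds."""
    return replication_factor - quorum(replication_factor)
```

With the driver, the level can be set per statement, for example `self.select_stmt.consistency_level = ConsistencyLevel.LOCAL_QUORUM` after `from cassandra import ConsistencyLevel`. The sample keyspace's replication factor of 1 tolerates no replica failures, while RF = 3 tolerates one; reading and writing both at LOCAL_QUORUM with RF = 3 (2 + 2 > 3) means every read overlaps the latest acknowledged write in that datacenter.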

