Implementing a Replayable Event-Sourced Feature Store with Ray Compute Clusters on Nomad


The request from the machine learning team was deceptive in its simplicity: “We need to backfill features for our new fraud detection model using data from the last six months.” This triggered a cascade of failures in our existing architecture. Our production database only held the current state of user accounts, and our nightly ETL jobs, which populated a data warehouse, were destructive, overwriting previous values. Reconstructing the precise state of a user account at any given point in time was impossible. Every new feature idea or model retraining effort was blocked by this fundamental limitation. The engineering cost of manually backfilling data was spiraling, and the iteration speed for our data scientists had ground to a halt.

Our initial concept was a radical departure from traditional state management. Instead of storing the current state, we would persist a chronologically ordered, immutable sequence of domain events. An account’s balance isn’t a mutable field in a database table; it’s the result of folding a series of DepositMade and WithdrawalExecuted events. This is the core of Event Sourcing. This pattern would provide a perfect, replayable audit log of everything that has ever happened in the system, making point-in-time state reconstruction a first-class capability. The challenge, however, shifted from data storage to data processing. Replaying millions of events for thousands of users on-demand to generate feature vectors required a compute framework that was both massively parallel and lightweight enough for interactive queries.

This led to our technology selection. For the event log, we needed a durable, high-throughput message broker. Apache Kafka was a solid choice. For the compute layer, standard batch processing tools like Spark felt too heavyweight; their startup latency and resource overhead were ill-suited for the on-demand, single-entity feature generation we envisioned. Ray, with its lightweight actor model and task parallelism, offered the perfect fit. It allowed us to conceptualize feature generation as a set of distributed actors, each responsible for managing the state of a single entity by processing its event stream. Finally, for orchestration, we needed a tool that could manage this heterogeneous workload: a stateful Kafka cluster, stateless Python API services, and the dynamic, ephemeral Ray compute clusters. While Kubernetes is the dominant player, its operational complexity was a significant concern for our small platform team. HashiCorp Nomad, with its architectural simplicity, first-class support for non-containerized jobs, and seamless integration with Consul for service discovery, presented a more pragmatic path forward. This post-mortem details the build-out of this replayable feature store, from the event model to the orchestration of Ray actors on a Nomad cluster.

Defining the Immutable Log: Events and the Writer Service

The foundation of the system is the event itself. In a real-world project, defining these events requires careful domain modeling, but for this implementation, we’ll focus on a simplified user activity domain. Each event is a simple, immutable data structure carrying all necessary information about a state change.

# src/events.py
import uuid
from datetime import datetime, timezone
from pydantic import BaseModel, Field

def current_time_utc() -> datetime:
    return datetime.now(timezone.utc)

def generate_uuid() -> str:
    return str(uuid.uuid4())

class EventMetadata(BaseModel):
    event_id: str = Field(default_factory=generate_uuid)
    event_type: str
    created_at: datetime = Field(default_factory=current_time_utc)
    version: int = 1

class UserRegistered(BaseModel):
    meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="UserRegistered"))
    user_id: str
    email: str
    registered_at: datetime

class LoginSucceeded(BaseModel):
    meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="LoginSucceeded"))
    user_id: str
    ip_address: str
    timestamp: datetime

class PasswordChanged(BaseModel):
    meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="PasswordChanged"))
    user_id: str
    timestamp: datetime

class TransactionCreated(BaseModel):
    meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="TransactionCreated"))
    user_id: str
    transaction_id: str
    amount_usd: float
    country_code: str
    timestamp: datetime

The next component is a service responsible for ingesting these events and writing them to our durable log, Kafka. A critical requirement here is that all events for a given entity (e.g., a user_id) must land in the same Kafka partition. This guarantees ordering and simplifies the consumption logic for our downstream Ray actors, as a single consumer will see all events for a user in the exact order they occurred.

Here’s the core of the event writer, a simple FastAPI application.

# src/event_writer_service.py
import os
import json
import logging
from typing import Union
from fastapi import FastAPI, HTTPException, Request
from aiokafka import AIOKafkaProducer
from contextlib import asynccontextmanager

from .events import UserRegistered, LoginSucceeded, PasswordChanged, TransactionCreated

# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "user-events")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Kafka Producer Lifecycle Management ---
producer = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global producer
    logger.info(f"Connecting to Kafka at {KAFKA_BOOTSTRAP_SERVERS}...")
    try:
        producer = AIOKafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8')
        )
        await producer.start()
        logger.info("Kafka producer started successfully.")
        yield
    finally:
        if producer:
            logger.info("Stopping Kafka producer...")
            await producer.stop()
            logger.info("Kafka producer stopped.")

app = FastAPI(lifespan=lifespan)

# --- Type Hint for all possible events ---
EventType = Union[UserRegistered, LoginSucceeded, PasswordChanged, TransactionCreated]

@app.post("/publish_event")
async def publish_event(event: EventType):
    """
    Receives an event, validates its structure, and publishes it to Kafka.
    The `user_id` is used as the partition key to ensure ordering.
    """
    if not hasattr(event, 'user_id'):
        raise HTTPException(status_code=400, detail="Event must contain a 'user_id' field.")

    event_dict = event.model_dump()
    user_id = event.user_id
    event_type = event.meta.event_type
    
    logger.info(f"Publishing event '{event_type}' for user_id '{user_id}'")

    try:
        # The key=user_id is critical. Kafka's default partitioner will use this
        # to consistently hash the key to the same partition.
        await producer.send_and_wait(
            topic=KAFKA_TOPIC,
            value=event_dict,
            key=user_id
        )
        return {"status": "success", "event_id": event.meta.event_id}
    except Exception as e:
        logger.error(f"Failed to publish event for user_id '{user_id}': {e}", exc_info=True)
        # In a production system, this might trigger a circuit breaker or retry logic.
        raise HTTPException(status_code=500, detail="Could not publish event to Kafka.")

@app.get("/health")
def health_check():
    return {"status": "healthy"}

To deploy this service, we use a Nomad job file. This declarative file tells Nomad what to run, how many copies, and what resources it needs. Note the use of Consul for service registration, which allows other services (like our Ray cluster) to discover it automatically.

# nomad_jobs/event_writer.nomad
job "event-writer" {
  datacenters = ["dc1"]
  type = "service"

  group "writer-api" {
    count = 2 # Run two instances for high availability

    network {
      port "http" {
        to = 8000
      }
    }

    # Register this service in Consul for discovery
    service {
      name     = "event-writer-api"
      port     = "http"
      provider = "nomad"

      check {
        type     = "http"
        path     = "/health"
        interval = "10s"
        timeout  = "2s"
      }
    }

    task "server" {
      driver = "docker"

      config {
        image = "my-registry/feature-store-services:latest" # Assume our app is packaged here
        command = "uvicorn"
        args = [
          "src.event_writer_service:app",
          "--host", "0.0.0.0",
          "--port", "8000"
        ]
        ports = ["http"]
      }
      
      # Environment variables to configure the application
      env {
        # Using Consul template to dynamically fetch Kafka address
        KAFKA_BOOTSTRAP_SERVERS = "kafka.service.consul:9092" 
        KAFKA_TOPIC = "user-events"
      }

      resources {
        cpu    = 200 # MHz
        memory = 256 # MB
      }
    }
  }
}

Distributed Computation: Feature Generation with Ray Actors

With events flowing into Kafka, the next challenge is processing them. This is where Ray’s power comes into play. We’ll design a Ray actor, which is essentially a stateful service in the Ray ecosystem. Each instance of our FeatureGeneratorActor will be responsible for a single user_id. It will maintain the user’s state in memory by consuming their event stream and will expose a method to compute features based on that state at a specific point in time.

The actor needs to be able to read events from Kafka for a specific user. The pitfall here is that a standard Kafka consumer group would load-balance partitions across consumers, meaning a single actor might not get the partition it needs. Instead, the actor must explicitly be assigned the specific partition that contains its user_id‘s events. This requires a bit of coordination but is essential for the model to work.

# src/feature_generator.py
import ray
import os
import json
import logging
from datetime import datetime, timezone
from kafka import KafkaConsumer, TopicPartition

# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "user-events")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# --- Logging Setup ---
# Ray actors run in separate processes, so logging needs to be configured within the actor.
def setup_logger():
    logger = logging.getLogger(f"FeatureGeneratorActor_{os.getpid()}")
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(LOG_LEVEL)
    return logger

@ray.remote
class FeatureGeneratorActor:
    def __init__(self, user_id: str, partition: int):
        self.logger = setup_logger()
        self.user_id = user_id
        self.partition = partition
        self.events = []
        self.state = {} # In-memory projected state
        self._initialized = False
        
        self.logger.info(f"Actor created for user_id={self.user_id} on partition={self.partition}")

    def _initialize_consumer(self):
        """
        Connects to Kafka and consumes all historical events for the user.
        This is a blocking operation and is the main reason for the actor model,
        as it isolates this heavy lifting.
        """
        try:
            # A common mistake is to share consumers across threads/processes.
            # Each actor must have its own consumer instance.
            consumer = KafkaConsumer(
                bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
                auto_offset_reset='earliest', # Start from the beginning
                consumer_timeout_ms=5000, # Don't block forever if no new messages
                value_deserializer=lambda v: json.loads(v.decode('utf-8'))
            )
            
            topic_partition = TopicPartition(KAFKA_TOPIC, self.partition)
            consumer.assign([topic_partition])
            consumer.seek_to_beginning(topic_partition)

            self.logger.info(f"Loading events for user_id={self.user_id} from partition {self.partition}")
            for message in consumer:
                # We only care about events for our specific user_id
                if message.key.decode('utf-8') == self.user_id:
                    self.events.append(message.value)
            
            self.logger.info(f"Loaded {len(self.events)} events for user_id={self.user_id}")
            self._initialized = True
        except Exception as e:
            self.logger.error(f"Failed to initialize Kafka consumer for user {self.user_id}: {e}", exc_info=True)
            # This actor is now in a failed state; subsequent calls should fail.
            self._initialized = False
        finally:
            if 'consumer' in locals():
                consumer.close()

    def _apply_event(self, state, event_data):
        """Applies a single event to a state dictionary, mutating it."""
        event_type = event_data.get("meta", {}).get("event_type")
        if event_type == "UserRegistered":
            state['registered'] = True
            state['email'] = event_data.get('email')
            state['transaction_countries'] = set()
            state['login_timestamps'] = []
        elif event_type == "LoginSucceeded":
            if 'login_timestamps' in state:
                state['login_timestamps'].append(event_data.get('timestamp'))
        elif event_type == "TransactionCreated":
             if 'transaction_countries' in state:
                state['transaction_countries'].add(event_data.get('country_code'))
        # Add other event applications here...

    async def generate_features(self, at_timestamp_str: str) -> dict:
        """
        Computes features for the user at a specific point in time.
        """
        if not self._initialized:
            self.logger.info("First call, initializing and loading history...")
            self._initialize_consumer()
            if not self._initialized: # Check again after attempt
                 raise RuntimeError(f"Actor for user {self.user_id} is in a failed state.")

        try:
            at_timestamp = datetime.fromisoformat(at_timestamp_str)
        except (ValueError, TypeError):
            raise ValueError("Invalid ISO 8601 timestamp format required.")

        # This is the core of event sourcing: re-compute state from the log.
        point_in_time_state = {}
        for event in self.events:
            event_ts_str = event.get("meta", {}).get("created_at")
            if not event_ts_str:
                continue
            
            event_ts = datetime.fromisoformat(event_ts_str)
            if event_ts <= at_timestamp:
                self._apply_event(point_in_time_state, event)
            else:
                # Events are ordered, so we can stop early.
                break
        
        # Now, derive features from the computed state.
        if not point_in_time_state:
            return {"error": "No events found for user before specified timestamp."}
        
        login_timestamps = point_in_time_state.get('login_timestamps', [])
        
        features = {
            "user_id": self.user_id,
            "distinct_transaction_countries": len(point_in_time_state.get('transaction_countries', set())),
            "total_logins": len(login_timestamps),
            "is_registered": point_in_time_state.get('registered', False),
            "computed_at": datetime.now(timezone.utc).isoformat()
        }

        self.logger.info(f"Generated features for {self.user_id} at {at_timestamp_str}")
        return features

Orchestrating a Ray cluster on Nomad is surprisingly straightforward. Nomad can run raw executables, not just Docker containers. We define a “group” for the Ray cluster with a head node and a set of worker nodes.

# nomad_jobs/ray_cluster.nomad
job "ray-cluster" {
  datacenters = ["dc1"]
  type = "service" # Keep the cluster running

  group "ray-head" {
    count = 1

    network {
      port "client" { to = 10001 }
      port "dashboard" { to = 8265 }
    }

    service {
      name = "ray-head"
      port = "client"
      provider = "nomad"
      # Other services will connect to ray-head.service.consul:10001
    }

    task "ray-head-task" {
      driver = "exec" # Can also be 'docker'

      config {
        command = "/usr/local/bin/ray"
        args = [
          "start",
          "--head",
          "--port", "6379", # Internal Ray port
          "--ray-client-server-port", "${NOMAD_PORT_client}",
          "--dashboard-host", "0.0.0.0",
          "--dashboard-port", "${NOMAD_PORT_dashboard}",
          "--num-cpus", "1" # Head node is for coordination
        ]
      }
      
      env {
        # This allows workers to discover the head node
        RAY_HEAD_ADDRESS = "${NOMAD_IP_client}:${NOMAD_PORT_client}"
      }
      
      resources {
        cpu    = 500
        memory = 1024
      }
    }
  }

  group "ray-workers" {
    # This can be scaled up or down based on load
    count = 4

    task "ray-worker-task" {
      driver = "exec"

      config {
        command = "/usr/local/bin/ray"
        args = [
          "start",
          # The address of the head node is discovered via Consul
          "--address", "ray-head.service.consul:6379"
        ]
      }
      
      resources {
        cpu    = 1000 # Workers do the heavy lifting
        memory = 2048
      }
    }
  }
}

Tying It Together: The Feature Serving API

The final piece is an API service that acts as the entry point for clients. It receives a request for features, determines which Ray actor to invoke, calls it, and returns the result. To avoid the expensive replay operation on every single request, this service will also implement a simple caching layer using Redis.

graph TD
    subgraph Client
        A[ML Model Training Job] --> B{Feature API};
    end

    subgraph Nomad Cluster
        B -- HTTP Request /get_features(user_id, timestamp) --> C[Feature API Service];
        C -- Check Cache --> D[Redis];
        C -- Cache Miss --> E[Ray Client];
        E -- Invoke Actor.generate_features() --> F[Ray Cluster Head];
        F -- Dispatches task --> G[FeatureGeneratorActor for user_id];
        G -- Read Events --> H[Kafka];
        G -- Computes Features --> E;
        E -- Returns Result --> C;
        C -- Write to Cache --> D;
        C -- HTTP Response --> B;
    end

Here’s the Python code for this API service. It manages a pool of Ray actors, creating them on-demand when a request for a new user_id comes in. A real-world project would need a more sophisticated actor management strategy, potentially evicting idle actors to save memory.

# src/feature_api_service.py
import os
import json
import logging
import ray
import redis
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from hashlib import md5

# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "user-events")
KAFKA_PARTITIONS = int(os.getenv("KAFKA_PARTITIONS", "16"))
RAY_ADDRESS = os.getenv("RAY_ADDRESS", "ray://localhost:10001")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))

# --- Other Setup ---
# (logging, etc. from previous examples)
logger = logging.getLogger(__name__)

# --- Global State Management ---
# In a multi-process server like uvicorn, this state is per-process.
# For true shared state, an external store or Ray's own features would be needed.
actor_handles = {}
redis_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client
    logger.info("Initializing Feature API Service...")
    
    # Initialize Ray connection
    if not ray.is_initialized():
        ray.init(address=RAY_ADDRESS, ignore_reinit_error=True)
    logger.info(f"Connected to Ray at {RAY_ADDRESS}")

    # Initialize Redis connection
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    try:
        redis_client.ping()
        logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
    except redis.exceptions.ConnectionError as e:
        logger.error(f"Could not connect to Redis: {e}", exc_info=True)
        # Allow service to start but caching will fail.
        redis_client = None

    # Ensure Kafka topic exists (for demonstration purposes)
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
        topic_list = [NewTopic(name=KAFKA_TOPIC, num_partitions=KAFKA_PARTITIONS, replication_factor=1)]
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
        logger.info(f"Ensured Kafka topic '{KAFKA_TOPIC}' exists with {KAFKA_PARTITIONS} partitions.")
    except TopicAlreadyExistsError:
        logger.info(f"Kafka topic '{KAFKA_TOPIC}' already exists.")
    except Exception as e:
        logger.warning(f"Could not create/verify Kafka topic: {e}")

    yield
    
    logger.info("Shutting down Feature API Service...")
    ray.shutdown()

app = FastAPI(lifespan=lifespan)

def get_partition_for_user(user_id: str) -> int:
    """Consistently maps a user_id to a Kafka partition."""
    # This hashing must match Kafka's default partitioner for string keys.
    # For simplicity, we use a basic md5 hash. Production systems should verify
    # compatibility with the JVM's String.hashCode() if the producer is Java.
    return int(md5(user_id.encode('utf-8')).hexdigest(), 16) % KAFKA_PARTITIONS

def get_or_create_actor(user_id: str):
    """
    Retrieves an existing Ray actor handle for a user_id or creates a new one.
    This is a critical part of the system's resource management.
    """
    if user_id in actor_handles:
        return actor_handles[user_id]
    
    from .feature_generator import FeatureGeneratorActor

    partition = get_partition_for_user(user_id)
    logger.info(f"No active actor for user_id={user_id}. Creating a new one for partition {partition}.")
    
    # The actor name allows reconnecting to it later if this script restarts.
    actor = FeatureGeneratorActor.options(name=f"actor_{user_id}", get_if_exists=True).remote(
        user_id=user_id,
        partition=partition
    )
    actor_handles[user_id] = actor
    return actor

@app.get("/get_features/{user_id}")
async def get_features(user_id: str, at_timestamp: str):
    # Caching key should include all parameters that affect the result.
    cache_key = f"features:{user_id}:{at_timestamp}"
    if redis_client:
        cached_result = redis_client.get(cache_key)
        if cached_result:
            logger.info(f"Cache HIT for user {user_id} at {at_timestamp}")
            return json.loads(cached_result)

    logger.info(f"Cache MISS for user {user_id}. Invoking Ray actor.")
    
    try:
        actor = get_or_create_actor(user_id)
        # ray.get() is blocking, but we run it in an async endpoint.
        # FastAPI handles this by running it in a thread pool.
        features_ref = actor.generate_features.remote(at_timestamp)
        features = await features_ref # Using await on the remote call
    except Exception as e:
        logger.error(f"Error invoking Ray actor for user {user_id}: {e}", exc_info=True)
        # Remove potentially faulty actor handle
        if user_id in actor_handles:
            del actor_handles[user_id]
        raise HTTPException(status_code=500, detail="Failed to compute features.")

    if redis_client and "error" not in features:
        # Cache the result for 5 minutes
        redis_client.set(cache_key, json.dumps(features), ex=300)

    return features

The Nomad job for this final service is similar to the event writer, ensuring it can discover both the Ray head and Redis via Consul.

The final result is a system capable of generating point-in-time feature vectors on demand. A data scientist can now ask, “What were the features for user X just before their transaction at time T?” and get an answer in seconds, rather than waiting days for a manual data pull. This architecture fundamentally unlocked the team’s ability to experiment and iterate on models that rely on rich, temporal user behavior.

However, the current implementation has clear boundaries. The actor’s method of reading the entire event history from Kafka on initialization is naive and will not scale for entities with millions of events. A robust, production-grade version would require implementing snapshots, where the actor periodically persists its computed state and only needs to read events since the last snapshot. Furthermore, our actor management is simplistic; a proper system would require an actor supervisor to manage lifecycles, evict idle actors to conserve resources, and handle fault tolerance more gracefully. The current Nomad job definitions are also static. The next iteration should leverage Nomad’s autoscaling capabilities to dynamically adjust the number of Ray workers based on the length of the task queue or API latency, creating a truly elastic compute platform.


  TOC