The request from the machine learning team was deceptively simple: “We need to backfill features for our new fraud detection model using data from the last six months.” That one sentence exposed a cascade of gaps in our existing architecture. Our production database held only the current state of user accounts, and our nightly ETL jobs, which populated a data warehouse, were destructive, overwriting previous values. Reconstructing the precise state of a user account at any given point in time was impossible. Every new feature idea or model retraining effort was blocked by this fundamental limitation. The engineering cost of manually backfilling data was spiraling, and the iteration speed for our data scientists had ground to a halt.
Our initial concept was a radical departure from traditional state management. Instead of storing the current state, we would persist a chronologically ordered, immutable sequence of domain events. An account’s balance isn’t a mutable field in a database table; it’s the result of folding a series of DepositMade and WithdrawalExecuted events. This is the core of Event Sourcing. This pattern would provide a perfect, replayable audit log of everything that has ever happened in the system, making point-in-time state reconstruction a first-class capability. The challenge, however, shifted from data storage to data processing. Replaying millions of events for thousands of users on demand to generate feature vectors required a compute framework that was both massively parallel and lightweight enough for interactive queries.
This led to our technology selection. For the event log, we needed a durable, high-throughput message broker. Apache Kafka was a solid choice. For the compute layer, standard batch processing tools like Spark felt too heavyweight; their startup latency and resource overhead were ill-suited for the on-demand, single-entity feature generation we envisioned. Ray, with its lightweight actor model and task parallelism, offered the perfect fit. It allowed us to conceptualize feature generation as a set of distributed actors, each responsible for managing the state of a single entity by processing its event stream. Finally, for orchestration, we needed a tool that could manage this heterogeneous workload: a stateful Kafka cluster, stateless Python API services, and the dynamic, ephemeral Ray compute clusters. While Kubernetes is the dominant player, its operational complexity was a significant concern for our small platform team. HashiCorp Nomad, with its architectural simplicity, first-class support for non-containerized jobs, and seamless integration with Consul for service discovery, presented a more pragmatic path forward. This post-mortem details the build-out of this replayable feature store, from the event model to the orchestration of Ray actors on a Nomad cluster.
Defining the Immutable Log: Events and the Writer Service
The foundation of the system is the event itself. In a real-world project, defining these events requires careful domain modeling, but for this implementation, we’ll focus on a simplified user activity domain. Each event is a simple, immutable data structure carrying all necessary information about a state change.
# src/events.py
import uuid
from datetime import datetime, timezone
from pydantic import BaseModel, Field
def current_time_utc() -> datetime:
return datetime.now(timezone.utc)
def generate_uuid() -> str:
return str(uuid.uuid4())
class EventMetadata(BaseModel):
event_id: str = Field(default_factory=generate_uuid)
event_type: str
created_at: datetime = Field(default_factory=current_time_utc)
version: int = 1
class UserRegistered(BaseModel):
meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="UserRegistered"))
user_id: str
email: str
registered_at: datetime
class LoginSucceeded(BaseModel):
meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="LoginSucceeded"))
user_id: str
ip_address: str
timestamp: datetime
class PasswordChanged(BaseModel):
meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="PasswordChanged"))
user_id: str
timestamp: datetime
class TransactionCreated(BaseModel):
meta: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type="TransactionCreated"))
user_id: str
transaction_id: str
amount_usd: float
country_code: str
timestamp: datetime
The next component is a service responsible for ingesting these events and writing them to our durable log, Kafka. A critical requirement here is that all events for a given entity (e.g., a user_id) must land in the same Kafka partition. This guarantees ordering and simplifies the consumption logic for our downstream Ray actors, as a single consumer will see all events for a user in the exact order they occurred.
Here’s the core of the event writer, a simple FastAPI application.
# src/event_writer_service.py
import os
import json
import logging
from typing import Union
from fastapi import FastAPI, HTTPException, Request
from aiokafka import AIOKafkaProducer
from contextlib import asynccontextmanager
from .events import UserRegistered, LoginSucceeded, PasswordChanged, TransactionCreated
# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "user-events")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# --- Logging Setup ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Kafka Producer Lifecycle Management ---
producer = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global producer
logger.info(f"Connecting to Kafka at {KAFKA_BOOTSTRAP_SERVERS}...")
try:
producer = AIOKafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8')
)
await producer.start()
logger.info("Kafka producer started successfully.")
yield
finally:
if producer:
logger.info("Stopping Kafka producer...")
await producer.stop()
logger.info("Kafka producer stopped.")
app = FastAPI(lifespan=lifespan)
# --- Type Hint for all possible events ---
EventType = Union[UserRegistered, LoginSucceeded, PasswordChanged, TransactionCreated]
@app.post("/publish_event")
async def publish_event(event: EventType):
"""
Receives an event, validates its structure, and publishes it to Kafka.
The `user_id` is used as the partition key to ensure ordering.
"""
if not hasattr(event, 'user_id'):
raise HTTPException(status_code=400, detail="Event must contain a 'user_id' field.")
event_dict = event.model_dump()
user_id = event.user_id
event_type = event.meta.event_type
logger.info(f"Publishing event '{event_type}' for user_id '{user_id}'")
try:
# The key=user_id is critical. Kafka's default partitioner will use this
# to consistently hash the key to the same partition.
await producer.send_and_wait(
topic=KAFKA_TOPIC,
value=event_dict,
key=user_id
)
return {"status": "success", "event_id": event.meta.event_id}
except Exception as e:
logger.error(f"Failed to publish event for user_id '{user_id}': {e}", exc_info=True)
# In a production system, this might trigger a circuit breaker or retry logic.
raise HTTPException(status_code=500, detail="Could not publish event to Kafka.")
@app.get("/health")
def health_check():
return {"status": "healthy"}
To deploy this service, we use a Nomad job file. This declarative file tells Nomad what to run, how many copies, and what resources it needs. Note the use of Consul for service registration, which allows other services (like our Ray cluster) to discover it automatically.
# nomad_jobs/event_writer.nomad
job "event-writer" {
datacenters = ["dc1"]
type = "service"
group "writer-api" {
count = 2 # Run two instances for high availability
network {
port "http" {
to = 8000
}
}
# Register this service in Consul for discovery
service {
name = "event-writer-api"
port = "http"
provider = "nomad"
check {
type = "http"
path = "/health"
interval = "10s"
timeout = "2s"
}
}
task "server" {
driver = "docker"
config {
image = "my-registry/feature-store-services:latest" # Assume our app is packaged here
command = "uvicorn"
args = [
"src.event_writer_service:app",
"--host", "0.0.0.0",
"--port", "8000"
]
ports = ["http"]
}
# Environment variables to configure the application
env {
        # Kafka is resolved via its Consul DNS name, registered by the Kafka job
KAFKA_BOOTSTRAP_SERVERS = "kafka.service.consul:9092"
KAFKA_TOPIC = "user-events"
}
resources {
cpu = 200 # MHz
memory = 256 # MB
}
}
}
}
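With the writer deployed and registered in Consul, publishing an event is a single POST. The host name and payload values below are placeholders for illustration, not taken from our production setup.
# illustrative client call; host name and payload values are placeholders
import requests
from datetime import datetime, timezone

event = {
    "user_id": "user-1234",
    "email": "alice@example.com",
    "registered_at": datetime.now(timezone.utc).isoformat(),
    "meta": {"event_type": "UserRegistered"},
}
resp = requests.post("http://event-writer-api.service.consul:8000/publish_event", json=event)
resp.raise_for_status()
print(resp.json())  # {"status": "success", "event_id": "..."}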
Distributed Computation: Feature Generation with Ray Actors
With events flowing into Kafka, the next challenge is processing them. This is where Ray’s power comes into play. We’ll design a Ray actor, which is essentially a stateful service in the Ray ecosystem. Each instance of our FeatureGeneratorActor will be responsible for a single user_id. It will maintain the user’s state in memory by consuming their event stream and will expose a method to compute features based on that state at a specific point in time.
The actor needs to be able to read events from Kafka for a specific user. The pitfall here is that a standard Kafka consumer group would load-balance partitions across consumers, meaning a single actor might not get the partition it needs. Instead, the actor must explicitly be assigned the specific partition that contains its user_id’s events. This requires a bit of coordination but is essential for the model to work.
# src/feature_generator.py
import ray
import os
import json
import logging
from datetime import datetime, timezone
from kafka import KafkaConsumer, TopicPartition
# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "user-events")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# --- Logging Setup ---
# Ray actors run in separate processes, so logging needs to be configured within the actor.
def setup_logger():
logger = logging.getLogger(f"FeatureGeneratorActor_{os.getpid()}")
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(LOG_LEVEL)
return logger
@ray.remote
class FeatureGeneratorActor:
def __init__(self, user_id: str, partition: int):
self.logger = setup_logger()
self.user_id = user_id
self.partition = partition
self.events = []
self.state = {} # In-memory projected state
self._initialized = False
self.logger.info(f"Actor created for user_id={self.user_id} on partition={self.partition}")
def _initialize_consumer(self):
"""
Connects to Kafka and consumes all historical events for the user.
This is a blocking operation and is the main reason for the actor model,
as it isolates this heavy lifting.
"""
try:
# A common mistake is to share consumers across threads/processes.
# Each actor must have its own consumer instance.
consumer = KafkaConsumer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
auto_offset_reset='earliest', # Start from the beginning
consumer_timeout_ms=5000, # Don't block forever if no new messages
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
topic_partition = TopicPartition(KAFKA_TOPIC, self.partition)
consumer.assign([topic_partition])
consumer.seek_to_beginning(topic_partition)
self.logger.info(f"Loading events for user_id={self.user_id} from partition {self.partition}")
for message in consumer:
# We only care about events for our specific user_id
if message.key.decode('utf-8') == self.user_id:
self.events.append(message.value)
self.logger.info(f"Loaded {len(self.events)} events for user_id={self.user_id}")
self._initialized = True
except Exception as e:
self.logger.error(f"Failed to initialize Kafka consumer for user {self.user_id}: {e}", exc_info=True)
# This actor is now in a failed state; subsequent calls should fail.
self._initialized = False
finally:
if 'consumer' in locals():
consumer.close()
def _apply_event(self, state, event_data):
"""Applies a single event to a state dictionary, mutating it."""
event_type = event_data.get("meta", {}).get("event_type")
if event_type == "UserRegistered":
state['registered'] = True
state['email'] = event_data.get('email')
state['transaction_countries'] = set()
state['login_timestamps'] = []
elif event_type == "LoginSucceeded":
if 'login_timestamps' in state:
state['login_timestamps'].append(event_data.get('timestamp'))
elif event_type == "TransactionCreated":
if 'transaction_countries' in state:
state['transaction_countries'].add(event_data.get('country_code'))
# Add other event applications here...
async def generate_features(self, at_timestamp_str: str) -> dict:
"""
Computes features for the user at a specific point in time.
"""
if not self._initialized:
self.logger.info("First call, initializing and loading history...")
self._initialize_consumer()
if not self._initialized: # Check again after attempt
raise RuntimeError(f"Actor for user {self.user_id} is in a failed state.")
        try:
            at_timestamp = datetime.fromisoformat(at_timestamp_str)
            # Events carry timezone-aware timestamps; treat a naive input as UTC
            # so the aware-vs-naive comparison below cannot raise a TypeError.
            if at_timestamp.tzinfo is None:
                at_timestamp = at_timestamp.replace(tzinfo=timezone.utc)
        except (ValueError, TypeError):
            raise ValueError("A valid ISO 8601 timestamp is required.")
# This is the core of event sourcing: re-compute state from the log.
point_in_time_state = {}
for event in self.events:
event_ts_str = event.get("meta", {}).get("created_at")
if not event_ts_str:
continue
event_ts = datetime.fromisoformat(event_ts_str)
if event_ts <= at_timestamp:
self._apply_event(point_in_time_state, event)
else:
# Events are ordered, so we can stop early.
break
# Now, derive features from the computed state.
if not point_in_time_state:
return {"error": "No events found for user before specified timestamp."}
login_timestamps = point_in_time_state.get('login_timestamps', [])
features = {
"user_id": self.user_id,
"distinct_transaction_countries": len(point_in_time_state.get('transaction_countries', set())),
"total_logins": len(login_timestamps),
"is_registered": point_in_time_state.get('registered', False),
"computed_at": datetime.now(timezone.utc).isoformat()
}
self.logger.info(f"Generated features for {self.user_id} at {at_timestamp_str}")
return features
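Before wiring the actor into a serving layer, it can be exercised with a quick local smoke test, assuming a Kafka broker reachable at KAFKA_BOOTSTRAP_SERVERS; the user ID and partition number below are placeholders.
# local smoke test (illustrative; requires a reachable Kafka broker)
import ray
from src.feature_generator import FeatureGeneratorActor

ray.init()  # or ray.init(address="ray://<head>:10001") against a cluster
actor = FeatureGeneratorActor.remote(user_id="user-1234", partition=3)
features = ray.get(actor.generate_features.remote("2024-03-01T12:00:00+00:00"))
print(features)
ray.shutdown()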
Orchestrating a Ray cluster on Nomad is surprisingly straightforward. Nomad can run raw executables, not just Docker containers. We define a “group” for the Ray cluster with a head node and a set of worker nodes.
# nomad_jobs/ray_cluster.nomad
job "ray-cluster" {
datacenters = ["dc1"]
type = "service" # Keep the cluster running
group "ray-head" {
count = 1
network {
port "client" { to = 10001 }
port "dashboard" { to = 8265 }
}
service {
name = "ray-head"
port = "client"
provider = "nomad"
# Other services will connect to ray-head.service.consul:10001
}
task "ray-head-task" {
driver = "exec" # Can also be 'docker'
config {
command = "/usr/local/bin/ray"
args = [
"start",
"--head",
"--port", "6379", # Internal Ray port
"--ray-client-server-port", "${NOMAD_PORT_client}",
"--dashboard-host", "0.0.0.0",
"--dashboard-port", "${NOMAD_PORT_dashboard}",
"--num-cpus", "1" # Head node is for coordination
]
}
env {
# This allows workers to discover the head node
RAY_HEAD_ADDRESS = "${NOMAD_IP_client}:${NOMAD_PORT_client}"
}
resources {
cpu = 500
memory = 1024
}
}
}
group "ray-workers" {
# This can be scaled up or down based on load
count = 4
task "ray-worker-task" {
driver = "exec"
config {
command = "/usr/local/bin/ray"
args = [
"start",
# The address of the head node is discovered via Consul
"--address", "ray-head.service.consul:6379"
]
}
resources {
cpu = 1000 # Workers do the heavy lifting
memory = 2048
}
}
}
}
Tying It Together: The Feature Serving API
The final piece is an API service that acts as the entry point for clients. It receives a request for features, determines which Ray actor to invoke, calls it, and returns the result. To avoid the expensive replay operation on every single request, this service will also implement a simple caching layer using Redis.
graph TD
    subgraph Client
        A[ML Model Training Job] --> B{Feature API}
    end
    subgraph Nomad Cluster
        B -- "HTTP Request /get_features(user_id, timestamp)" --> C[Feature API Service]
        C -- Check Cache --> D[Redis]
        C -- Cache Miss --> E[Ray Client]
        E -- "Invoke Actor.generate_features()" --> F[Ray Cluster Head]
        F -- Dispatches task --> G[FeatureGeneratorActor for user_id]
        G -- Read Events --> H[Kafka]
        G -- Computes Features --> E
        E -- Returns Result --> C
        C -- Write to Cache --> D
        C -- HTTP Response --> B
    end
Here’s the Python code for this API service. It manages a pool of Ray actors, creating them on-demand when a request for a new user_id comes in. A real-world project would need a more sophisticated actor management strategy, potentially evicting idle actors to save memory.
# src/feature_api_service.py
import os
import json
import logging
import ray
import redis
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka.partitioner.default import murmur2
# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "user-events")
KAFKA_PARTITIONS = int(os.getenv("KAFKA_PARTITIONS", "16"))
RAY_ADDRESS = os.getenv("RAY_ADDRESS", "ray://localhost:10001")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# --- Other Setup ---
# (logging, etc. from previous examples)
logger = logging.getLogger(__name__)
# --- Global State Management ---
# In a multi-process server like uvicorn, this state is per-process.
# For true shared state, an external store or Ray's own features would be needed.
actor_handles = {}
redis_client = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global redis_client
logger.info("Initializing Feature API Service...")
# Initialize Ray connection
if not ray.is_initialized():
ray.init(address=RAY_ADDRESS, ignore_reinit_error=True)
logger.info(f"Connected to Ray at {RAY_ADDRESS}")
# Initialize Redis connection
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
try:
redis_client.ping()
logger.info(f"Connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
except redis.exceptions.ConnectionError as e:
logger.error(f"Could not connect to Redis: {e}", exc_info=True)
# Allow service to start but caching will fail.
redis_client = None
# Ensure Kafka topic exists (for demonstration purposes)
try:
admin_client = KafkaAdminClient(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
topic_list = [NewTopic(name=KAFKA_TOPIC, num_partitions=KAFKA_PARTITIONS, replication_factor=1)]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
logger.info(f"Ensured Kafka topic '{KAFKA_TOPIC}' exists with {KAFKA_PARTITIONS} partitions.")
except TopicAlreadyExistsError:
logger.info(f"Kafka topic '{KAFKA_TOPIC}' already exists.")
except Exception as e:
logger.warning(f"Could not create/verify Kafka topic: {e}")
yield
logger.info("Shutting down Feature API Service...")
ray.shutdown()
app = FastAPI(lifespan=lifespan)
def get_partition_for_user(user_id: str) -> int:
    """Consistently maps a user_id to a Kafka partition."""
    # This must match the partitioner the producer used. aiokafka's default
    # partitioner (like the Java client) hashes the key bytes with murmur2,
    # so we reuse kafka-python's murmur2 implementation here; an md5-based
    # hash would compute a different (wrong) partition for the same key.
    return (murmur2(user_id.encode('utf-8')) & 0x7FFFFFFF) % KAFKA_PARTITIONS
def get_or_create_actor(user_id: str):
"""
Retrieves an existing Ray actor handle for a user_id or creates a new one.
This is a critical part of the system's resource management.
"""
if user_id in actor_handles:
return actor_handles[user_id]
from .feature_generator import FeatureGeneratorActor
partition = get_partition_for_user(user_id)
logger.info(f"No active actor for user_id={user_id}. Creating a new one for partition {partition}.")
    # A named actor with get_if_exists=True lets multiple API worker processes
    # share one actor per user instead of creating duplicates. Surviving a full
    # restart of this service would additionally require lifetime="detached".
actor = FeatureGeneratorActor.options(name=f"actor_{user_id}", get_if_exists=True).remote(
user_id=user_id,
partition=partition
)
actor_handles[user_id] = actor
return actor
@app.get("/get_features/{user_id}")
async def get_features(user_id: str, at_timestamp: str):
# Caching key should include all parameters that affect the result.
cache_key = f"features:{user_id}:{at_timestamp}"
if redis_client:
cached_result = redis_client.get(cache_key)
if cached_result:
logger.info(f"Cache HIT for user {user_id} at {at_timestamp}")
return json.loads(cached_result)
logger.info(f"Cache MISS for user {user_id}. Invoking Ray actor.")
try:
actor = get_or_create_actor(user_id)
        # Ray ObjectRefs are awaitable, so we await the result directly rather
        # than blocking the event loop with a synchronous ray.get().
        features_ref = actor.generate_features.remote(at_timestamp)
        features = await features_ref
except Exception as e:
logger.error(f"Error invoking Ray actor for user {user_id}: {e}", exc_info=True)
# Remove potentially faulty actor handle
if user_id in actor_handles:
del actor_handles[user_id]
raise HTTPException(status_code=500, detail="Failed to compute features.")
if redis_client and "error" not in features:
# Cache the result for 5 minutes
redis_client.set(cache_key, json.dumps(features), ex=300)
return features
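Once deployed, a training pipeline can fetch a point-in-time feature vector with a single HTTP call; the host name and IDs below are illustrative placeholders.
# illustrative request against the feature API; host and IDs are placeholders
import requests

resp = requests.get(
    "http://feature-api.service.consul:8000/get_features/user-1234",
    params={"at_timestamp": "2024-03-01T12:00:00+00:00"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())  # e.g. {"user_id": "user-1234", "total_logins": 7, ...}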
The Nomad job for this final service is similar to the event writer, ensuring it can discover both the Ray head and Redis via Consul.
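Its exact shape depends on your registry and resource budget, but a minimal sketch might look like the following; the image name, service names, counts, and resource figures are illustrative assumptions, and the health check assumes adding a /health route like the event writer’s.
# nomad_jobs/feature_api.nomad (illustrative sketch, not the exact production job)
job "feature-api" {
  datacenters = ["dc1"]
  type        = "service"
  group "api" {
    count = 2
    network {
      port "http" {
        to = 8000
      }
    }
    service {
      name     = "feature-api"
      port     = "http"
      provider = "consul"
      check {
        type     = "http"
        path     = "/health" # assumes a /health route like the event writer's
        interval = "10s"
        timeout  = "2s"
      }
    }
    task "server" {
      driver = "docker"
      config {
        image   = "my-registry/feature-store-services:latest"
        command = "uvicorn"
        args    = ["src.feature_api_service:app", "--host", "0.0.0.0", "--port", "8000"]
        ports   = ["http"]
      }
      env {
        KAFKA_BOOTSTRAP_SERVERS = "kafka.service.consul:9092"
        KAFKA_TOPIC             = "user-events"
        RAY_ADDRESS             = "ray://ray-head.service.consul:10001"
        REDIS_HOST              = "redis.service.consul"
      }
      resources {
        cpu    = 500
        memory = 512
      }
    }
  }
}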
The final result is a system capable of generating point-in-time feature vectors on demand. A data scientist can now ask, “What were the features for user X just before their transaction at time T?” and get an answer in seconds, rather than waiting days for a manual data pull. This architecture fundamentally unlocked the team’s ability to experiment and iterate on models that rely on rich, temporal user behavior.
However, the current implementation has clear limits. The actor’s approach of reading the entire event history from Kafka on initialization is naive and will not scale for entities with millions of events. A robust, production-grade version would require implementing snapshots, where the actor periodically persists its computed state and only needs to read events since the last snapshot. Furthermore, our actor management is simplistic; a proper system would require an actor supervisor to manage lifecycles, evict idle actors to conserve resources, and handle fault tolerance more gracefully. The current Nomad job definitions are also static. The next iteration should leverage Nomad’s autoscaling capabilities to dynamically adjust the number of Ray workers based on the length of the task queue or API latency, creating a truly elastic compute platform.
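As a rough sketch of the snapshotting idea (the Redis key scheme and serialization below are assumptions for illustration, not part of the current system), the actor would persist its projected state alongside the last consumed offset and resume from there instead of replaying the whole partition.
# snapshot sketch (hypothetical): persist projected state + last offset, resume later
import json
import redis

class SnapshotStore:
    def __init__(self, host: str = "redis.service.consul", port: int = 6379):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)

    def save(self, user_id: str, state: dict, last_offset: int) -> None:
        # Sets are not JSON-serializable, so convert them before persisting.
        serializable = {k: sorted(v) if isinstance(v, set) else v for k, v in state.items()}
        self.client.set(
            f"snapshot:{user_id}",
            json.dumps({"state": serializable, "last_offset": last_offset}),
        )

    def load(self, user_id: str):
        raw = self.client.get(f"snapshot:{user_id}")
        if raw is None:
            return None, -1  # no snapshot yet: replay from the beginning
        doc = json.loads(raw)
        return doc["state"], doc["last_offset"]

# Inside the actor, _initialize_consumer would then seek to last_offset + 1
# instead of seek_to_beginning and apply only the newer events.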