Integrating a Hadoop Batch Feature Pipeline with a Real-Time FastAPI Serving Layer


The core technical pain point was training-serving skew. Our machine learning models, trained on batch features computed nightly via Hadoop jobs, were performing unpredictably in production. The features available during real-time inference were subtly different from their training counterparts. The root cause was a fragmented, ad-hoc system: data scientists pulled data from HDFS into notebooks for training, while production services cobbled together features from live databases and caches. There was no single source of truth.

Our initial concept was to build a centralized, hybrid feature store. The goal was to unify feature generation and consumption. Batch features would be pre-calculated on our Hadoop cluster from terabytes of historical data. These features would then be loaded into a low-latency online store. A high-performance API layer would serve these pre-calculated features to production models. This same API layer would also provide metadata and feature definitions to a management UI, ensuring data scientists could discover and use the exact same feature logic for training.

This led to a multi-technology stack decision, each component chosen for a specific, pragmatic reason.

  1. Batch Compute (Hadoop/PySpark): Our data lake already resides on HDFS, and our data engineering team is proficient with Spark on YARN. Sticking with this ecosystem for large-scale feature engineering was the path of least resistance. The output would be versioned, partitioned Parquet files on HDFS.
  2. Online Storage (Redis): For the serving layer, we needed sub-millisecond lookups. A relational database was too slow. Redis, with its hash data structure, was a perfect fit for storing feature vectors keyed by entity IDs (e.g., user_id).
  3. Serving API (FastAPI): We chose FastAPI over Flask or Django for its asynchronous performance, which is critical for a low-latency service. Its native support for Pydantic ensures data validation and automatically generates OpenAPI documentation, which is invaluable for consumers of the API. Python’s rich data science ecosystem also meant we could easily integrate data validation or transformation logic if needed.
  4. Management UI (React/Relay): Data scientists needed a way to browse available features. We opted for a GraphQL API endpoint on our FastAPI service to serve this metadata. Relay was chosen as the client because its declarative approach to data fetching and state management simplifies building complex, data-driven UIs. It pairs naturally with a GraphQL backend. Babel is an implicit dependency here: it transpiles JSX and modern JS features, and Relay's build-time transform (babel-plugin-relay) runs as a Babel plugin.
  5. Observability (Prometheus/Grafana): A system this complex is a black box without metrics. We needed to monitor the entire pipeline: Hadoop job completion times, data freshness in Redis, API latency and error rates from FastAPI. Grafana would be the single pane of glass to visualize these metrics.

Here is the high-level architecture we landed on. It’s not revolutionary, but the devil is in the details of making these disparate systems work together reliably.

graph TD
    subgraph batch["Batch Processing (Hadoop Cluster)"]
        A[HDFS Raw Data] --> B{PySpark Feature Engineering Job}
        B --> C[HDFS Feature Store - Parquet]
    end

    subgraph sync["Synchronization (Orchestrator, e.g. Airflow)"]
        D[Python Sync Script] -- Reads --> C
        D -- Bulk Writes --> E[Redis - Online Feature Store]
    end

    subgraph serving["Real-time Serving"]
        F[FastAPI Application] -- Reads --> E
    end

    subgraph consumers["Consumers & Observability"]
        G[ML Inference Service] -- REST API --> F
        H["Data Scientist UI - React/Relay"] -- GraphQL API --> F

        B -- Job Metrics --> J[Prometheus]
        D -- Sync Metrics --> J
        F -- API Metrics --> J
        J --> K[Grafana Dashboard]
    end

Part 1: The Batch Pipeline - PySpark on Hadoop

The foundation of the system is the daily batch job. It’s responsible for computing features that require a large historical window, like a user’s 90-day spending average. In a real-world project, this would be a complex, multi-stage pipeline. For clarity, here is a single, self-contained PySpark script representing one such feature engineering task.

The critical part is the output format. We write partitioned, versioned Parquet files. The snapshot_date partition allows us to easily pick up the latest successful run, and the feature_set_version helps manage changes in feature logic over time.

# feature_engineering/jobs/user_purchase_features.py
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from logging import getLogger

# A production setup would use a centralized logger config
logger = getLogger(__name__)

# Constants that would be managed by a config system
HDFS_BASE_PATH = os.environ.get("HDFS_BASE_PATH", "hdfs://namenode:9000/data")
FEATURE_SET_NAME = "user_purchase_features"
FEATURE_SET_VERSION = "v1"

def create_spark_session() -> SparkSession:
    """Initializes and returns a SparkSession."""
    return (
        SparkSession.builder.appName("UserPurchaseFeaturesJob")
        .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .enableHiveSupport()
        .getOrCreate()
    )

def compute_features(spark: SparkSession, snapshot_date: str):
    """

    Computes user purchase features from raw transaction data.

    In a real system, this would read from a Hive table or a well-defined
    raw data path on HDFS. For this example, we'll simulate the input DataFrame.
    """
    logger.info(f"Starting feature computation for snapshot_date={snapshot_date}")

    # --- SIMULATE INPUT DATA ---
    # In production, this would be:
    # transactions_df = spark.read.parquet(f"{HDFS_BASE_PATH}/raw/transactions/{snapshot_date}")
    # users_df = spark.read.parquet(f"{HDFS_BASE_PATH}/raw/users/")
    
    # Simulating raw transaction data
    transactions_data = [
        (101, 1, 10.50, "2023-10-26 10:00:00"),
        (102, 1, 5.25, "2023-10-26 12:30:00"),
        (103, 2, 25.00, "2023-10-26 14:00:00"),
        (104, 1, 8.00, "2023-10-27 08:00:00"),
        (105, 3, 150.75, "2023-10-27 09:15:00"),
        (106, 2, 30.00, "2023-10-27 11:45:00"),
    ]
    transactions_df = spark.createDataFrame(
        transactions_data, ["transaction_id", "user_id", "amount", "timestamp"]
    ).withColumn("timestamp", F.to_timestamp("timestamp"))

    # --- FEATURE LOGIC ---
    # This is where the core business logic lives.
    # Calculate features like total spend, average transaction amount, and days since last purchase.

    # Cast the snapshot date string to a date before doing interval arithmetic.
    ninety_days_ago = F.date_sub(F.to_date(F.lit(snapshot_date)), 90)

    # Filter for relevant transactions
    recent_transactions_df = transactions_df.filter(F.col("timestamp") >= ninety_days_ago)


    # Calculate features
    features_df = (
        recent_transactions_df.groupBy("user_id")
        .agg(
            F.sum("amount").alias("total_spend_90d"),
            F.avg("amount").alias("avg_spend_90d"),
            F.count("transaction_id").alias("purchase_count_90d"),
            F.max("timestamp").alias("last_purchase_ts"),
        )
        .withColumn(
            "days_since_last_purchase",
            F.datediff(F.lit(snapshot_date), F.col("last_purchase_ts")),
        )
    )

    # A common pitfall is not handling nulls or missing users properly.
    # We should ensure every active user has a feature record, even if it's all zeros.
    # This step is omitted for brevity but is critical in production.
    
    # Add metadata columns for partitioning and versioning
    final_df = features_df.withColumn("snapshot_date", F.lit(snapshot_date)).drop(
        "last_purchase_ts"
    )
    
    logger.info(f"Computed features for {final_df.count()} users.")
    final_df.show(5, truncate=False)

    return final_df


def write_features_to_hdfs(features_df):
    """Writes the computed features DataFrame to HDFS in Parquet format."""
    output_path = f"{HDFS_BASE_PATH}/feature_store/{FEATURE_SET_NAME}/{FEATURE_SET_VERSION}"
    
    logger.info(f"Writing features to HDFS path: {output_path}")

    # Dynamic partition overwrite (enabled in the SparkSession config above) is crucial:
    # with it, 'overwrite' mode only replaces the snapshot_date partitions being written,
    # so we can rerun a specific day without affecting other days' data. A common mistake
    # is to leave the default static mode, in which 'overwrite' wipes out the entire feature set.
    (
        features_df.write.mode("overwrite")
        .partitionBy("snapshot_date")
        .parquet(output_path)
    )
    
    logger.info("Successfully wrote features to HDFS.")


if __name__ == "__main__":
    spark_session = create_spark_session()
    # In a real scheduler (like Airflow), this date would be passed dynamically.
    today_str = datetime.utcnow().strftime("%Y-%m-%d")
    
    try:
        calculated_features = compute_features(spark_session, today_str)
        write_features_to_hdfs(calculated_features)
    except Exception as e:
        logger.error(f"Feature engineering job failed: {e}", exc_info=True)
        # Proper error handling would include sending an alert.
        raise
    finally:
        spark_session.stop()

Part 2: The Bridge - Syncing HDFS to Redis

This is the most critical and often the most fragile part of the system. A Python script, orchestrated by Airflow or a simple cron job, is responsible for loading data from HDFS into Redis. A common mistake here is to do a row-by-row insert, which is incredibly slow. The correct approach is to use Redis pipelines for bulk insertion.

We also need to handle failures gracefully and ensure atomicity. The approach is to write each batch into a temporary Redis key namespace and only update a pointer key once the entire load has completed, which prevents the serving layer from ever reading partially loaded data. The sync script below writes keys in place to keep the example focused; the sketch that follows illustrates the pointer-key swap.
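
Here is a minimal sketch of that swap, assuming hypothetical staging and pointer key names (f_store_staging:*, f_store_pointer:*) that are not part of the scripts in this post:

# sync/atomic_swap_sketch.py -- illustrative only; helper and key names are hypothetical
import redis

def publish_feature_batch(redis_client: redis.Redis, feature_set: str, batch_id: str, rows: dict) -> None:
    """Load a batch into a staging namespace, then flip a pointer key atomically."""
    staging_prefix = f"f_store_staging:{feature_set}:{batch_id}"

    pipe = redis_client.pipeline()
    for entity_id, features in rows.items():
        pipe.hset(f"{staging_prefix}:{entity_id}", mapping=features)
    pipe.execute()

    # Readers resolve the active namespace through this pointer, so the cutover is a
    # single atomic SET and a half-loaded batch is never visible to the serving layer.
    redis_client.set(f"f_store_pointer:{feature_set}", staging_prefix)

The cost is an extra GET on the read path to resolve the active namespace (easily cached for a few seconds) and periodic cleanup of superseded staging keys. With that caveat noted, here is the sync script itself.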

# sync/hdfs_to_redis_sync.py
import io
import os
import redis
import pandas as pd
import pyarrow.parquet as pq
from hdfs import InsecureClient
from datetime import datetime, timezone
from logging import getLogger, basicConfig, INFO

basicConfig(level=INFO)
logger = getLogger(__name__)

# --- Configuration ---
# In a production environment, use a proper config library (e.g., Pydantic's BaseSettings)
HDFS_URI = os.environ.get("HDFS_URI", "http://namenode:9870")
HDFS_USER = os.environ.get("HDFS_USER", "hadoop")
HDFS_BASE_PATH = os.environ.get("HDFS_BASE_PATH", "/data")

REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))

FEATURE_SET_NAME = "user_purchase_features"
FEATURE_SET_VERSION = "v1"

def get_latest_snapshot_path(hdfs_client: InsecureClient) -> str:
    """Finds the path to the latest successful snapshot partition."""
    feature_path = f"{HDFS_BASE_PATH}/feature_store/{FEATURE_SET_NAME}/{FEATURE_SET_VERSION}"
    try:
        partitions = hdfs_client.list(feature_path, status=True)
        # Filter for directories and sort by name (date)
        date_partitions = sorted(
            [p[0] for p in partitions if p[1]['type'] == 'DIRECTORY'], reverse=True
        )
        if not date_partitions:
            raise FileNotFoundError("No snapshot partitions found.")
        
        latest_date_str = date_partitions[0].split('=')[1]
        logger.info(f"Found latest snapshot date: {latest_date_str}")
        return f"{feature_path}/{date_partitions[0]}"

    except Exception as e:
        logger.error(f"Failed to find latest snapshot in HDFS path {feature_path}: {e}")
        raise

def load_data_to_redis(hdfs_client: InsecureClient, redis_client: redis.Redis, hdfs_path: str):
    """Loads feature data from Parquet files on HDFS into Redis."""
    logger.info(f"Loading data from HDFS path: {hdfs_path}")
    
    # The hdfs library doesn't stream Parquet well. A common pattern is to download
    # the file(s) locally first or use a library that can read directly.
    # For simplicity, we assume the data can be read into memory on the sync worker.
    # For very large datasets, a chunking approach is required.
    
    file_statuses = hdfs_client.list(hdfs_path, status=True)
    parquet_files = [f[0] for f in file_statuses if f[0].endswith(".parquet")]

    if not parquet_files:
        logger.warning(f"No Parquet files found in {hdfs_path}. Skipping sync.")
        return

    frames = []
    # Spark writes one part file per task, so read and concatenate all of them.
    # Parquet readers need a seekable source, so buffer each file in memory first.
    for parquet_file in parquet_files:
        full_path_to_read = f"{hdfs_path}/{parquet_file}"
        logger.info(f"Reading Parquet file: {full_path_to_read}")
        with hdfs_client.read(full_path_to_read) as reader:
            buffer = io.BytesIO(reader.read())
        frames.append(pq.read_table(buffer).to_pandas())

    df = pd.concat(frames, ignore_index=True)
    logger.info(f"Loaded {len(df)} records from Parquet.")

    # A crucial performance optimization: use a pipeline for bulk inserts.
    pipe = redis_client.pipeline()
    records_processed = 0
    
    # The key schema is important for organization and avoiding collisions.
    # format: f_store:{feature_set_name}:{entity_id}
    for _, row in df.iterrows():
        entity_id = row['user_id']
        key = f"f_store:{FEATURE_SET_NAME}:{entity_id}"
        
        # Redis hashes are perfect for storing feature vectors. Note that the
        # snapshot_date partition column lives in the directory path, not in the
        # Parquet file itself, so drop it defensively.
        feature_dict = row.drop(['user_id', 'snapshot_date'], errors='ignore').to_dict()
        
        # Ensure all values are strings for Redis.
        feature_dict_str = {k: str(v) for k, v in feature_dict.items()}

        pipe.hset(key, mapping=feature_dict_str)
        records_processed += 1
        
        # Execute in batches to avoid overwhelming the Redis server buffer.
        if records_processed % 1000 == 0:
            pipe.execute()
            logger.info(f"Processed {records_processed} records...")

    # Execute any remaining commands
    pipe.execute()
    logger.info(f"Finished loading {records_processed} records into Redis.")

    # Update a metadata key to track data freshness. This is what Grafana will monitor.
    # Store it as a Unix epoch so a redis_exporter can expose it as a numeric gauge
    # that PromQL can compare against time().
    sync_timestamp = int(datetime.now(timezone.utc).timestamp())
    redis_client.set(f"f_store_meta:{FEATURE_SET_NAME}:last_sync_ts", sync_timestamp)
    logger.info(f"Updated last sync timestamp to {sync_timestamp}")


if __name__ == "__main__":
    try:
        hdfs_cli = InsecureClient(HDFS_URI, user=HDFS_USER)
        # Using decode_responses=True is convenient but adds a small performance overhead.
        redis_cli = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
        redis_cli.ping() # Verify connection
        logger.info("Successfully connected to HDFS and Redis.")
    except Exception as e:
        logger.error(f"Failed to connect to services: {e}")
        exit(1)

    try:
        latest_path = get_latest_snapshot_path(hdfs_cli)
        load_data_to_redis(hdfs_cli, redis_cli, latest_path)
        logger.info("Sync job completed successfully.")
    except Exception as e:
        logger.error(f"Sync job failed: {e}", exc_info=True)
        # Alerting should be triggered here.
        exit(1)
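
For completeness, here is a sketch of how an orchestrator could chain the two jobs. It assumes Airflow 2.x with spark-submit available on the worker, hypothetical repository paths, and a Spark job extended to accept a --snapshot-date argument (the Part 1 example derives the date from utcnow() instead):

# orchestration/dags/feature_store_daily.py -- a sketch, not a production DAG;
# paths, arguments and schedule are assumptions.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {"retries": 1, "retry_delay": timedelta(minutes=10)}

with DAG(
    dag_id="feature_store_daily",
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
) as dag:
    # Compute batch features on the Hadoop cluster. {{ ds }} is Airflow's execution date,
    # which keeps snapshot_date deterministic across retries and backfills.
    compute_features = BashOperator(
        task_id="compute_user_purchase_features",
        bash_command=(
            "spark-submit --master yarn --deploy-mode cluster "
            "feature_engineering/jobs/user_purchase_features.py --snapshot-date {{ ds }}"
        ),
    )

    # Load the freshest snapshot into Redis only after the Spark job has succeeded.
    sync_to_redis = BashOperator(
        task_id="sync_hdfs_to_redis",
        bash_command="python sync/hdfs_to_redis_sync.py",
    )

    compute_features >> sync_to_redis

The explicit task dependency ensures the Redis sync never runs against a partially written snapshot.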

Part 3: The Real-Time Serving Layer - FastAPI

The FastAPI application is the heart of the online system. It needs to be fast, reliable, and observable. We use Pydantic for defining our data models, which gives us type safety and validation out of the box. Dependency injection is used to manage the Redis connection, making the application easier to test.

We expose two types of endpoints: a REST endpoint for ML models that need low-latency feature lookups, and a GraphQL endpoint (using Strawberry) for the management UI.

# feature_server/app/main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.responses import JSONResponse
from strawberry.fastapi import GraphQLRouter
import strawberry
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from typing import Optional, List
from pydantic import BaseModel, Field

# --- Pydantic Models for REST API ---
class FeatureVector(BaseModel):
    total_spend_90d: Optional[float] = Field(None, description="Total spend in the last 90 days.")
    avg_spend_90d: Optional[float] = Field(None, description="Average transaction amount in the last 90 days.")
    purchase_count_90d: Optional[int] = Field(None, description="Number of purchases in the last 90 days.")
    days_since_last_purchase: Optional[int] = Field(None, description="Days since the last purchase was made.")

class FeatureResponse(BaseModel):
    user_id: int
    features: FeatureVector


# --- GraphQL Schema for UI (using Strawberry) ---
@strawberry.type
class FeatureDefinition:
    name: str
    description: str
    dtype: str

@strawberry.type
class FeatureSet:
    name: str
    version: str
    description: str
    features: List[FeatureDefinition]

@strawberry.type
class Query:
    @strawberry.field
    def get_all_feature_sets(self) -> List[FeatureSet]:
        # In a real system, this metadata would come from a registry (e.g., a DB or YAML files).
        # We hardcode it here for the example.
        return [
            FeatureSet(
                name="user_purchase_features",
                version="v1",
                description="Aggregated user purchase behavior over 90 days.",
                features=[
                    FeatureDefinition(name="total_spend_90d", description="Total spend in last 90 days.", dtype="float"),
                    FeatureDefinition(name="avg_spend_90d", description="Average spend in last 90 days.", dtype="float"),
                    FeatureDefinition(name="purchase_count_90d", description="Purchase count in last 90 days.", dtype="int"),
                    FeatureDefinition(name="days_since_last_purchase", description="Days since last purchase.", dtype="int"),
                ]
            )
        ]

# --- Redis Connection Management ---
# A common mistake is creating a new connection for every request.
# A connection pool managed within the application lifecycle is the correct approach.
redis_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_pool
    # In production, get this from config
    redis_pool = aioredis.ConnectionPool.from_url("redis://redis:6379/0", decode_responses=True)
    yield
    await redis_pool.disconnect()

async def get_redis_client() -> aioredis.Redis:
    if not redis_pool:
        raise RuntimeError("Redis connection pool is not initialized.")
    return aioredis.Redis(connection_pool=redis_pool)

# --- FastAPI Application ---
app = FastAPI(title="Feature Store Serving API", lifespan=lifespan)

# --- REST Endpoint for Inference ---
@app.get("/features/user/{user_id}", response_model=FeatureResponse, tags=["Features"])
async def get_user_features(
    user_id: int, 
    redis_client: aioredis.Redis = Depends(get_redis_client)
):
    """
    Retrieves the feature vector for a given user ID.
    This is the hot path and must be extremely fast.
    """
    key = f"f_store:user_purchase_features:{user_id}"
    
    try:
        feature_data = await redis_client.hgetall(key)
    except Exception as e:
        # Proper logging should be here.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Could not connect to online store: {e}"
        )

    if not feature_data:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Features not found for user_id: {user_id}"
        )

    # A pitfall is trusting the data from Redis: hash values always come back as strings,
    # so cast them to numeric types before building the response model; Pydantic then
    # validates the result. A robust solution would share a typed casting utility with
    # the sync job so both sides agree on the feature schema.
    parsed_features = {k: float(v) if "." in v else int(v) for k, v in feature_data.items()}

    return FeatureResponse(
        user_id=user_id,
        features=FeatureVector(**parsed_features)
    )

# --- GraphQL Endpoint for UI ---
schema = strawberry.Schema(query=Query)
graphql_app = GraphQLRouter(schema)
app.include_router(graphql_app, prefix="/graphql")

# --- Health Check Endpoint ---
@app.get("/health", status_code=status.HTTP_200_OK, tags=["System"])
async def health_check(redis_client: aioredis.Redis = Depends(get_redis_client)):
    try:
        await redis_client.ping()
        return {"status": "ok", "services": {"redis": "ok"}}
    except Exception:
        return {"status": "error", "services": {"redis": "error"}}

Part 4: Observability with Grafana

Without monitoring, this system is a ticking time bomb. We exposed metrics from FastAPI using a Prometheus middleware and set up a Grafana dashboard to visualize the health of the entire pipeline.
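
The exact metric names depend on the middleware you pick; the minimal sketch below uses prometheus_client directly, with names chosen to match the dashboard queries that follow (treat the names and the /metrics wiring as assumptions rather than a prescribed setup):

# feature_server/app/metrics.py -- a sketch of request instrumentation
import time

from fastapi import FastAPI, Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

REQUEST_COUNT = Counter(
    "fastapi_requests_total", "Total HTTP requests", ["method", "path", "status_code"]
)
REQUEST_LATENCY = Histogram(
    "fastapi_requests_latency_seconds", "Request latency in seconds", ["method", "path"]
)

def instrument(app: FastAPI) -> None:
    """Attach a timing middleware and expose a /metrics endpoint for Prometheus to scrape."""

    @app.middleware("http")
    async def record_metrics(request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed = time.perf_counter() - start
        # In production, label by the matched route template instead of the raw path
        # to keep metric cardinality bounded.
        path = request.url.path
        REQUEST_LATENCY.labels(request.method, path).observe(elapsed)
        REQUEST_COUNT.labels(request.method, path, str(response.status_code)).inc()
        return response

    @app.get("/metrics", include_in_schema=False)
    async def metrics() -> Response:
        return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Calling instrument(app) once, right after the FastAPI app is created, is all the wiring the dashboard below relies on.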

Key panels on our “Feature Store Health” dashboard include:

  1. API Performance (from FastAPI):

    • Metric: fastapi_requests_latency_seconds_bucket (Histogram)
    • PromQL Query for P99 Latency: histogram_quantile(0.99, sum(rate(fastapi_requests_latency_seconds_bucket[5m])) by (le))
    • Panel: A time series graph showing P99, P95, and P50 latency. This is our primary SLI.
    • Alert: Fire an alert if P99 latency exceeds 100ms for more than 5 minutes.
  2. API Error Rate (from FastAPI):

    • Metric: fastapi_requests_total with a status_code label.
    • PromQL Query: sum(rate(fastapi_requests_total{status_code=~"5.."}[5m])) / sum(rate(fastapi_requests_total[5m]))
    • Panel: A gauge showing the current error rate percentage.
    • Alert: Fire an alert if the error rate is > 0.1%.
  3. Data Freshness (from Sync Script):

    • Metric: We need to export the f_store_meta:{name}:last_sync_ts value from Redis to Prometheus. The sync script stores it as a Unix epoch so that a redis_exporter configured to scrape specific keys can expose it as a numeric gauge; assume it surfaces as a metric called redis_key_value.
    • PromQL Query: time() - redis_key_value{key="f_store_meta:user_purchase_features:last_sync_ts"}
    • Panel: A stat panel showing “Time Since Last Sync” in hours.
    • Alert: Fire an alert if this value exceeds 26 hours (allowing for some buffer on a 24-hour job).
  4. Hadoop Job Status (from YARN/Airflow):

    • Metrics can be exported from Airflow or scraped from the YARN ResourceManager API.
    • Metric: airflow_dag_run_duration, yarn_job_status.
    • Panel: A table showing the status and duration of the last 7 runs of the feature engineering DAG.

The Relay/Babel part of the stack for the UI is a standard front-end setup. The key takeaway is how its data requirements, fulfilled by the GraphQL endpoint on FastAPI, force a clean separation between serving low-latency feature vectors (REST) and serving rich metadata for human interaction (GraphQL). Babel's role is foundational but invisible: it transpiles the JSX and modern JavaScript so the React/Relay code runs across all target browsers.
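
For a concrete sense of what the UI requests, here is the metadata query against the /graphql endpoint, exercised from Python with httpx purely for illustration (Relay compiles and sends an equivalent operation). It assumes Strawberry's default camelCase field naming and a hypothetical service URL:

# scripts/query_feature_metadata.py -- illustrative only; the UI itself uses Relay, not httpx
import httpx

FEATURE_SET_QUERY = """
query FeatureSets {
  getAllFeatureSets {
    name
    version
    description
    features {
      name
      description
      dtype
    }
  }
}
"""

def fetch_feature_sets(base_url: str = "http://feature-server:8000") -> list:
    # Strawberry's GraphQLRouter accepts a standard POST with a JSON body.
    response = httpx.post(f"{base_url}/graphql", json={"query": FEATURE_SET_QUERY}, timeout=5.0)
    response.raise_for_status()
    return response.json()["data"]["getAllFeatureSets"]

if __name__ == "__main__":
    for feature_set in fetch_feature_sets():
        print(feature_set["name"], feature_set["version"], f"{len(feature_set['features'])} features")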

The primary limitation of this architecture is its batch nature. The features served are, at best, 24 hours old. For use cases requiring real-time features (e.g., fraud detection based on the last 5 minutes of activity), this design is inadequate. The next evolution would involve introducing a streaming pipeline using Kafka and Flink/Spark Streaming to compute features on the fly and update Redis in near real-time, which would exist alongside this batch pipeline. Furthermore, the HDFS-to-Redis sync process is a single point of failure; a more resilient design might involve writing from Spark directly to an intermediate message queue that the sync service consumes.

