Implementing a Versioned Feature Engineering Pipeline with Dask, DVC, and a GraphQL Interface on Google Cloud


Our machine learning team was hitting a wall. Not a modeling wall, but an infrastructure and process wall. Reproducibility was a nightmare. A model trained three months ago was impossible to replicate because the underlying data pipeline had shifted, or the feature generation logic, buried in some forgotten notebook, was subtly different. Training-serving skew was becoming a regular source of production incidents. The core pain point was a lack of a single source of truth for our data, feature logic, and the resulting feature sets. We needed a system that was scalable, version-controlled from end to end, and flexible enough to serve diverse modeling needs.

The initial concept was to build a centralized feature platform. After several design sessions, we settled on a specific architectural stack. For data and pipeline versioning, Data Version Control (DVC) was a clear choice; its Git-like workflow was immediately familiar to the team. For scalable, Python-native computation, Dask was selected over Spark to keep the cognitive overhead low for our pandas-heavy data scientists. For data access, we controversially chose GraphQL. While a simple REST API would have worked, the ability for model training clients and inference services to request exactly the columns they need, without endpoint changes, was a compelling argument against over-fetching and for future flexibility. The entire system would be built on our existing Google Cloud (GCP) infrastructure, leveraging Google Cloud Storage (GCS) for data and Google Kubernetes Engine (GKE) for compute and serving.

This is the build log of that system, detailing the implementation, the problems encountered, and the production-grade patterns we established.

Phase 1: Foundational Data Versioning with DVC and GCS

Before any computation, the raw data itself must be immutable and versioned. We treat our GCS buckets as DVC remotes. In a real-world project, you absolutely need separate buckets for raw data, intermediate data, and final feature sets to manage IAM permissions effectively.

The initial setup is straightforward.

# Initialize DVC in our Git repository
dvc init

# Configure a GCS bucket as the remote storage for DVC-tracked data.
# The `mlops-feature-store-dvc` bucket must exist.
dvc remote add -d gcs_remote gcs://mlops-feature-store-dvc

# Set up the credentials path. In a CI/CD environment or on GKE,
# you'd use Workload Identity or pre-configured service accounts.
# For local dev, this points to the service account key.
gcloud auth application-default login
# Or explicitly:
# export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/key.json"

A common mistake is mishandling credentials. Hardcoding keys is a non-starter. For local development, gcloud auth application-default login is sufficient. For automated environments, a dedicated IAM service account with Storage Object Admin rights on the DVC bucket is the minimum viable security posture.

With the remote configured, we can track our initial raw dataset.

# Assume we have raw data in data/raw/
mkdir -p data/raw
# (Populate with some CSVs, e.g., transactions.csv, user_profiles.csv)

dvc add data/raw

# This creates a `data/raw.dvc` file, which is a small text pointer.
# Now we commit this pointer to Git.
git add data/raw.dvc .gitignore
git commit -m "feat: Add initial raw datasets"

# And push the actual data to GCS.
dvc push

This simple workflow already solves a major part of our reproducibility problem. Anyone can now git pull and dvc pull to retrieve the exact version of the raw data tied to a specific Git commit.

Phase 2: Scalable Feature Computation with a Remote Dask Cluster

The core of our pipeline is the feature transformation logic. These are not simple aggregations; they involve complex time-windowed statistics and entity-level rollups that can easily overwhelm a single machine when run on terabytes of data. Dask is the engine for this.

While dask.distributed.LocalCluster is fine for testing, it’s a dead end for production. We need a robust, scalable cluster. We opted to run Dask on GKE using the Dask Kubernetes Operator, which allows us to define a Dask cluster declaratively.

First, the DaskCluster Custom Resource Definition (CRD) must be installed on the GKE cluster. Once that’s done, we can define our cluster.

k8s/dask-cluster.yaml

apiVersion: "kubernetes.dask.org/v1"
kind: DaskCluster
metadata:
  name: feature-gen-cluster
spec:
  # Configuration for the Dask scheduler pod
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: daskdev/dask:2023.9.2
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
            - name: dashboard
              containerPort: 8787
          resources:
            requests:
              cpu: "1"
              memory: "4Gi"
            limits:
              cpu: "2"
              memory: "8Gi"
  # Configuration for the Dask worker pods
  worker:
    replicas: 3 # Start with 3 workers, can be scaled manually or automatically
    spec:
      containers:
        - name: worker
          image: daskdev/dask:2023.9.2
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
            - --nthreads
            - "2"        # Number of threads per worker
            - --memory-limit
            - "14Gi"     # Memory limit per worker
            - --name
            - $(DASK_WORKER_NAME)
          resources:
            requests:
              cpu: "2"
              memory: "15Gi"
            limits:
              cpu: "4"
              memory: "16Gi"

---
# Expose the scheduler for clients to connect
apiVersion: v1
kind: Service
metadata:
  name: feature-gen-cluster-scheduler
spec:
  selector:
    dask.org/cluster-name: feature-gen-cluster
    dask.org/component: scheduler
  ports:
    - name: tcp-comm
      protocol: TCP
      port: 8786
      targetPort: 8786
    - name: dashboard
      protocol: TCP
      port: 8787
      targetPort: 8787
  type: ClusterIP # Or LoadBalancer for external access, but ClusterIP is more secure for internal jobs

With this YAML applied (kubectl apply -f k8s/dask-cluster.yaml), we have a persistent Dask cluster running inside GKE. Our feature engineering Python script can now connect to it.

Here is a simplified but representative feature engineering script. It reads raw data, performs a non-trivial groupby aggregation, and writes the results as a partitioned Parquet dataset back to GCS.

src/features/compute_features.py

import os
import logging
import yaml
import dask.dataframe as dd
from dask.distributed import Client, progress
import gcsfs

# --- Configuration & Logging ---
# A real-world project would use a more robust logging setup.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def get_gcs_filesystem():
    """Initializes and returns a GCSFileSystem object.
    
    In a GCP environment with Application Default Credentials (ADC) configured,
    gcsfs will automatically pick them up. This is the recommended practice.
    """
    try:
        fs = gcsfs.GCSFileSystem()
        return fs
    except Exception as e:
        logger.error(f"Failed to initialize GCSFileSystem: {e}")
        raise


def compute_user_aggregates(dask_client: Client, input_path: str, params: dict) -> dd.DataFrame:
    """
    Computes user-level transaction aggregations using Dask.

    Args:
        dask_client: An active Dask distributed client.
        input_path: GCS path to the raw transactions CSV file.
        params: Dictionary of parameters, e.g., for window sizes.

    Returns:
        A Dask DataFrame with the computed features.
    """
    logger.info(f"Starting feature computation from path: {input_path}")
    logger.info(f"Using parameters: {params}")

    # For this example, we'll use a rolling window parameter.
    window_size = params.get('rolling_window_days', 30)
    
    try:
        # Dask can read directly from GCS.
        # Assume CSV has columns: user_id, transaction_date, amount
        # Specify blocksize for Dask to chunk the file into partitions.
        # A common mistake is not specifying this, leading to poor parallelism.
        ddf = dd.read_csv(
            f"gcs://{input_path}",
            parse_dates=['transaction_date'],
            blocksize="64MB",
            storage_options={'token': 'cloud'} # Use gcsfs default auth
        )
        
        # Ensure the dataframe is sorted by date for rolling calculations.
        # This triggers a shuffle, which is expensive but necessary.
        ddf = ddf.set_index('transaction_date').persist()
        logger.info("Data loaded and indexed by date.")

        # --- Feature Engineering Logic ---
        # 1. Rolling sum of transaction amount over a specified window.
        rolling_sum = ddf.groupby('user_id')['amount'].rolling(f'{window_size}D').sum().reset_index()
        rolling_sum = rolling_sum.rename(columns={'amount': f'amount_sum_{window_size}d'})

        # 2. Total transaction count per user.
        trx_count = ddf.groupby('user_id')['amount'].count().reset_index()
        trx_count = trx_count.rename(columns={'amount': 'total_transaction_count'})
        
        # --- Join Features Back Together ---
        # The pitfall here is managing indexes. Dask joins are most efficient on indexed columns.
        final_features = dd.merge(
            rolling_sum, 
            trx_count, 
            on='user_id'
        ).set_index('user_id')

        logger.info("Feature computation graph defined.")
        return final_features

    except Exception as e:
        logger.error(f"An error occurred during Dask computation: {e}", exc_info=True)
        raise


def main():
    """
    Main execution script. Connects to Dask, runs computation, saves results.
    """
    DASK_SCHEDULER_ADDRESS = os.getenv("DASK_SCHEDULER_ADDRESS", "tcp://feature-gen-cluster-scheduler.default.svc.cluster.local:8786")
    
    logger.info(f"Connecting to Dask scheduler at {DASK_SCHEDULER_ADDRESS}")
    try:
        client = Client(DASK_SCHEDULER_ADDRESS)
    except Exception as e:
        logger.error(f"Could not connect to Dask scheduler: {e}")
        return

    logger.info(f"Dask client connected: {client}")

    # Load parameters from dvc.yaml (injected via the pipeline)
    with open("params.yaml", 'r') as f:
        params = yaml.safe_load()

    # Define I/O paths (these will be managed by DVC)
    input_data_path = "mlops-feature-store-dvc/data/raw/transactions.csv" # A simplified path
    output_feature_path = "gcs://mlops-feature-store-dvc/data/features/user_aggregates"

    # Execute computation
    features_ddf = compute_user_aggregates(client, input_data_path, params['features'])
    
    logger.info(f"Computation graph has {len(features_ddf.dask)} tasks.")
    
    # --- Writing Output ---
    # Writing partitioned Parquet is the standard for analytic datasets.
    # This triggers the actual Dask computation.
    logger.info(f"Writing features to {output_feature_path}")
    future = features_ddf.to_parquet(
        output_feature_path,
        engine='pyarrow',
        write_index=True,
        overwrite=True,
        compute=False # Return a future to monitor progress
    )
    
    # Await and monitor computation
    progress(future)
    result = future.compute()
    
    logger.info("Feature computation and writing complete.")
    client.close()

if __name__ == "__main__":
    main()

Phase 3: Orchestrating the Pipeline with a dvc.yaml File

Now we connect the pieces. The dvc.yaml file defines our Directed Acyclic Graph (DAG) of computation. It declares the dependencies (raw data), the outputs (computed features), and the command to run.

We also introduce a params.yaml for versioning hyperparameters and configurations.

params.yaml

features:
  rolling_window_days: 30

dvc.yaml

stages:
  compute_user_features:
    cmd: python src/features/compute_features.py
    deps:
      - src/features/compute_features.py
      - data/raw # Dependency on the raw data tracked by DVC
    params:
      - features.rolling_window_days # Dependency on our parameter
    outs:
      - data/features/user_aggregates: # The output directory
          cache: false # We don't want DVC to copy GCS data to local cache
          remote: gcs_remote

A critical detail is cache: false. Since our Dask script writes directly to GCS, we want DVC to track the metadata of the output directory on GCS, not download it and re-upload it. This is a common point of confusion and inefficiency.

To run the pipeline, we use dvc repro.

# Run the pipeline defined in dvc.yaml
dvc repro

# After it completes, the output location is now tracked.
# Commit the results to Git (which updates dvc.lock with output hashes)
git add dvc.yaml dvc.lock params.yaml
git commit -m "feat: Add user aggregate feature computation stage"

# This doesn't push data, just the lock file representing the new state.
git push

We now have a fully reproducible feature engineering pipeline. Changing params.yaml or the source code and running dvc repro will intelligently re-run the computation. A git tag, v1.0-features, now represents an immutable snapshot of code, data, and parameters.

Phase 4: Serving Features via a GraphQL API on GKE

The final piece is providing flexible access to these versioned features. The GraphQL server will run as another service on GKE. Its primary job is to resolve queries by finding the correct version of the feature data in GCS and loading it.

We use Strawberry for building the GraphQL schema and resolvers in Python.

src/api/schema.py

import strawberry
import typing
import datetime
import pandas as pd
import dask.dataframe as dd
from dvc.api import get_url

# A dummy User type for demonstration
@strawberry.type
class UserFeatures:
    user_id: str
    transaction_date: datetime.datetime
    amount_sum_30d: float
    total_transaction_count: int

def resolve_features(
    user_ids: typing.List[str], 
    data_version: str = "main" # Git commit/tag to fetch data from
) -> typing.List[UserFeatures]:
    """
    Resolver logic to fetch versioned features.

    This is the core of the service. It uses the DVC API to find the
    GCS path for a given Git tag, then reads the data with Dask.
    """
    try:
        # Use DVC's Python API to get the GCS URL for the feature data
        # at a specific Git revision (`rev`).
        # This is a powerful pattern for decoupling the API from the data location.
        feature_path = get_url(
            path='data/features/user_aggregates',
            repo='.', # Assumes running from the repo root
            rev=data_version
        )
        
        # In a real-world scenario, you might have a mapping of logical versions
        # (e.g., "production-v2.1") to Git tags.
        print(f"Resolved feature path for version '{data_version}': {feature_path}")

        # Use Dask to read the partitioned Parquet data from GCS.
        # Dask is excellent here because it can perform a filtered read,
        # only loading data for the requested user_ids if the data is
        # partitioned or indexed by user_id.
        ddf = dd.read_parquet(
            feature_path,
            engine='pyarrow',
            filters=[('user_id', 'in', user_ids)] # Efficiently filter rows
        )
        
        # Trigger computation and convert to a Pandas DataFrame.
        # For a small number of user_ids, this is fast.
        # For very large requests, this could be a bottleneck.
        pdf = ddf.compute()
        pdf = pdf.reset_index()

        # Convert the DataFrame rows to our GraphQL UserFeatures type.
        # This data shaping logic is non-trivial in a real application.
        results = []
        for _, row in pdf.iterrows():
            results.append(
                UserFeatures(
                    user_id=row['user_id'],
                    transaction_date=row['transaction_date'],
                    amount_sum_30d=row['amount_sum_30d'],
                    total_transaction_count=row['total_transaction_count']
                )
            )
        return results

    except Exception as e:
        # Proper error handling is critical.
        print(f"Error resolving features: {e}")
        # In production, log this error and return a meaningful GraphQL error.
        raise Exception("Failed to retrieve features. Please check data version and user IDs.")


@strawberry.type
class Query:
    @strawberry.field
    def get_user_features(
        self, 
        user_ids: typing.List[str], 
        version: str
    ) -> typing.List[UserFeatures]:
        """
        Fetches a batch of user features for a specified data version.
        The version corresponds to a Git tag/commit in the DVC repository.
        """
        return resolve_features(user_ids=user_ids, data_version=version)

schema = strawberry.Schema(query=Query)

This resolver demonstrates the synergy: GraphQL provides the query interface, and the DVC API dynamically finds the data location for the requested version.

The service is containerized and deployed to GKE.

Dockerfile

FROM python:3.9-slim

WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application source code
COPY src/ /app/src/
COPY .dvc/ /app/.dvc/ # DVC needs its config to resolve paths
COPY dvc.yaml /app/
COPY dvc.lock /app/

# The API needs a Git history to resolve revisions.
# In a real CI/CD pipeline, you would ensure the .git directory is present.
# For simplicity here, we assume a shallow clone would be made available.
# RUN git init && git remote add origin <your-repo-url> && git fetch --depth 1

# Expose the port
EXPOSE 8000

# Run the GraphQL server
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]

A major challenge in productionizing this service is authentication from within the GKE pod to GCS. The best practice is to use Workload Identity, which associates a Kubernetes Service Account with a GCP IAM Service Account. This avoids managing and mounting service account key files entirely.

graph TD
    subgraph "Local Development / CI"
        A[Data Scientist] -- git commit --> B(Git Repository)
        A -- dvc push --> C(GCS Raw Data)
        D[CI/CD Runner] -- dvc repro --> E{GKE Dask Cluster}
        E -- reads from --> C
        E -- writes to --> F(GCS Feature Store)
        D -- git tag v1.0 --> B
    end

    subgraph "Production Serving on GKE"
        G[ML Model Service] -- GraphQL Query --> H(GraphQL API Service)
        H -- get_url(rev='v1.0') --> B
        H -- Reads Parquet --> F
        H -- Features --> G
    end

    B -- DVC Metadata --> H
    C -- DVC Tracked --> B
    F -- DVC Tracked --> B

Lingering Issues and Future Iterations

This architecture, while powerful, is not without its limitations. The GraphQL resolver currently performs a blocking read from GCS. For very large feature requests, this introduces significant latency. A future iteration should explore asynchronous resolvers and potentially streaming responses. The current implementation only fetches pre-computed features; a true on-demand system would require a more complex architecture where the GraphQL API can enqueue a Dask computation via a job queue like Cloud Tasks or RabbitMQ and notify the client upon completion. This adds a layer of state management that we consciously deferred.

Furthermore, the feature discovery process is still manual. Data scientists must know the names of the features and the available Git tags. The next logical step is to integrate a metadata catalog. As the Dask cluster workload becomes more dynamic, implementing the Horizontal Pod Autoscaler for Dask workers, based on metrics like task queue length from the scheduler, will be crucial for cost optimization. The current setup is a robust foundation, but it is just that—a foundation for a more comprehensive feature store platform.


  TOC