The technical debt was accumulating at an alarming rate. Our data engineering team operated a series of Apache Spark jobs, processing several terabytes of event data daily and dumping aggregated results as massive CSV files into an S3 bucket. Downstream, at least four separate product teams were building their own data presentation layers. The marketing analytics team, favouring performance and SEO, used Gatsby to build static reports. The internal operations team required interactive, filterable dashboards and chose Nuxt.js for its server-side rendering capabilities. Each team wrote its own Python or Node.js scripts to parse the CSVs, leading to inconsistent metric calculations, duplicated data-shaping logic, and a brittle system where a minor schema change in the Spark output could break multiple applications without warning. The core pain point was a complete lack of a data contract between the producers and consumers.
Our initial concept was to decouple data processing from data consumption entirely. We needed to establish a single, authoritative source of truth that could serve both build-time data ingestion for static site generators and runtime data fetching for dynamic applications. The architecture had to enforce a strict schema, be scalable enough to handle queries from multiple clients, and abstract away the underlying storage details from the front-end applications. We settled on a three-tier design: a refined Spark pipeline for processing, a centralized GraphQL API layer for serving, and standardized consumption patterns for our heterogeneous Gatsby and Nuxt.js micro-frontends.
The technology selection process was driven by pragmatism. We kept Apache Spark for the heavy lifting on raw data; its distributed nature was non-negotiable at our scale. However, we decided to augment the final stage. Instead of dumping raw CSVs, the Spark job now performs all major aggregations and writes the results as Parquet, chosen over CSV for its columnar storage, schema enforcement, and superior compression. It then hands off a moderately sized, aggregated DataFrame to a final-stage process on the driver node, where we use Pandas for last-mile transformations. This Spark-to-Pandas handoff lets us use the rich, single-machine data manipulation APIs of Pandas for tasks ill-suited to Spark’s distributed model, such as generating complex JSON manifests.
For the serving layer, a standalone Node.js application was the clear winner. Building data-fetching logic directly into each front-end was the cause of our problems. A central API acts as a gatekeeper. We chose Fastify for its high-performance, low-overhead design and the mercurius plugin for a robust GraphQL implementation. GraphQL was critical because it allows diverse clients like a Gatsby site and a Nuxt dashboard to request precisely the data fields they need, preventing over-fetching and providing a strongly-typed schema that serves as our data contract.
Finally, the decision to continue supporting both Gatsby and Nuxt.js was a business reality. We couldn’t force a single framework on all teams. The new architecture had to be flexible enough to accommodate both React and Vue ecosystems and both static and dynamic rendering paradigms. The challenge was to make consuming the data equally straightforward in both worlds.
Phase 1: The Revamped Spark and Pandas Pipeline
The first step was to refactor the core data processing job. The goal was to produce self-describing, partitioned data artifacts instead of a monolithic CSV. The job now outputs partitioned Parquet files organized by date and region, along with a _manifest.json file in the root of each daily output directory. This manifest contains metadata about the generated partitions, column schemas, and summary statistics, allowing the downstream API to discover data intelligently without scanning the entire S3 prefix.
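For reference, a day’s manifest looks roughly like this (values are illustrative; the data_schema entries mirror what Spark’s StructField.jsonValue() emits):
{
  "run_date": "2023-10-27",
  "generated_at_utc": "2023-10-27T06:12:03.412714",
  "data_schema": [
    { "name": "event_type", "type": "string", "nullable": true, "metadata": {} },
    { "name": "event_count", "type": "long", "nullable": false, "metadata": {} },
    { "name": "avg_session_duration", "type": "double", "nullable": true, "metadata": {} },
    { "name": "total_revenue", "type": "double", "nullable": true, "metadata": {} }
  ],
  "partitions": {
    "partition_key": "region",
    "values": ["eu-west", "us-east", "us-west"]
  },
  "summary_stats": {
    "total_events": 18234091
  }
}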
Here is a condensed version of the PySpark script, highlighting the key changes. Error handling and Spark session management are critical in a production environment.
import sys
from datetime import datetime
import json
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType
# --- Configuration and Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# In a real project, these would come from a config file or environment variables
S3_INPUT_PATH = "s3a://company-raw-events/daily-logs/"
S3_OUTPUT_PATH = "s3a://company-data-hub/processed-data/"
APP_NAME = "DailyMetricsAggregation"
def create_spark_session():
    """Initializes and returns a Spark session."""
    try:
        spark = SparkSession.builder \
            .appName(APP_NAME) \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
            .getOrCreate()
        logger.info("Spark session created successfully.")
        return spark
    except Exception as e:
        logger.error(f"Failed to create Spark session: {e}")
        sys.exit(1)
def process_daily_metrics(spark, run_date_str):
    """
    Main ETL logic. Reads raw data, performs aggregations, and writes
    partitioned Parquet files and a manifest.
    """
    input_path = f"{S3_INPUT_PATH}{run_date_str}/"
    output_path = f"{S3_OUTPUT_PATH}{run_date_str}/"
    logger.info(f"Starting processing for date: {run_date_str}")
    logger.info(f"Reading from: {input_path}")
    # Define a schema for robustness
    raw_schema = StructType() \
        .add("user_id", StringType(), True) \
        .add("event_type", StringType(), True) \
        .add("region", StringType(), True) \
        .add("session_duration_sec", IntegerType(), True) \
        .add("revenue_usd", DoubleType(), True)
    try:
        raw_df = spark.read.schema(raw_schema).json(input_path)
        # Core aggregation logic
        aggregated_df = raw_df.groupBy("region", "event_type") \
            .agg(
                {"user_id": "count", "session_duration_sec": "avg", "revenue_usd": "sum"}
            ) \
            .withColumnRenamed("count(user_id)", "event_count") \
            .withColumnRenamed("avg(session_duration_sec)", "avg_session_duration") \
            .withColumnRenamed("sum(revenue_usd)", "total_revenue")
        aggregated_df.persist()  # Cache for multiple uses
        # Write data partitioned by region. This is key for efficient reads.
        logger.info(f"Writing partitioned Parquet data to: {output_path}")
        aggregated_df.write.mode("overwrite").partitionBy("region").parquet(output_path)
        # --- The Pandas Handoff for Manifest Generation ---
        # The aggregated result is small (regions x event types), so it is safe to
        # bring it to the driver as a Pandas DataFrame (Arrow makes this conversion fast).
        agg_pdf = aggregated_df.toPandas()
        regions = [str(r) for r in agg_pdf["region"].dropna().unique()]
        # Generate the manifest file
        manifest = {
            "run_date": run_date_str,
            "generated_at_utc": datetime.utcnow().isoformat(),
            "data_schema": [field.jsonValue() for field in aggregated_df.schema.fields if field.name != 'region'],
            "partitions": {
                "partition_key": "region",
                "values": sorted(regions)
            },
            "summary_stats": {
                "total_events": int(agg_pdf["event_count"].sum())
            }
        }
        # We use a library like boto3 to write the manifest to S3.
        # This part runs on the driver, not on Spark executors.
        # NOTE: boto3 must be available on the driver node.
        import boto3
        s3 = boto3.client('s3')
        bucket_name = S3_OUTPUT_PATH.split('/')[2]
        manifest_key = f"processed-data/{run_date_str}/_manifest.json"
        logger.info(f"Writing manifest to s3://{bucket_name}/{manifest_key}")
        s3.put_object(
            Bucket=bucket_name,
            Key=manifest_key,
            Body=json.dumps(manifest, indent=2),
            ContentType='application/json'
        )
        aggregated_df.unpersist()
        logger.info("Processing completed successfully.")
    except Exception as e:
        logger.error(f"An error occurred during data processing: {e}", exc_info=True)
        raise
def main():
    spark = create_spark_session()
    try:
        # For simplicity, we use today's date. In production, this would be
        # parameterized and managed by an orchestrator like Airflow.
        run_date_str = datetime.utcnow().strftime('%Y-%m-%d')
        process_daily_metrics(spark, run_date_str)
    finally:
        logger.info("Stopping Spark session.")
        spark.stop()

if __name__ == "__main__":
    main()
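After a successful run, a day’s output in the bucket looks roughly like this (the part-file names below are placeholders; Spark generates its own):
s3://company-data-hub/processed-data/2023-10-27/
├── _manifest.json
├── _SUCCESS                      (written by Spark)
├── region=eu-west/
│   └── part-00000-<uuid>.c000.snappy.parquet
├── region=us-east/
│   └── part-00000-<uuid>.c000.snappy.parquet
└── region=us-west/
    └── part-00000-<uuid>.c000.snappy.parquet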
Phase 2: The Centralized GraphQL Data API
This Node.js service acts as the intermediary. It exposes a GraphQL endpoint that understands our data structure. On startup, it can fetch the latest manifest to know what data is available. When a query arrives, it translates it into a targeted read against a specific Parquet file in S3. This prevents the API from ever needing to load the entire dataset into memory. We use parquetjs-lite for its efficiency in reading Parquet from streams.
// server.js
import Fastify from 'fastify';
import mercurius from 'mercurius';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { ParquetReader } from 'parquetjs-lite';
import { Readable } from 'stream';

// --- Configuration ---
const PORT = process.env.PORT || 4000;
const S3_BUCKET = 'company-data-hub';
const DATA_PREFIX = 'processed-data';

const s3Client = new S3Client({ region: process.env.AWS_REGION });
const app = Fastify({ logger: true });

// --- In-memory Cache for Manifests ---
// A simple cache. In production, consider a more robust solution like Redis.
const manifestCache = new Map();

async function getManifest(date) {
  if (manifestCache.has(date)) {
    return manifestCache.get(date);
  }
  const key = `${DATA_PREFIX}/${date}/_manifest.json`;
  app.log.info(`Fetching manifest: ${key}`);
  try {
    const command = new GetObjectCommand({ Bucket: S3_BUCKET, Key: key });
    const { Body } = await s3Client.send(command);
    const manifestContent = await Body.transformToString();
    const manifest = JSON.parse(manifestContent);
    // Cache for 1 hour
    manifestCache.set(date, manifest);
    setTimeout(() => manifestCache.delete(date), 3600 * 1000);
    return manifest;
  } catch (error) {
    app.log.error(error, `Failed to fetch or parse manifest for date: ${date}`);
    // This is a critical error. The caller should handle the null return.
    return null;
  }
}

async function getMetricsForRegion(date, region) {
  const key = `${DATA_PREFIX}/${date}/region=${region}/`;
  app.log.info(`Searching for Parquet files in: ${key}`);
  // In a real implementation, you would list objects in the prefix to find the .parquet file.
  // For this example, we assume a predictable naming convention.
  // This part is complex because Spark output names are like part-0000-....parquet.
  // A robust solution would use the ListObjectsV2Command to find the file.
  // For brevity, we'll construct a plausible path.
  const fileKey = `${DATA_PREFIX}/${date}/region=${region}/part-00000-some-uuid.c000.snappy.parquet`;
  try {
    const command = new GetObjectCommand({ Bucket: S3_BUCKET, Key: fileKey });
    const { Body } = await s3Client.send(command);
    // parquetjs-lite can read from a stream, which is highly memory-efficient.
    const reader = await ParquetReader.openStream(Body);
    const cursor = reader.getCursor();
    const records = [];
    let record = null;
    while ((record = await cursor.next())) {
      records.push(record);
    }
    await reader.close();
    return records;
  } catch (error) {
    app.log.error(error, `Failed to read Parquet data for date=${date}, region=${region}`);
    // It's important to return an empty array or throw, to be handled by the resolver.
    return [];
  }
}

const schema = `
  type Query {
    getAvailableDates: [String!]
    getMetrics(date: String!): DailyMetrics
  }

  type DailyMetrics {
    run_date: String!
    available_regions: [String!]!
    metricsForRegion(region: String!): [Metric!]
  }

  type Metric {
    event_type: String!
    event_count: Int!
    avg_session_duration: Float
    total_revenue: Float
  }
`;

const resolvers = {
  Query: {
    getAvailableDates: async () => {
      // In a real system, you'd list the prefixes in S3, not hardcode.
      return ['2023-10-27'];
    },
    getMetrics: async (_, { date }) => {
      const manifest = await getManifest(date);
      if (!manifest) {
        // Using GraphQL errors is crucial for client-side handling.
        throw new mercurius.ErrorWithProps(`No data available for date ${date}.`, { code: 'DATA_NOT_FOUND' });
      }
      return { run_date: date, manifest };
    }
  },
  DailyMetrics: {
    // This is a nested resolver. `parent` is the object returned from getMetrics.
    available_regions: (parent) => {
      return parent.manifest.partitions.values;
    },
    metricsForRegion: async (parent, { region }) => {
      const { run_date } = parent;
      if (!parent.manifest.partitions.values.includes(region)) {
        throw new mercurius.ErrorWithProps(`Invalid region '${region}' for date ${run_date}.`, {
          code: 'INVALID_ARGUMENT',
          availableRegions: parent.manifest.partitions.values
        });
      }
      return getMetricsForRegion(run_date, region);
    }
  }
};

app.register(mercurius, {
  schema,
  resolvers,
  graphiql: true, // Enable GraphiQL UI for development
});

app.listen({ port: PORT, host: '0.0.0.0' }, (err, address) => {
  if (err) {
    app.log.error(err);
    process.exit(1);
  }
  app.log.info(`GraphQL API server listening at ${address}`);
});
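The hardcoded part-file path in getMetricsForRegion and the hardcoded date list in getAvailableDates are stand-ins. A minimal sketch of how both could be discovered with ListObjectsV2Command (the helper names are ours, and pagination is omitted for brevity):
// Added alongside the existing @aws-sdk/client-s3 imports in server.js
import { ListObjectsV2Command } from '@aws-sdk/client-s3';

// List the dated prefixes (e.g. "processed-data/2023-10-27/") to back getAvailableDates.
async function listAvailableDates() {
  const { CommonPrefixes = [] } = await s3Client.send(new ListObjectsV2Command({
    Bucket: S3_BUCKET,
    Prefix: `${DATA_PREFIX}/`,
    Delimiter: '/',
  }));
  // "processed-data/2023-10-27/" -> "2023-10-27"
  return CommonPrefixes.map(p => p.Prefix.split('/')[1]).filter(Boolean).sort();
}

// Find the actual Parquet part file(s) under a region partition.
async function findParquetKeys(date, region) {
  const { Contents = [] } = await s3Client.send(new ListObjectsV2Command({
    Bucket: S3_BUCKET,
    Prefix: `${DATA_PREFIX}/${date}/region=${region}/`,
  }));
  return Contents.map(obj => obj.Key).filter(key => key.endsWith('.parquet'));
}
With these helpers, getAvailableDates can return listAvailableDates(), and getMetricsForRegion can open whichever keys findParquetKeys returns instead of guessing a file name.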
This API now provides a stable, schema-driven interface to our processed data, completely hiding the Spark pipeline and S3 storage from consumers.
Phase 3: Gatsby Integration for Static Reports
The marketing team’s Gatsby site requires all data at build time to generate static HTML pages. We created a local Gatsby source plugin, gatsby-source-datahub, to handle this. During the gatsby build process, this plugin calls our new GraphQL API, fetches all the data needed for the reports, and injects it into Gatsby’s internal data layer using the sourceNodes API.
// gatsby-node.js inside a local plugin at /plugins/gatsby-source-datahub
const { gql, GraphQLClient } = require('graphql-request');

// The endpoint for our internal GraphQL API
const API_ENDPOINT = process.env.DATA_HUB_API_URL || 'http://localhost:4000/graphql';
const client = new GraphQLClient(API_ENDPOINT);

// Gatsby's sourceNodes API is where we fetch and create data nodes
exports.sourceNodes = async ({ actions, createNodeId, createContentDigest }) => {
  const { createNode } = actions;
  console.log("Sourcing data from Company Data Hub...");
  try {
    const query = gql`
      query GetLatestMetrics {
        getMetrics(date: "2023-10-27") {
          run_date
          available_regions
        }
      }
    `;
    const initialData = await client.request(query);
    const { run_date, available_regions } = initialData.getMetrics;

    for (const region of available_regions) {
      console.log(`Fetching metrics for region: ${region}`);
      const regionQuery = gql`
        query GetRegionMetrics($date: String!, $region: String!) {
          getMetrics(date: $date) {
            metricsForRegion(region: $region) {
              event_type
              event_count
              total_revenue
            }
          }
        }
      `;
      const regionData = await client.request(regionQuery, { date: run_date, region });

      // For each metric record, we create a distinct node in Gatsby's data layer.
      regionData.getMetrics.metricsForRegion.forEach(metric => {
        const nodeMeta = {
          id: createNodeId(`DataHubMetric-${run_date}-${region}-${metric.event_type}`),
          parent: null,
          children: [],
          internal: {
            type: 'DataHubMetric', // A custom node type
            contentDigest: createContentDigest(metric),
          },
        };
        const node = {
          ...metric,
          region, // Add region context to the node
          date: run_date,
          ...nodeMeta,
        };
        createNode(node);
      });
    }
    console.log("Data Hub sourcing complete.");
  } catch (error) {
    console.error("Failed to source data from Data Hub API:", error);
    // Fail the build if the data source is unavailable
    process.exit(1);
  }
};
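Because the plugin lives in the site’s local plugins directory, registering it by name in gatsby-config.js is all Gatsby needs to load it; the DATA_HUB_API_URL environment variable must also be present in the build environment so the plugin can reach the API. A minimal sketch:
// gatsby-config.js
module.exports = {
  plugins: [
    // Local plugins under /plugins are resolved by folder name.
    'gatsby-source-datahub',
  ],
};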
With this plugin, a page component in the Gatsby site can now query this data declaratively:
// src/pages/reports/us-west-summary.js
import React from 'react';
import { graphql } from 'gatsby';

const USWestReportPage = ({ data }) => {
  const metrics = data.allDataHubMetric.nodes;
  return (
    <div>
      <h1>Daily Report: US-West</h1>
      <table>
        <thead>
          <tr>
            <th>Event Type</th>
            <th>Count</th>
            <th>Total Revenue</th>
          </tr>
        </thead>
        <tbody>
          {metrics.map(metric => (
            <tr key={metric.event_type}>
              <td>{metric.event_type}</td>
              <td>{metric.event_count}</td>
              <td>${metric.total_revenue.toFixed(2)}</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

export const query = graphql`
  query USWestMetricsQuery {
    allDataHubMetric(filter: { region: { eq: "us-west" } }) {
      nodes {
        event_type
        event_count
        total_revenue
      }
    }
  }
`;

export default USWestReportPage;
Phase 4: Nuxt.js Integration for Dynamic Dashboards
The operations team’s Nuxt.js dashboard fetches data at runtime, allowing users to select dates and regions interactively. Nuxt 3’s Composition API and data-fetching composables like useAsyncData make this straightforward. The fetch happens on the server during the initial page load (SSR) and on the client during client-side navigation.
<!-- pages/dashboard.vue -->
<template>
  <div>
    <h1>Operations Dashboard</h1>
    <div class="controls">
      <label>
        Date:
        <select v-model="selectedDate">
          <option v-for="date in availableDates" :key="date" :value="date">
            {{ date }}
          </option>
        </select>
      </label>
      <label>
        Region:
        <select v-model="selectedRegion">
          <option v-for="region in regionsForDate" :key="region" :value="region">
            {{ region }}
          </option>
        </select>
      </label>
    </div>

    <div v-if="pending">Loading metrics...</div>
    <div v-else-if="error">
      <p>Error loading data: {{ error.message }}</p>
    </div>
    <div v-else-if="metrics">
      <!-- A simple data table; a real dashboard would use a charting library -->
      <h2>Metrics for {{ selectedRegion }} on {{ selectedDate }}</h2>
      <table>
        <thead>
          <tr>
            <th>Event Type</th>
            <th>Event Count</th>
            <th>Avg. Session (sec)</th>
          </tr>
        </thead>
        <tbody>
          <tr v-for="metric in metrics" :key="metric.event_type">
            <td>{{ metric.event_type }}</td>
            <td>{{ metric.event_count }}</td>
            <td>{{ metric.avg_session_duration.toFixed(1) }}</td>
          </tr>
        </tbody>
      </table>
    </div>
  </div>
</template>
<script setup>
import { ref, computed, watch } from 'vue';
import { gql } from 'graphql-request';

// In a real app, this would be in a separate API client/service file
const API_ENDPOINT = 'http://localhost:4000/graphql';

// --- State Management ---
const selectedDate = ref('2023-10-27');
const selectedRegion = ref('us-east');

// --- Data Fetching ---
// This composable fetches the list of available regions for the selected date
const { data: dateInfo, error: dateInfoError } = await useAsyncData(
  'dateInfo',
  () => $fetch(API_ENDPOINT, {
    method: 'POST',
    body: {
      query: gql`
        query GetDateInfo($date: String!) {
          getMetrics(date: $date) {
            available_regions
          }
        }
      `,
      variables: { date: selectedDate.value },
    }
  }),
  { watch: [selectedDate] } // Re-fetch when selectedDate changes
);

const regionsForDate = computed(() => dateInfo.value?.data?.getMetrics?.available_regions || ['us-east']);

// This is the primary data fetch for the metrics table.
// The key is important for caching and re-fetching.
const { data: metricData, pending, error, refresh } = await useAsyncData(
  `metrics-${selectedDate.value}-${selectedRegion.value}`,
  () => $fetch(API_ENDPOINT, {
    method: 'POST',
    body: {
      query: gql`
        query GetDashboardMetrics($date: String!, $region: String!) {
          getMetrics(date: $date) {
            metricsForRegion(region: $region) {
              event_type
              event_count
              avg_session_duration
            }
          }
        }
      `,
      variables: {
        date: selectedDate.value,
        region: selectedRegion.value,
      },
    },
  })
);

const metrics = computed(() => metricData.value?.data?.getMetrics?.metricsForRegion);
// When the user changes a filter, we need to trigger a re-fetch.
// The useAsyncData key above is computed once at setup and does not change
// on its own, so an explicit refresh() keeps the table in sync.
watch([selectedDate, selectedRegion], () => {
  refresh();
});

// A placeholder for available dates
const availableDates = ['2023-10-27'];
</script>
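One refinement worth noting: rather than hardcoding API_ENDPOINT in the component, the endpoint can be exposed through Nuxt’s runtime config. A minimal sketch, reusing the DATA_HUB_API_URL variable from the Gatsby plugin (our convention, not a Nuxt default):
// nuxt.config.ts
export default defineNuxtConfig({
  runtimeConfig: {
    public: {
      // Overridable at runtime via NUXT_PUBLIC_DATA_HUB_API.
      dataHubApi: process.env.DATA_HUB_API_URL || 'http://localhost:4000/graphql',
    },
  },
});
In the component, const API_ENDPOINT = useRuntimeConfig().public.dataHubApi; then replaces the hardcoded constant.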
Phase 5: Closing the Loop with CI/CD Orchestration
The final piece was automating the static site rebuilds. The Gatsby site would become stale if not rebuilt after the Spark job finished. We configured an S3 event notification on the output bucket to trigger an AWS Lambda function upon the creation of the _manifest.json file. This Lambda function, in turn, calls a build hook provided by our CI/CD platform (e.g., Vercel or Netlify), initiating a fresh deployment of the Gatsby site with the new data.
graph TD
  subgraph AWS EMR/Glue
    A[Spark Job Execution] -->|Writes Manifest| B(S3 Bucket: .../_manifest.json);
  end
  subgraph AWS Eventing
    B -->|s3:ObjectCreated:Put| C(S3 Event Notification);
    C -->|Invokes| D[AWS Lambda Function];
  end
  subgraph CI/CD Platform
    D -->|POST Request| E(Deployment Build Hook);
    E -->|Triggers| F[Gatsby Build Process];
  end
  subgraph Build & Deploy
    F -->|gatsby-source-datahub| G[GraphQL Data API];
    G -->|Reads Parquet from| B;
    F -->|Generates Static Site| H(New Site Version);
    E -->|Deploys| H;
  end
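The Lambda itself is a few lines of glue. A minimal sketch for the Node.js 18+ runtime, assuming the build hook URL is injected as a BUILD_HOOK_URL environment variable (our naming; both Vercel and Netlify issue hook URLs that accept an empty POST):
// index.mjs (AWS Lambda handler triggered by the S3 event notification)
export const handler = async (event) => {
  // The notification is already filtered to _manifest.json, but double-check defensively.
  const manifestCreated = (event.Records || []).some(
    (record) => record.s3?.object?.key?.endsWith('_manifest.json')
  );
  if (!manifestCreated) {
    return { statusCode: 204, body: 'Not a manifest event; skipping.' };
  }
  // Node.js 18+ runtimes provide a global fetch.
  const response = await fetch(process.env.BUILD_HOOK_URL, { method: 'POST' });
  if (!response.ok) {
    throw new Error(`Build hook returned ${response.status}`);
  }
  return { statusCode: 200, body: 'Gatsby rebuild triggered.' };
};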
This event-driven workflow ensures that our public-facing static reports are updated automatically within minutes of the data pipeline’s completion, without any manual intervention.
The final architecture provides a robust, maintainable, and scalable system. The data engineering team can now evolve the Spark pipeline, and as long as the GraphQL schema contract is respected, the front-end applications remain unaffected. The front-end teams can independently build rich user experiences, confident in the consistency and availability of the data they consume.
The current system is not without its limitations. The build time for the Gatsby application is directly proportional to the volume of data it needs to source. As we add more reports and historical data, we will need to explore Gatsby’s incremental build capabilities or more sophisticated data-sourcing strategies to keep build times manageable. The GraphQL API, while stateless, is a potential single point of failure; a production deployment requires high-availability measures like running multiple instances behind a load balancer. Furthermore, this architecture is designed for daily batch updates. For the Nuxt dashboards that may require near real-time data, the next logical iteration would involve integrating a streaming pipeline (e.g., using Spark Streaming or Flink) and pushing updates to the client via WebSockets, a challenge that this batch-oriented foundation is not designed to solve.