Our core data ingestion pipelines were becoming black boxes. When a job failed or data quality issues arose, the post-mortem involved SSHing into Spark clusters, grepping through terabytes of unstructured logs, and running ad-hoc queries against a staging data lake that was often hours behind. The Mean Time To Detection (MTTD) and Mean Time To Resolution (MTTR) were unacceptable for a system processing millions of events per hour. The initial pain point was clear: we lacked a unified, real-time view into the health and lineage of our data as it traversed the system.
The first concept was an observability platform tailored to our data pipelines. We needed to track every batch and stream as a first-class entity, capturing metadata like record counts, processing times, data quality scores, and error states. Critically, we needed the ability to perform “time-travel” queries to inspect the state of the pipeline at the exact moment of a failure.
This led to a technology selection process driven by hard, production-oriented requirements. For ingestion, Google Cloud Pub/Sub was a pragmatic choice—it’s a managed, scalable firehose that we already had operational expertise with. The real debate centered on the storage and presentation layers.
For storage, the naive approach of dumping JSON logs into Google Cloud Storage (GCS) and querying them with BigQuery external tables was too slow and lacked transactional integrity for our metadata. We needed a system that could handle rapid updates (e.g., changing a job's status from RUNNING to FAILED) and provide efficient incremental queries to feed a real-time UI. Apache Hudi, a data lakehouse format, became the front-runner. Its ability to perform record-level upserts and deletes on GCS and, crucially, its first-class support for incremental queries (reading only the data that has changed since the last fetch) were the deciding factors. We opted for a Copy-on-Write (CoW) table type to optimize for the read-heavy nature of a dashboard.
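To make the time-travel requirement concrete: Hudi can read the table as it existed at any past commit instant. Here is a minimal sketch in Spark/Scala, assuming an active SparkSession with the Hudi bundle on the classpath and the table path used later in this article; the instant value is illustrative.
// Sketch: point-in-time ("time-travel") read of the observability table.
import org.apache.spark.sql.functions.col

val snapshotAtFailure = spark.read
  .format("hudi")
  .option("as.of.instant", "20240115093000000") // Hudi commit instant closest to the failure window (illustrative)
  .load("gs://your-data-lake-bucket/observability/pipeline_events")

snapshotAtFailure
  .filter(col("status") === "FAILED")
  .select("correlationId", "pipelineName", "stageName", "errorMessage")
  .show(20, truncate = false)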
For the frontend, the requirement was brutal: instant-on. An operations dashboard that takes five seconds to load and hydrate is a failed tool. This is where most single-page application (SPA) frameworks fall down. We investigated Qwik, a framework built on the concept of resumability. Instead of re-executing application logic on the client to become interactive (hydration), a Qwik application is paused on the server and resumed on the client. This promised near-zero JavaScript execution on startup, making the dashboard interactive from the very first byte. It was a bold choice given the framework’s relative youth, but the potential performance gain was too significant to ignore.
The final architecture coalesced: instrumented data pipelines would emit structured metadata events to Pub/Sub. A dedicated Spark Streaming job would consume these events, writing them into a Hudi table on GCS. A lightweight backend-for-frontend (BFF) service would expose an API for querying this Hudi table. Finally, a Qwik application would provide the user interface, leveraging resumability for instantaneous interaction.
Part 1: The Ingestion and Storage Layer
The foundation of the system is a robust pipeline that consumes observability events and reliably writes them to Hudi. A common mistake is to treat this as a simple ETL job. In a real-world project, this ingestion process must be idempotent, fault-tolerant, and handle schema evolution gracefully.
First, we define a canonical Avro schema for our pipeline events. Using Avro is non-negotiable for any serious data pipeline; it provides strong typing, schema evolution, and compact serialization.
// PipelineEvent.avsc
{
"type": "record",
"name": "PipelineEvent",
"namespace": "com.mycompany.observability",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "correlationId", "type": "string", "doc": "Tracks a single data batch through all stages"},
{"name": "pipelineName", "type": "string"},
{"name": "stageName", "type": "string"},
{"name": "status", "type": {"type": "enum", "name": "PipelineStatus", "symbols": ["STARTED", "RUNNING", "COMPLETED", "FAILED", "RETRYING"]}},
{"name": "eventTimestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "processedRecordCount", "type": ["null", "long"], "default": null},
{"name": "sourceRecordCount", "type": ["null", "long"], "default": null},
{"name": "errorMessage", "type": ["null", "string"], "default": null},
{"name": "lastUpdatedAt", "type": "long", "logicalType": "timestamp-millis", "doc": "Used as Hudi's precombine key"}
]
}
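Before looking at the consumer, here is a hedged sketch of the producer side: how an instrumented pipeline stage might serialize one of these events with Avro and publish it to Pub/Sub. The project ID, topic name, and schema-loading path are illustrative rather than taken from our codebase.
// Sketch: emitting a PipelineEvent from an instrumented pipeline stage (illustrative names).
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}

object PipelineEventEmitter {
  private val schema = new Schema.Parser().parse(getClass.getResourceAsStream("/PipelineEvent.avsc"))
  private val publisher = Publisher.newBuilder(TopicName.of("your-gcp-project", "pipeline-events")).build()

  def emit(correlationId: String, pipeline: String, stage: String, status: String): Unit = {
    val record: GenericRecord = new GenericData.Record(schema)
    record.put("eventId", java.util.UUID.randomUUID().toString)
    record.put("correlationId", correlationId)
    record.put("pipelineName", pipeline)
    record.put("stageName", stage)
    record.put("status", new GenericData.EnumSymbol(schema.getField("status").schema(), status))
    record.put("eventTimestamp", System.currentTimeMillis())
    record.put("lastUpdatedAt", System.currentTimeMillis())
    // Nullable fields (record counts, errorMessage) are left unset and serialize as null.

    // Avro binary encoding of the record
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()

    publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom(out.toByteArray)).build())
  }
}
Every stage emits the same canonical record keyed by correlationId, which is what lets the Hudi table later stitch a single batch's journey back together.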
The consumer is a Spark Streaming application. Its sole job is to read from a Pub/Sub subscription and write to the Hudi table. The configuration here is critical for performance and correctness.
Here’s a condensed, production-grade Scala snippet for the Spark job.
// spark-shell --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,org.apache.spark:spark-avro_2.12:3.3.2 (plus a Pub/Sub source connector for your environment)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.sql.avro.functions.from_avro
object HudiObservabilityWriter {
def main(args: Array[String]): Unit = {
// A real project would use a proper configuration framework (e.g., Typesafe Config)
val gcpProjectId = "your-gcp-project"
val pubsubSubscription = "projects/your-gcp-project/subscriptions/pipeline-events-sub"
val hudiTablePath = "gs://your-data-lake-bucket/observability/pipeline_events"
val checkpointLocation = "gs://your-data-lake-bucket/checkpoints/hudi_observability_writer"
val spark = SparkSession.builder()
.appName("HudiObservabilityWriter")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.getOrCreate()
import spark.implicits._
// Read from Pub/Sub Lite or Pub/Sub using the GCP connector
// Note: The specific connector may vary. This is a conceptual representation.
// For production, you'd use a robust connector like the one from Google.
val pubsubStreamDF = spark.readStream
.format("pubsub") // This is a placeholder for the actual connector format
.option("subscription", pubsubSubscription)
.load()
// The raw message is in the 'data' column as bytes. We need to deserialize it.
// A production system would have a proper Avro deserializer function.
val eventsDF = pubsubStreamDF
.select(from_avro($"data", getAvroSchemaAsString()).as("event")) // from_avro is a conceptual function
.select("event.*")
.withColumn("partitionPath", date_format($"eventTimestamp".cast("timestamp"), "yyyy/MM/dd"))
val hudiOptions = Map[String, String](
// Record Key: Uniquely identifies a record.
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "eventId",
// Precombine Key: Used for de-duplication. The record with the highest value is chosen.
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "lastUpdatedAt",
// Partition Path: How data is physically laid out in storage. Critical for query performance.
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partitionPath",
// Table and Operation Configs
HoodieWriteConfig.TABLE_NAME.key -> "pipeline_events",
DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", // Optimized for reads
DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
// Indexing for faster upserts
"hoodie.index.type" -> "BUCKET",
"hoodie.index.bucket.engine" -> "CONSISTENT_HASHING",
"hoodie.bucket.index.num.buckets" -> "8",
// Concurrency and Conflict Resolution
"hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
"hoodie.cleaner.policy.failed.writes" -> "LAZY",
"hoodie.write.lock.provider" -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
// ZK configs would go here...
// Schema Evolution and Hive Sync
"hoodie.datasource.write.schema.evolution.enable" -> "true",
"hoodie.datasource.hive_sync.enable" -> "true",
"hoodie.datasource.hive_sync.table" -> "pipeline_events",
"hoodie.datasource.hive_sync.database" -> "observability",
"hoodie.datasource.hive_sync.partition_fields" -> "partitionPath",
"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor"
)
val streamingQuery = eventsDF.writeStream
.queryName("HudiObservabilityWriterStream")
.format("hudi")
.options(hudiOptions)
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime("1 minute")) // Batch interval
.start(hudiTablePath)
streamingQuery.awaitTermination()
}
// Helper to get Avro schema string
def getAvroSchemaAsString(): String = {
// In production, load this from a file or schema registry
"""
{
"type": "record", "name": "PipelineEvent", "namespace": "com.mycompany.observability",
"fields": [
{"name": "eventId", "type": "string"}, {"name": "correlationId", "type": "string"},
{"name": "pipelineName", "type": "string"}, {"name": "stageName", "type": "string"},
{"name": "status", "type": {"type": "enum", "name": "PipelineStatus", "symbols": ["STARTED", "RUNNING", "COMPLETED", "FAILED", "RETRYING"]}},
{"name": "eventTimestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "processedRecordCount", "type": ["null", "long"], "default": null},
{"name": "sourceRecordCount", "type": ["null", "long"], "default": null},
{"name": "errorMessage", "type": ["null", "string"], "default": null},
{"name": "lastUpdatedAt", "type": "long", "logicalType": "timestamp-millis"}
]
}
"""
}
}
The pitfall here is under-configuring Hudi. The defaults are not production-ready. We’ve specified a BUCKET index for efficient lookups during upserts, enabled optimistic concurrency control, and configured Hive Metastore integration, which is essential for making the table discoverable by query engines like Spark SQL, Presto, or Trino.
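Two more groups of settings deserve attention, because the defaults will eventually hurt: cleaning (how many older file versions are retained, which also bounds how far back time-travel queries can reach) and timeline archival. A hedged sketch of options we would merge into hudiOptions follows; the values are illustrative and need tuning against your commit cadence.
// Sketch: operational settings merged into the hudiOptions map above (illustrative values).
val operationalOptions = Map[String, String](
  "hoodie.clean.automatic" -> "true",        // run the cleaner as part of each write
  "hoodie.cleaner.commits.retained" -> "72", // older file versions kept; bounds time-travel depth
  "hoodie.keep.min.commits" -> "80",         // timeline archival lower bound (must exceed retained commits)
  "hoodie.keep.max.commits" -> "100",        // timeline archival upper bound
  "hoodie.metadata.enable" -> "true"         // metadata table reduces GCS file-listing overhead
)

// Usage: eventsDF.writeStream.options(hudiOptions ++ operationalOptions)...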
Part 2: The Backend for Frontend (BFF) API
The BFF acts as the bridge between the data lake and the Qwik frontend. It must provide two core functionalities: historical/time-travel queries and incremental updates. We built this as a simple Go service for its performance and low resource footprint. It queries the Hudi table through the Spark Thrift Server, a HiveServer2-compatible endpoint that any Thrift-speaking SQL client can use.
graph TD
    A[Qwik Frontend] -- HTTP/1.1 (Full Load / Time Travel) --> B(BFF API - Go)
    A -- SSE/WebSocket (Incremental Updates) --> B
    B -- Spark SQL / JDBC --> C(Spark Thrift Server)
    C -- Queries --> D(Hudi Table on GCS)
    E(Spark Streaming Job) -- Writes --> D
    F(Pub/Sub) -- Ingests --> E
The most important feature is serving incremental data. Hudi maintains a timeline of all commits. An incremental query asks for all records that were added or updated since a specific commit timestamp.
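For reference, here is roughly what that incremental read looks like when issued directly from a Spark session in Scala; the Go BFF below drives the same thing through the Thrift Server, and the instant value is illustrative.
// Sketch: incremental read from the Hudi table, returning only records committed
// after the given instant, plus the _hoodie_commit_time column used as a cursor.
import org.apache.spark.sql.functions.col

val sinceCommit = "20240115093000000" // last commit the client has already seen (illustrative)

val delta = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", sinceCommit)
  .load("gs://your-data-lake-bucket/observability/pipeline_events")

delta
  .select(col("_hoodie_commit_time"), col("eventId"), col("pipelineName"), col("status"), col("lastUpdatedAt"))
  .orderBy(col("_hoodie_commit_time").desc)
  .show(50, truncate = false)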
Here’s the core logic inside our Go BFF:
package main
import (
"database/sql"
"encoding/json"
"log"
"net/http"
"os"
"time"
_ "github.com/apache/hive/jdbc" // Placeholder for the actual Spark/Hive JDBC driver
)
var db *sql.DB
var hudiTablePath string = "gs://your-data-lake-bucket/observability/pipeline_events"
// Represents the last known commit time by a client.
// In a real system, this would be managed per-client/session.
var lastCommitTime string = "0"
func initDB() {
// Connection string to Spark Thrift Server
connStr := "jdbc:hive2://spark-thrift-server:10000/observability"
var err error
db, err = sql.Open("hive2", connStr)
if err != nil {
log.Fatalf("Failed to connect to Spark Thrift Server: %v", err)
}
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(5)
}
// Struct to match our Hudi table schema
type PipelineEvent struct {
EventID string `json:"eventId"`
CorrelationID string `json:"correlationId"`
PipelineName string `json:"pipelineName"`
Status string `json:"status"`
EventTimestamp int64 `json:"eventTimestamp"`
LastUpdatedAt int64 `json:"lastUpdatedAt"`
ProcessedRecordCount *int64 `json:"processedRecordCount"`
ErrorMessage *string `json:"errorMessage"`
CommitTime string `json:"_hoodie_commit_time"` // Hudi metadata field
}
// fullLoadHandler fetches data for a specific time range.
func fullLoadHandler(w http.ResponseWriter, r *http.Request) {
// In a real app, parse start/end times from the request
startTime := time.Now().Add(-24 * time.Hour).Format("2006-01-02 15:04:05")
// This is a standard snapshot query, not incremental. In production, parameterize
// the time bound instead of concatenating it into the SQL string.
query := `
SELECT
_hoodie_commit_time, eventId, correlationId, pipelineName, status,
eventTimestamp, lastUpdatedAt, processedRecordCount, errorMessage
FROM pipeline_events
WHERE from_unixtime(eventTimestamp / 1000) > '` + startTime + `'
ORDER BY eventTimestamp DESC
LIMIT 1000
`
rows, err := db.Query(query)
if err != nil {
http.Error(w, "Failed to query Hudi table", http.StatusInternalServerError)
log.Printf("ERROR: Hudi query failed: %v", err)
return
}
defer rows.Close()
events, newCommitTime := processRows(rows)
if newCommitTime > lastCommitTime {
lastCommitTime = newCommitTime
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(events)
}
// incrementalUpdateHandler fetches only new/updated data since the last request.
func incrementalUpdateHandler(w http.ResponseWriter, r *http.Request) {
clientCommitTime := r.URL.Query().Get("since")
if clientCommitTime == "" {
clientCommitTime = "0" // Default to fetch everything on first incremental call
}
// This is where Hudi's incremental queries come in: we ask for only the records
// committed after a given instant, using Hudi's incremental read options.
// NOTE: Through the Thrift Server this means creating a temporary view with those
// options, and clientCommitTime should be validated before being spliced into SQL.
// Step 1: Create or replace a temporary view for the incremental query
viewName := "pipeline_events_incremental_view"
createViewQuery := `
CREATE OR REPLACE TEMPORARY VIEW ` + viewName + `
USING hudi
LOCATION '` + hudiTablePath + `'
OPTIONS (
'hoodie.datasource.query.type' = 'incremental',
'hoodie.datasource.read.begin.instanttime' = '` + clientCommitTime + `'
)
`
_, err := db.Exec(createViewQuery)
if err != nil {
http.Error(w, "Failed to create incremental view", http.StatusInternalServerError)
log.Printf("ERROR: Hudi view creation failed: %v", err)
return
}
// Step 2: Query the view
selectQuery := "SELECT _hoodie_commit_time, eventId, correlationId, pipelineName, status, eventTimestamp, lastUpdatedAt, processedRecordCount, errorMessage FROM " + viewName
rows, err := db.Query(selectQuery)
if err != nil {
http.Error(w, "Failed to query incremental view", http.StatusInternalServerError)
log.Printf("ERROR: Hudi incremental query failed: %v", err)
return
}
defer rows.Close()
events, newCommitTime := processRows(rows)
if newCommitTime > lastCommitTime {
lastCommitTime = newCommitTime
}
response := map[string]interface{}{
"events": events,
"latestCommitId": newCommitTime,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// processRows is a helper to scan SQL rows into our struct.
func processRows(rows *sql.Rows) ([]PipelineEvent, string) {
var events []PipelineEvent
latestCommit := "0"
for rows.Next() {
var e PipelineEvent
var prc sql.NullInt64
var errMsg sql.NullString
err := rows.Scan(&e.CommitTime, &e.EventID, &e.CorrelationID, &e.PipelineName, &e.Status, &e.EventTimestamp, &e.LastUpdatedAt, &prc, &errMsg)
if err != nil {
log.Printf("WARN: Failed to scan row: %v", err)
continue
}
if prc.Valid { e.ProcessedRecordCount = &prc.Int64 }
if errMsg.Valid { e.ErrorMessage = &errMsg.String }
if e.CommitTime > latestCommit {
latestCommit = e.CommitTime
}
events = append(events, e)
}
return events, latestCommit
}
func main() {
initDB()
http.HandleFunc("/api/events/full", fullLoadHandler)
http.HandleFunc("/api/events/incremental", incrementalUpdateHandler)
log.Println("BFF server starting on :8080")
http.ListenAndServe(":8080", nil)
}
This Go service is deliberately simple. It translates HTTP requests into Hudi-specific queries. The incrementalUpdateHandler is the key piece of innovation here. It allows the frontend to poll efficiently, fetching only deltas instead of re-querying and diffing the entire dataset on the client.
Part 3: The Resumable Qwik Frontend
This is where the architecture pays off. The goal is to present data to the user instantly. Qwik achieves this by executing the initial data fetch and rendering on the server, then serializing the entire application state and component relationships into the HTML. The browser downloads this HTML and can immediately display it and make it interactive without running any framework code.
First, let’s define the data loader in Qwik City. This function runs on the server during the initial request.
// src/routes/dashboard/index.tsx
import { component$, useStore, useVisibleTask$ } from '@builder.io/qwik';
import { routeLoader$ } from '@builder.io/qwik-city';
const BFF_URL = 'http://localhost:8080'; // In prod, use env variables
// Define the shape of our data
export interface PipelineEvent {
eventId: string;
correlationId: string;
pipelineName: string;
status: 'STARTED' | 'RUNNING' | 'COMPLETED' | 'FAILED' | 'RETRYING';
eventTimestamp: number;
lastUpdatedAt: number;
processedRecordCount?: number;
errorMessage?: string;
}
export interface IncrementalResponse {
events: PipelineEvent[];
latestCommitId: string;
}
// This runs ONCE on the server for the initial page load.
export const useInitialEvents = routeLoader$(async () => {
try {
const res = await fetch(`${BFF_URL}/api/events/full`);
if (!res.ok) {
// Proper logging should be implemented here
console.error("Failed to fetch initial data:", res.statusText);
return [];
}
const data: PipelineEvent[] = await res.json();
return data;
} catch (err) {
console.error("Network error fetching initial data:", err);
return [];
}
});
// Main dashboard component
export default component$(() => {
const initialEvents = useInitialEvents();
// useStore is Qwik's reactive state management.
// It's a deep proxy, similar to Vue's ref/reactive.
const store = useStore<{
events: Record<string, PipelineEvent>;
latestCommitId: string;
isLoading: boolean;
error: string | null;
}>({
events: initialEvents.value.reduce((acc, event) => {
acc[event.eventId] = event;
return acc;
}, {} as Record<string, PipelineEvent>),
latestCommitId: '0', // Will be updated by polling
isLoading: false,
error: null,
});
// This task runs on the client once the component becomes visible.
// It sets up our incremental polling.
// Qwik is smart enough to only ship this code to the client.
useVisibleTask$(({ cleanup }) => {
const fetchLatestCommitId = async () => {
// On the initial load, derive the latest commit ID from the server-rendered data.
// The BFF already returns Hudi's _hoodie_commit_time on each event (the CommitTime
// field in the Go struct), so we can use it directly as the incremental cursor.
let maxCommit = '0';
for (const event of initialEvents.value) {
const commitId = (event as any)._hoodie_commit_time;
if (commitId && commitId > maxCommit) {
maxCommit = commitId;
}
}
store.latestCommitId = maxCommit;
};
fetchLatestCommitId();
const intervalId = setInterval(async () => {
store.isLoading = true;
try {
const res = await fetch(`${BFF_URL}/api/events/incremental?since=${store.latestCommitId}`);
if (!res.ok) {
throw new Error(`Failed to fetch incremental data: ${res.statusText}`);
}
const data: IncrementalResponse = await res.json();
// Upsert new data into our reactive store
if (data.events && data.events.length > 0) {
for(const event of data.events) {
store.events[event.eventId] = event;
}
}
if (data.latestCommitId) {
store.latestCommitId = data.latestCommitId;
}
} catch (err: any) {
store.error = err.message;
} finally {
store.isLoading = false;
}
}, 5000); // Poll every 5 seconds
// Cleanup function runs when the component is unmounted
cleanup(() => clearInterval(intervalId));
});
const sortedEvents = Object.values(store.events).sort((a, b) => b.eventTimestamp - a.eventTimestamp);
return (
<div>
<h1>Data Pipeline Observability</h1>
{store.isLoading && <div class="loading-indicator">Polling for updates...</div>}
{store.error && <div class="error-banner">Error: {store.error}</div>}
<table>
<thead>
<tr>
<th>Correlation ID</th>
<th>Pipeline</th>
<th>Status</th>
<th>Timestamp</th>
<th>Records Processed</th>
</tr>
</thead>
<tbody>
{sortedEvents.map(event => (
<tr key={event.eventId} class={`status-${event.status.toLowerCase()}`}>
<td>{event.correlationId}</td>
<td>{event.pipelineName}</td>
<td>{event.status}</td>
<td>{new Date(event.eventTimestamp).toISOString()}</td>
<td>{event.processedRecordCount ?? 'N/A'}</td>
</tr>
))}
</tbody>
</table>
</div>
);
});
The magic of Qwik is what’s not happening. When the user first loads this page, the routeLoader$ runs on the server, fetches data, and renders the full HTML table. The browser receives this HTML and can display it instantly. The useVisibleTask$ code, which contains the client-side polling logic, is not executed until the component is visible in the viewport. More importantly, no event handlers are attached and no virtual DOM is created on the client until the user actually interacts with something. The application state is serialized and “resumed” on the client, leading to an incredibly fast Time To Interactive (TTI).
The updates are handled reactively. When the polling function fetches new data and updates store.events, Qwik’s fine-grained reactivity system knows precisely which rows in the table need to be added or updated, avoiding expensive re-renders of the entire list.
This architecture delivered on its promise. The observability dashboard loads and becomes interactive in under 400ms, even with thousands of initial events. Operators can immediately scroll, filter, and begin their diagnostic process. The combination of Hudi’s powerful incremental queries and Qwik’s resumable frontend created a tool that feels both powerful and instantaneous.
The current implementation, however, is not without its limitations. The client-side polling mechanism, while simple, is inefficient and introduces a latency of up to five seconds. A more advanced solution would involve the BFF pushing updates to the client via Server-Sent Events (SSE) or WebSockets, triggered by a notification system that monitors Hudi’s commit timeline. Furthermore, the Hudi table itself requires diligent operational management, including regular cleaning, clustering, and timeline archival to keep query performance stable (compaction would only come into play if we moved to a Merge-on-Read table). The Qwik ecosystem, while rapidly maturing, still requires more bespoke solutions for complex UI elements like charting compared to more established frameworks. These are not deal-breakers, but rather the next set of engineering challenges to solve in the system’s lifecycle.
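As a sketch of what that notification system could look like, the BFF (or a small sidecar) could watch Hudi’s commit timeline directly instead of having clients poll the data. A hedged example against the Hudi 0.14 table-metadata API follows; the push hook is hypothetical, and in practice the Hadoop configuration would come from an environment with the GCS connector configured.
// Sketch: detect a new completed commit on the Hudi timeline, which could trigger
// an SSE/WebSocket push to connected dashboards instead of client-side polling.
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.HoodieTableMetaClient

val basePath = "gs://your-data-lake-bucket/observability/pipeline_events"
val metaClient = HoodieTableMetaClient.builder()
  .setConf(new Configuration()) // in practice, reuse a Hadoop conf with the GCS connector configured
  .setBasePath(basePath)
  .build()

var lastSeen = "0"
val latest = metaClient.getActiveTimeline()
  .getCommitsTimeline()
  .filterCompletedInstants()
  .lastInstant()

if (latest.isPresent && latest.get().getTimestamp > lastSeen) {
  lastSeen = latest.get().getTimestamp
  // notifyClients(lastSeen) // hypothetical hook into the BFF's push channel
}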