Architecting a Data Lakehouse Meta-Observability Plane with Apache Iceberg, Prometheus, and a Svelte Headless UI


Operational visibility into our large-scale data lakehouse, which processes petabytes daily, has developed a critical blind spot. Standard Prometheus metrics tracking job execution times, CPU utilization, and memory consumption are no longer sufficient. We increasingly face performance degradation and data quality issues that originate not in the infrastructure but in the structural evolution of the data itself. Problems like severe data skew in new partitions, the proliferation of small files after a backfill, or sudden schema changes are invisible to our existing time-series monitoring. These are “metadata” problems, and querying them requires a fundamentally different approach than a TSDB can offer. The core challenge is to build a unified observability plane that correlates high-level operational metrics from Prometheus with deep, historical, and structural metadata from the lakehouse tables themselves.

This requires storing and analyzing the evolutionary history of our data assets. The metadata—snapshots, manifests, partition statistics, schema versions, file counts—is structured, relational, and requires analytical querying, making it a poor fit for Prometheus’s high-cardinality-averse, key-value model. The logical solution is to persist this analytical metadata back into the lakehouse itself, creating a meta-observability layer. The primary architectural decision, therefore, becomes selecting the optimal table format for these critical metadata tables: Apache Iceberg or Delta Lake.

graph TD
    subgraph Data Lakehouse
        A[Production Tables - Parquet] -->|Batch/Stream Jobs| B(Table State);
    end

    subgraph Meta-Observability Plane
        C[Metadata Extractor Service] -- Reads table history --> B;
        C -- Writes structural metadata --> D{Choice: Iceberg vs. Delta};
        D -- Persists metadata as data --> E[Meta-Analytics Table];
    end

    subgraph Visualization & Alerting
        F[Prometheus] -- Scrapes job metrics --> G[Application Metrics Endpoint];
        H[Trino/Spark SQL] -- Queries historical metadata --> E;
        I[Svelte Headless UI] -- Queries --> F;
        I -- Queries --> H;
    end

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px

Solution A: Delta Lake for Metadata Persistence

Delta Lake’s tight integration with Apache Spark and its transaction log-centric design present an immediate, seemingly straightforward option. The transaction log (_delta_log) is a chronologically ordered record of every change, making it a natural source for extracting historical metadata.

Pros of Delta Lake:

  1. ACID Guarantees: Writing metadata from multiple extractor jobs is simplified by Delta’s transactional integrity.
  2. Mature Spark Ecosystem: Our existing Spark infrastructure can be leveraged directly for both writing and querying the metadata tables.
  3. Time Travel: The VERSION AS OF syntax offers a simple API for querying the state of metadata at a specific point in time, which is useful for comparative analysis; a minimal sketch follows this list.
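
For instance, a minimal sketch of the DataFrame-reader equivalent of VERSION AS OF (the versionAsOf option); the path and version number are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-time-travel").getOrCreate()
val tablePath = "s3://my-lakehouse/meta/delta_table_history" // placeholder

val current  = spark.read.format("delta").load(tablePath)
val previous = spark.read.format("delta").option("versionAsOf", 42).load(tablePath)

// How many metadata records were added since version 42?
println(s"New records: ${current.count() - previous.count()}")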

Implementation Sketch: Metadata Extraction to Delta

A Spark job could be designed to read the transaction log of a production table, parse the JSON entries, and write them into a structured Delta table.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import io.delta.tables._
import java.time.Instant

// A simplified representation of metadata we want to capture
case class TableMetadataRecord(
  tableName: String,
  timestamp: java.sql.Timestamp,
  version: Long,
  operation: String,
  operationParameters: Map[String, String],
  fileCount: Long,
  totalSizeBytes: Long,
  partitionStats: String // JSON string of partition-level stats
)

object DeltaMetadataExtractor {

  def run(spark: SparkSession, sourceTablePath: String, metaTablePath: String, tableName: String): Unit = {
    // In a real-world project, this would be managed more robustly.
    // We'd track the last processed version to avoid redundant work.
    // DeltaTable.history() returns the commit history as a DataFrame via the public API
    val history = DeltaTable.forPath(spark, sourceTablePath).history() // Can be large for old tables

    import spark.implicits._

    val metadataDF = history
      .filter($"version".isNotNull) // Ensure we only process committed versions
      .as[DeltaHistory] // A case class mapping to the history schema
      .flatMap(commit => {
        try {
          // This is a simplified approach. In production, parsing `operationMetrics`
          // and calculating partition stats would be more involved.
          // Metric keys vary by operation; "numFiles" covers plain writes
          val numFiles = commit.operationMetrics.get("numFiles").map(_.toLong).getOrElse(0L)
          val sizeBytes = commit.operationMetrics.get("numOutputBytes").map(_.toLong).getOrElse(0L)
          
          Some(TableMetadataRecord(
            tableName = tableName,
            timestamp = commit.timestamp,
            version = commit.version,
            operation = commit.operation,
            operationParameters = commit.operationParameters,
            fileCount = numFiles,
            totalSizeBytes = sizeBytes,
            partitionStats = "{}" // Placeholder for complex partition analysis
          ))
        } catch {
          case e: Exception =>
            // Critical logging for production
            println(s"Failed to process commit version ${commit.version}. Error: ${e.getMessage}")
            None
        }
      })
      .toDF()

    if (!DeltaTable.isDeltaTable(spark, metaTablePath)) {
      println(s"Creating new meta-table at $metaTablePath")
      metadataDF.write.format("delta").mode("overwrite").save(metaTablePath)
    } else {
      println(s"Merging new metadata into $metaTablePath")
      val metaTable = DeltaTable.forPath(spark, metaTablePath)
      metaTable.alias("target")
        .merge(
          metadataDF.alias("source"),
          "target.tableName = source.tableName AND target.version = source.version"
        )
        .whenNotMatched()
        .insertAll()
        .execute()
    }
  }

  // Represents the schema of the Delta history DataFrame
  case class DeltaHistory(
    version: Long,
    timestamp: java.sql.Timestamp,
    operation: String,
    operationParameters: Map[String, String],
    operationMetrics: Map[String, String]
  )
}
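
A hypothetical invocation of the extractor, followed by a quick sanity query over the resulting meta-table; all paths are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

val spark = SparkSession.builder().appName("delta-metadata-extractor").getOrCreate()

DeltaMetadataExtractor.run(
  spark,
  sourceTablePath = "s3://my-lakehouse/tables/customer_events",
  metaTablePath   = "s3://my-lakehouse/meta/delta_table_history",
  tableName       = "customer_events"
)

// Quick sanity check: which commits produced the most files?
spark.read.format("delta")
  .load("s3://my-lakehouse/meta/delta_table_history")
  .orderBy(desc("fileCount"))
  .show(10)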

Cons and Pitfalls of Delta Lake:

  1. Query Engine Coupling: While improving, Delta Lake’s ecosystem is still heavily centered around Spark. Efficiently querying this metadata table from other engines like Trino or Presto, which our BI and UI teams prefer, can introduce performance or compatibility friction.
  2. Log-based Query Performance: Querying the history of a Delta table requires Spark to sequentially read and process the JSON log files. For tables with millions of commits, this can become a performance bottleneck for the ad-hoc analytical queries our observability platform requires. The metadata itself is not stored in an optimal format for analytical queries until it’s transformed and written, as shown above.
  3. Metadata Overhead: The _delta_log can grow very large, and checkpointing is essential. Managing the lifecycle of these JSON log files adds operational complexity. For our goal of querying metadata as data, we are essentially paying the cost of the log format twice.

Solution B: Apache Iceberg for Metadata Persistence

Apache Iceberg’s design philosophy is different. It decouples the table format from the processing engine and relies on a hierarchical structure of metadata files (metadata JSON, manifest lists, manifest files) that point to data files. This structure is inherently more queryable.
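
Before building anything custom, Iceberg already exposes this hierarchy as built-in metadata tables (snapshots, files, manifests, history) that any supported engine can query. A minimal Spark SQL sketch, assuming the Iceberg Spark runtime is on the classpath and using an illustrative catalog/table name lakehouse.db.customer_events:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("iceberg-metadata-peek").getOrCreate()

// Snapshot history: one row per commit, including the operation and summary map
spark.sql("SELECT snapshot_id, committed_at, operation, summary FROM lakehouse.db.customer_events.snapshots").show(truncate = false)

// Per-file statistics that the manifests already track
spark.sql("SELECT file_path, record_count, file_size_in_bytes FROM lakehouse.db.customer_events.files").show(truncate = false)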

Pros of Apache Iceberg:

  1. Engine Agnosticism: This is the killer feature for our use case. We can write the metadata using a Spark job and then provide low-latency query access via Trino for the UI, without performance penalties or complex connectors.
  2. Efficient Metadata Pruning: Iceberg’s manifest files contain statistics (min/max values, null counts) for each data file. When we query our metadata table (e.g., “find all commits that added more than 10,000 files”), the query engine can prune entire manifest files without ever touching the underlying Parquet data, leading to extremely fast queries.
  3. Direct Metadata API Access: The Iceberg Java library provides a direct, programmatic way to access table snapshots and manifests without needing a full-blown query engine. This is ideal for our metadata extractor service.

Implementation Sketch: Metadata Extraction to Iceberg

The extractor can be a lightweight Java/Scala application or a Spark job that uses the Iceberg API to inspect the source table’s state and writes to the target Iceberg meta-table.

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// This example uses the Java API for fine-grained control, avoiding a full Spark session.
public class IcebergMetadataExtractor {

    // Define the schema for our meta-analytics table
    private static final Schema META_SCHEMA = new Schema(
        Types.NestedField.required(1, "table_name", Types.StringType.get()),
        Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
        Types.NestedField.required(3, "timestamp_ms", Types.TimestampType.withZone()),
        Types.NestedField.required(4, "operation", Types.StringType.get()),
        Types.NestedField.optional(5, "summary", Types.MapType.ofOptional(6, 7, Types.StringType.get(), Types.StringType.get())),
        Types.NestedField.required(8, "manifest_count", Types.LongType.get()),
        Types.NestedField.required(9, "total_data_files", Types.LongType.get()),
        Types.NestedField.required(10, "total_records", Types.LongType.get())
    );

    public void run(Catalog catalog, TableIdentifier sourceTableId, TableIdentifier metaTableId) throws IOException {
        Table sourceTable = catalog.loadTable(sourceTableId);
        Table metaTable = catalog.loadTable(metaTableId);

        List<GenericRecord> records = new ArrayList<>();
        long lastProcessedTimestampMs = findLastProcessedCommitTimestamp(metaTable, sourceTableId.name());

        for (Snapshot snapshot : sourceTable.snapshots()) {
            // Snapshot IDs are random, not monotonic, so incremental progress is tracked by commit time
            if (snapshot.timestampMillis() > lastProcessedTimestampMs) {
                GenericRecord record = GenericRecord.create(META_SCHEMA);
                record.setField("table_name", sourceTableId.name());
                record.setField("snapshot_id", snapshot.snapshotId());
                record.setField("timestamp_ms", Instant.ofEpochMilli(snapshot.timestampMillis()));
                record.setField("operation", snapshot.operation());
                record.setField("summary", snapshot.summary());
                record.setField("manifest_count", (long) snapshot.allManifests(sourceTable.io()).size());
                
                long fileCount = Long.parseLong(snapshot.summary().getOrDefault("total-data-files", "0"));
                long recordCount = Long.parseLong(snapshot.summary().getOrDefault("total-records", "0"));

                record.setField("total_data_files", fileCount);
                record.setField("total_records", recordCount);
                records.add(record);
            }
        }

        if (records.isEmpty()) {
            System.out.println("No new snapshots to process.");
            return;
        }

        // Write the collected records to a new Parquet file
        String dataFilePath = "s3://my-lakehouse/meta_table/data/" + System.currentTimeMillis() + ".parquet";
        OutputFile outputFile = metaTable.io().newOutputFile(dataFilePath);

        FileAppender<GenericRecord> appender = Parquet.write(outputFile)
            .schema(META_SCHEMA)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build();

        try {
            appender.addAll(records);
        } finally {
            appender.close();
        }

        // Build a DataFile descriptor for the written Parquet file (assumes an unpartitioned meta-table)
        DataFile dataFile = DataFiles.builder(metaTable.spec())
            .withInputFile(outputFile.toInputFile())
            .withMetrics(appender.metrics())
            .withFormat(FileFormat.PARQUET)
            .build();

        // Commit the new data file to the meta-table
        metaTable.newAppend()
            .appendFile(dataFile)
            .commit();
        
        System.out.println("Successfully committed " + records.size() + " new metadata records.");
    }

    private long findLastProcessedCommitTimestamp(Table metaTable, String tableName) {
        // In a real system, we'd issue a SQL query:
        // SELECT max(timestamp_ms) FROM meta_table WHERE table_name = '...'
        // For this example, we'll return a placeholder.
        return 0L;
    }
}

Cons and Pitfalls of Apache Iceberg:

  1. Catalog Dependency: Iceberg requires a central catalog (like Nessie, Glue, or a JDBC catalog) to manage table state. This is an additional architectural component to maintain, whereas Delta Lake can operate in a simpler mode using just the storage layer.
  2. Maturity Curve: While rapidly maturing, some advanced features and integrations in the Iceberg ecosystem can feel less polished than their Spark-centric Delta counterparts. A common mistake is misconfiguring the catalog so that different engines resolve different table states; a configuration sketch follows this list.
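
To make the cross-engine consistency concern concrete, the sketch below shows the Spark-side catalog wiring, assuming an AWS Glue catalog and illustrative catalog/warehouse names; Trino's Iceberg connector would point at the same Glue database so both engines resolve identical table state:

import org.apache.spark.sql.SparkSession

// Catalog name "lakehouse" and the S3 paths are placeholders for this sketch
val spark = SparkSession.builder()
  .appName("meta-observability")
  .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.lakehouse.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.lakehouse.warehouse", "s3://my-lakehouse/warehouse")
  .config("spark.sql.catalog.lakehouse.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .getOrCreate()

// If this resolves to a different snapshot than the same query in Trino, the catalogs are out of sync
spark.sql("SELECT snapshot_id, committed_at FROM lakehouse.datalake.meta_observability.snapshots ORDER BY committed_at DESC LIMIT 1").show()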

Final Choice and Rationale

The decision is to adopt Apache Iceberg for the meta-observability table.

The primary driver for this choice is the principle of decoupling for query flexibility. Our observability platform must serve multiple consumers: a real-time Svelte UI needing low-latency SQL access via Trino, data analysts running ad-hoc queries with Spark, and potentially future ML models that predict cost or performance issues. Iceberg’s design as a universal, engine-agnostic format directly supports this multi-modal access pattern without compromising performance. Delta Lake, while excellent within the Spark ecosystem, would impose an architectural constraint that funnels all access through a Spark-compatible layer, adding unnecessary latency and complexity for the UI.

Furthermore, Iceberg’s metadata structure is itself an analytical asset. The ability to query snapshot and manifest metadata directly offers a path to highly efficient diagnostics, for example identifying every table whose data file count grew by more than 20% within a five-minute commit window, a query that would be cumbersome against Delta’s JSON logs but is a natural fit for Iceberg’s manifest-level statistics (a sketch follows below). This makes Iceberg not just a container for our metadata, but an active component in its analysis.
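
As a concrete sketch of that diagnostic, the query below runs against the meta-table populated by the extractor above; the name datalake.meta_observability matches the table the UI queries later in this post, and the thresholds are illustrative rather than tuned:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-growth-diagnostic").getOrCreate()

// Commits where the file count grew by more than 20% within five minutes of the previous commit
val suspectCommits = spark.sql("""
  WITH ordered AS (
    SELECT
      table_name,
      snapshot_id,
      timestamp_ms,
      total_data_files,
      LAG(total_data_files) OVER (PARTITION BY table_name ORDER BY timestamp_ms) AS prev_files,
      LAG(timestamp_ms)     OVER (PARTITION BY table_name ORDER BY timestamp_ms) AS prev_ts
    FROM datalake.meta_observability
  )
  SELECT table_name, snapshot_id, timestamp_ms, prev_files, total_data_files
  FROM ordered
  WHERE prev_files > 0
    AND total_data_files > prev_files * 1.2
    AND unix_timestamp(timestamp_ms) - unix_timestamp(prev_ts) <= 300
""")

suspectCommits.show(truncate = false)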

Core Implementation: The Svelte Headless UI

The front-end must be a highly responsive, customizable dashboard, not a generic chart library. This is where Svelte combined with a headless UI library excels. We use Melt UI, a set of unstyled, accessible component builders, which gives us complete control over the look and feel while the builders handle the complex state management and accessibility requirements.

The core challenge is fetching and correlating data from two distinct backends: the Prometheus HTTP API and our custom API gateway sitting in front of Trino (which queries the Iceberg table).

Prometheus Configuration (prometheus.yml)

A standard configuration to scrape metrics from our Spark jobs and the metadata extractor service.

global:
  scrape_interval: 15s

rule_files:
  - rules/data_lake_health.yml

scrape_configs:
  - job_name: 'spark_jobs'
    static_configs:
      - targets: ['spark-driver-1:4040', 'spark-driver-2:4040']
    metrics_path: /metrics/prometheus

  - job_name: 'metadata_extractor'
    static_configs:
      - targets: ['metadata-extractor-service:9091']

# rules/data_lake_health.yml (referenced above via rule_files)
# Alerts on sudden file-count growth. The lakehouse_table_total_data_files gauge is exported
# by a separate service that derives it from our Iceberg meta-table.
groups:
- name: data_lake_health
  rules:
  - alert: SmallFileProliferation
    expr: delta(lakehouse_table_total_data_files{table="customer_events"}[5m]) > 1000
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High rate of new files in table {{ $labels.table }}"
      description: "Table {{ $labels.table }} has added over 1000 files in the last 5 minutes. This could indicate a small file problem."

Svelte Component for Correlated Visualization

This component fetches job duration metrics from Prometheus and table version history from our Trino API. It then displays them on a synchronized timeline, allowing an engineer to see if a spike in job runtime corresponds to a large data commit.

<!-- src/routes/TableDashboard.svelte -->
<script>
    import { onMount } from 'svelte';
    import { Line } from 'svelte-chartjs';
    import { Chart, Title, Tooltip, Legend, LineElement, CategoryScale, LinearScale, PointElement, TimeScale } from 'chart.js';
    import 'chartjs-adapter-date-fns';

    Chart.register(Title, Tooltip, Legend, LineElement, CategoryScale, LinearScale, PointElement, TimeScale);

    let prometheusData = { labels: [], datasets: [] };
    let icebergData = { labels: [], datasets: [] };
    let isLoading = true;
    let errorMessage = '';

    const tableName = 'customer_events'; // This would be dynamic in a real app

    onMount(async () => {
        try {
            // In a real app, use proper error handling and a state management library.
            const end = Math.floor(Date.now() / 1000);
            const start = end - 6 * 3600;
            const promQuery = `avg(spark_job_duration_seconds{table="${tableName}"})`;
            const trinoSql = `SELECT timestamp_ms, total_data_files, total_records FROM datalake.meta_observability WHERE table_name = '${tableName}' AND timestamp_ms > now() - interval '6' hour ORDER BY timestamp_ms`;

            const [promResponse, icebergResponse] = await Promise.all([
                // Fetch average job duration for the last 6 hours
                fetch(`/api/prometheus/query_range?query=${encodeURIComponent(promQuery)}&start=${start}&end=${end}&step=60`),
                // Fetch table metadata evolution for the last 6 hours from our custom API
                fetch(`/api/trino/query?sql=${encodeURIComponent(trinoSql)}`)
            ]);

            if (!promResponse.ok || !icebergResponse.ok) {
                throw new Error('Failed to fetch data from one or more sources.');
            }

            const promJson = await promResponse.json();
            const icebergJson = await icebergResponse.json();

            // Process Prometheus data
            const promResult = promJson.data.result[0]?.values || [];
            prometheusData = {
                datasets: [{
                    label: 'Avg Job Duration (s)',
                    data: promResult.map(([ts, val]) => ({ x: ts * 1000, y: parseFloat(val) })),
                    borderColor: 'rgb(255, 99, 132)',
                    backgroundColor: 'rgba(255, 99, 132, 0.5)',
                    yAxisID: 'y'
                }]
            };

            // Process Iceberg metadata
            icebergData = {
                datasets: [{
                    label: 'Total Data Files',
                    data: icebergJson.map(row => ({ x: new Date(row.timestamp_ms).getTime(), y: row.total_data_files })),
                    borderColor: 'rgb(54, 162, 235)',
                    backgroundColor: 'rgba(54, 162, 235, 0.5)',
                    yAxisID: 'y1',
                    stepped: true
                }]
            };

        } catch (error) {
            errorMessage = error.message;
            console.error("Dashboard Error:", error);
        } finally {
            isLoading = false;
        }
    });

    const chartOptions = {
        responsive: true,
        interaction: {
            mode: 'index',
            intersect: false,
        },
        scales: {
            x: {
                type: 'time',
                time: {
                    unit: 'hour'
                },
                title: {
                    display: true,
                    text: 'Time'
                }
            },
            y: {
                type: 'linear',
                display: true,
                position: 'left',
                title: {
                    display: true,
                    text: 'Job Duration (s)'
                }
            },
            y1: {
                type: 'linear',
                display: true,
                position: 'right',
                title: {
                    display: true,
                    text: 'File Count'
                },
                grid: {
                    drawOnChartArea: false, // only draw grid for first Y axis
                }
            }
        }
    };
</script>

<div class="dashboard-container">
    <h1>Observability for Table: {tableName}</h1>

    {#if isLoading}
        <p>Loading correlated data...</p>
    {:else if errorMessage}
        <p class="error">Error: {errorMessage}</p>
    {:else}
        <div class="chart-wrapper">
            <Line data={{ datasets: [...prometheusData.datasets, ...icebergData.datasets] }} options={chartOptions} />
        </div>
    {/if}
</div>

<style>
    .dashboard-container {
        font-family: sans-serif;
        padding: 2rem;
    }
    .chart-wrapper {
        position: relative;
        height: 60vh;
        width: 80vw;
    }
    .error {
        color: red;
    }
</style>

This implementation provides a powerful, correlated view. An SRE can now visually confirm that a 300% spike in job duration at 3:05 AM was directly preceded by a commit that added 50,000 new files to the table, instantly diagnosing a small-file problem without needing to manually inspect logs or storage.

Architectural Limitations and Future Iterations

This meta-observability architecture provides a significant leap in visibility, but it is not without its limitations. The metadata extraction process operates in micro-batches, meaning there is an inherent latency (typically minutes) between a change occurring in a production table and it becoming queryable in our meta-table. This is acceptable for trend analysis and post-mortem diagnostics but is insufficient for real-time alerting on structural data issues.

Furthermore, the cost of storing this metadata, while far less than the production data, is non-zero and will scale with the number of tables and the frequency of commits. A data retention policy and tiering strategy for the meta-analytics table itself will be necessary. Future work will focus on integrating a streaming component, potentially using Debezium on the Iceberg catalog or parsing commit notifications, to reduce metadata latency. Additionally, we plan to build ML models on top of this rich historical data to proactively predict tables at risk of performance degradation based on their evolving metadata patterns.

