Building a Reproducible ML Training Pipeline with Apache Iceberg as a Versioned Dataset on DigitalOcean


The project started with a familiar, sinking feeling. We were retraining a computer vision model in Keras, built six months prior, and the performance metrics were wildly different—for the worse. The original training data, a collection of thousands of images, was stored in a DigitalOcean Spaces bucket. Over the months, new images were added, incorrect labels were fixed, and some data was removed. The “ground truth” dataset that produced our production model was gone, lost to a series of unlogged s3cmd put and rm commands. Trying to reproduce the original training run was an exercise in guesswork. This is not just technical debt; in a production ML system, it’s a critical failure of process.

Our initial reaction was to enforce stricter naming conventions, like dataset_v1.0/, dataset_v1.1_fix/. This is a fragile solution that relies on human discipline and quickly falls apart. We considered Git LFS, but it’s not designed for the scale of data we were handling, nor does it provide the queryable, structured metadata we needed. The real problem was that we were treating our dataset like a mutable collection of files in a filesystem. We needed to treat it like a database, with ACID guarantees, versioning, and a queryable history, but without the overhead and cost of a full-blown data warehouse.

This led us to Apache Iceberg. It’s a table format, not a storage engine. It brings database-like semantics to raw object storage files. The key features that addressed our pain points were atomic commits and time travel. Every change to an Iceberg table creates a new, immutable snapshot of the table’s metadata. This means we can query the table exactly as it was at any point in time, identified by a snapshot ID or a timestamp. This was the foundation for true reproducibility. Our stack would be Keras for modeling, DigitalOcean Spaces for cost-effective storage, and Apache Iceberg as the versioning layer in between.
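
To make the snapshot idea concrete, here is a toy model in plain Python. It is not how Iceberg is implemented, and the names (Snapshot, ToyTable, commit_append, files_at) are invented for illustration. It only shows why an append-only log of immutable snapshots lets an old version be read back unchanged after later commits.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Snapshot:
    """An immutable record of which data files make up one table version."""
    snapshot_id: int
    timestamp_ms: int
    files: tuple[str, ...]


@dataclass
class ToyTable:
    """Hypothetical stand-in for a table: an append-only list of snapshots."""
    snapshots: list[Snapshot] = field(default_factory=list)

    def commit_append(self, new_files: list[str], timestamp_ms: int) -> Snapshot:
        # A commit never edits an old snapshot; it adds a new one that
        # references the previous files plus the new ones.
        previous = self.snapshots[-1].files if self.snapshots else ()
        snap = Snapshot(len(self.snapshots) + 1, timestamp_ms, previous + tuple(new_files))
        self.snapshots.append(snap)
        return snap

    def files_at(self, snapshot_id: int) -> tuple[str, ...]:
        # Time travel by ID: look up the snapshot and return its file list.
        for snap in self.snapshots:
            if snap.snapshot_id == snapshot_id:
                return snap.files
        raise KeyError(f"unknown snapshot {snapshot_id}")


table = ToyTable()
v1 = table.commit_append(["a.parquet"], timestamp_ms=1_000)
v2 = table.commit_append(["b.parquet"], timestamp_ms=2_000)
print(table.files_at(v1.snapshot_id))  # still only the first file
print(table.files_at(v2.snapshot_id))  # both files
```

The second commit does not disturb what the first snapshot sees, which is the property the rest of this pipeline relies on.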

Environment and Catalog Configuration: The First Hurdle

Before writing a single line of ingestion code, the first challenge is setting up the Iceberg catalog. The catalog is the component that tracks the current metadata pointer for each table. In a large-scale enterprise environment, you would use a Hive Metastore, AWS Glue, or a REST catalog service. For our small team, running and maintaining these services on a DigitalOcean Droplet would introduce unnecessary operational complexity.

A common mistake is to over-engineer the initial setup. We needed something that worked, was reliable for a single-writer ingestion process, and lived entirely on our training Droplet. The pyiceberg library offers a SQL catalog backend, which can use a simple SQLite database file. This is a pragmatic trade-off. It’s not suitable for a distributed system with concurrent writes, but for our use case (a single, scheduled script that ingests data) it is a good fit. It needs no external services; the only extra Python dependency is SQLAlchemy, which is installed with pyiceberg’s sql-sqlite extra.

The configuration requires specifying the catalog implementation, the S3 endpoint (DigitalOcean Spaces), and credentials. Storing credentials directly in code is a security risk. A better practice is to use a simple configuration file or environment variables.

Here is the core setup module, config.py, which centralizes all our settings.

# config.py
import os
from pathlib import Path

# --- Project Structure ---
# Define the root directory of the project
PROJECT_ROOT = Path(__file__).parent.resolve()
# Local path for the SQLite database that will serve as our Iceberg Catalog
CATALOG_DB_PATH = PROJECT_ROOT / "iceberg_catalog.db"
# Local directory for staging data before ingestion
LOCAL_DATA_DIR = PROJECT_ROOT / "local_data_staging"
LOCAL_DATA_DIR.mkdir(exist_ok=True)

# --- DigitalOcean Spaces Configuration ---
# Your DO Spaces endpoint URL. This is region-specific.
# e.g., 'https://nyc3.digitaloceanspaces.com'
DO_ENDPOINT_URL = os.getenv("DO_ENDPOINT_URL")
# The name of your DO Spaces bucket
DO_BUCKET_NAME = os.getenv("DO_BUCKET_NAME")
# Your DO access key
DO_ACCESS_KEY = os.getenv("DO_ACCESS_KEY")
# Your DO secret key
DO_SECRET_KEY = os.getenv("DO_SECRET_KEY")

# --- Iceberg Configuration ---
# The name for our catalog within the application
CATALOG_NAME = "ml_catalog"
# The database/namespace within the catalog
ICEBERG_NAMESPACE = "computer_vision"
# The name of our primary image dataset table
ICEBERG_TABLE_NAME = "satellite_images"

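# PyIceberg catalog properties. This flat dictionary is unpacked into
# load_catalog(CATALOG_NAME, **ICEBERG_CATALOG_CONFIG) and tells pyiceberg
# how to connect to our storage and catalog.
ICEBERG_CATALOG_CONFIG = {
    "type": "sql",
    "uri": f"sqlite:///{CATALOG_DB_PATH}",
    # Default location for table data and metadata files. Without a warehouse,
    # create_table() needs an explicit location for every table.
    "warehouse": f"s3://{DO_BUCKET_NAME}/table_metadata",
    "s3.endpoint": DO_ENDPOINT_URL,
    "s3.access-key-id": DO_ACCESS_KEY,
    "s3.secret-access-key": DO_SECRET_KEY,
    # Heuristic to get the region from the endpoint host, e.g., 'nyc3'.
    # The scheme is stripped first, and `or ""` keeps the import from crashing
    # before validate_config() can report a missing DO_ENDPOINT_URL.
    "s3.region": (DO_ENDPOINT_URL or "").split("//")[-1].split(".")[0],
}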

# --- Validation ---
# A simple check to ensure critical environment variables are set.
# In a production system, this would be more robust, possibly using a library like Pydantic.
def validate_config():
    """Ensures that all necessary environment variables are set."""
    required_vars = [
        "DO_ENDPOINT_URL",
        "DO_BUCKET_NAME",
        "DO_ACCESS_KEY",
        "DO_SECRET_KEY"
    ]
    missing_vars = [var for var in required_vars if not globals()[var]]
    if missing_vars:
        raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")

# Run validation on import
validate_config()
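
The region heuristic in config.py is easy to get subtly wrong, because the endpoint value includes the URL scheme. A slightly stricter variant could parse the hostname first. The helper name region_from_endpoint is hypothetical, and the assumption that the first hostname label is the region holds for endpoints shaped like the nyc3 example above, not necessarily for custom domains.

```python
from urllib.parse import urlparse


def region_from_endpoint(endpoint_url: str) -> str:
    """Return the first hostname label of an endpoint URL, e.g. 'nyc3'."""
    host = urlparse(endpoint_url).hostname
    if not host:
        # Covers empty strings and values given without a scheme.
        raise ValueError(f"Not a valid endpoint URL: {endpoint_url!r}")
    return host.split(".")[0]


print(region_from_endpoint("https://nyc3.digitaloceanspaces.com"))
```

Raising on a malformed value surfaces a configuration mistake at startup instead of at the first failed request.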

This configuration is loaded by our application. The first script we need is one to initialize the namespace in our catalog. This only needs to be run once.

# initialize_catalog.py
import logging
from pyiceberg.catalog import load_catalog

from config import ICEBERG_CATALOG_CONFIG, CATALOG_NAME, ICEBERG_NAMESPACE

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def setup_iceberg_namespace():
    """
    Initializes the Iceberg catalog and creates the specified namespace if it doesn't exist.
    This is an idempotent operation.
    """
    logging.info("Attempting to initialize Iceberg catalog...")
    try:
        catalog = load_catalog(CATALOG_NAME, **ICEBERG_CATALOG_CONFIG)
        logging.info(f"Successfully loaded catalog '{CATALOG_NAME}'.")

        namespaces = catalog.list_namespaces()
        if (ICEBERG_NAMESPACE,) not in namespaces:
            logging.info(f"Namespace '{ICEBERG_NAMESPACE}' not found. Creating it...")
            catalog.create_namespace(ICEBERG_NAMESPACE)
            logging.info(f"Successfully created namespace '{ICEBERG_NAMESPACE}'.")
        else:
            logging.info(f"Namespace '{ICEBERG_NAMESPACE}' already exists.")

    except Exception as e:
        logging.error(f"Failed to initialize Iceberg catalog or namespace: {e}", exc_info=True)
        raise

if __name__ == "__main__":
    setup_iceberg_namespace()

Running python initialize_catalog.py will create the iceberg_catalog.db file and set up the computer_vision namespace within it. Our setup is now ready to receive data.
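
Because the catalog is an ordinary SQLite file, it can be inspected with Python’s built-in sqlite3 module. The sketch below lists whatever tables exist in a database file, opening it read-only so that a typo in the path does not silently create an empty database. The function name is hypothetical, and the expectation that PyIceberg’s SQL catalog keeps its state in tables such as iceberg_tables is an assumption worth checking against your installed version.

```python
import sqlite3
from pathlib import Path


def list_sqlite_tables(db_path: str) -> list[str]:
    """Return the names of all tables in an existing SQLite database file."""
    # mode=ro opens the file read-only and fails if it does not exist.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]
```

Calling list_sqlite_tables("iceberg_catalog.db") from the project root after initialization should show the catalog’s bookkeeping tables.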

The Ingestion Pipeline: Creating Atomic Versions

The core of the solution is the ingestion process. Each time we add data, we must do it as an atomic append operation on the Iceberg table. This creates a new snapshot, effectively versioning our dataset.

The process involves two main steps:

  1. Upload the raw image files to a dedicated prefix in our DigitalOcean Spaces bucket.
  2. Create a corresponding metadata record (e.g., image path, label, timestamp) and append it to the Iceberg table.

The Iceberg table will not store the images themselves, only the pointers to them, along with other metadata. This is a crucial design choice for efficiency. Storing large binary blobs in a columnar format like Parquet (Iceberg’s default data file format) is highly inefficient.
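
One consequence of storing only pointers is that the two steps above are not atomic together: if the append fails after the upload succeeds, the uploaded objects are orphaned. A cleanup helper could be sketched as follows. The function name and the idea of collecting keys during upload are assumptions, not part of the pipeline described here. delete_objects is the standard boto3 S3 client call, which accepts at most 1,000 keys per request; confirm that your Spaces configuration permits it before relying on this.

```python
def delete_uploaded_keys(s3_client, bucket: str, keys: list[str], chunk_size: int = 1000) -> int:
    """Delete objects in chunks; returns the number of keys submitted for deletion."""
    submitted = 0
    for start in range(0, len(keys), chunk_size):
        chunk = keys[start:start + chunk_size]
        # One request per chunk, since the API caps the keys per call.
        s3_client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in chunk], "Quiet": True},
        )
        submitted += len(chunk)
    return submitted
```

An ingestion script could collect each s3_key as it uploads and call this helper from the error path of the append step.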

Here’s the ingestion script. It’s designed to be run with a directory of new images as input.

# ingest.py
import os
import uuid
import logging
from datetime import datetime

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    StringType,
    TimestampType,
    UUIDType,
)

from config import (
    ICEBERG_CATALOG_CONFIG,
    CATALOG_NAME,
    ICEBERG_NAMESPACE,
    ICEBERG_TABLE_NAME,
    DO_BUCKET_NAME,
    DO_ENDPOINT_URL,
    DO_ACCESS_KEY,
    DO_SECRET_KEY,
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define the schema for our Iceberg table.
# A robust schema is critical for data quality and evolution.
IMAGE_TABLE_SCHEMA = Schema(
    NestedField(field_id=1, name="image_id", field_type=UUIDType(), required=True),
    NestedField(field_id=2, name="image_path", field_type=StringType(), required=True),
    NestedField(field_id=3, name="label", field_type=StringType(), required=True),
    NestedField(field_id=4, name="ingestion_ts", field_type=TimestampType(), required=True),
    NestedField(field_id=5, name="data_split", field_type=StringType(), required=False), # e.g., 'train', 'validation'
)

class DataIngestor:
    def __init__(self):
        """Initializes the S3 client and Iceberg catalog."""
        try:
            self.s3_client = boto3.client(
                's3',
                endpoint_url=DO_ENDPOINT_URL,
                aws_access_key_id=DO_ACCESS_KEY,
                aws_secret_access_key=DO_SECRET_KEY
            )
            self.catalog = load_catalog(CATALOG_NAME, **ICEBERG_CATALOG_CONFIG)
            self.table_identifier = (ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME)
        except Exception as e:
            logging.error(f"Failed to initialize DataIngestor: {e}", exc_info=True)
            raise

    def upload_images_to_spaces(self, local_dir: str) -> list[dict]:
        """
        Uploads images from a local directory to DO Spaces and returns metadata.
        
        Args:
            local_dir: Path to the directory containing images.
        
        Returns:
            A list of dictionaries, each containing metadata for an uploaded image.
        """
        metadata_records = []
        image_prefix = f"raw_images/{datetime.utcnow().strftime('%Y-%m-%d')}"

        for label in os.listdir(local_dir):
            label_dir = os.path.join(local_dir, label)
            if not os.path.isdir(label_dir):
                continue
            
            for filename in os.listdir(label_dir):
                if not (filename.lower().endswith(".jpg") or filename.lower().endswith(".png")):
                    continue

                local_path = os.path.join(label_dir, filename)
                image_uuid = uuid.uuid4()
                s3_key = f"{image_prefix}/{label}/{image_uuid}-{filename}"
                
                try:
                    logging.info(f"Uploading {local_path} to s3://{DO_BUCKET_NAME}/{s3_key}")
                    self.s3_client.upload_file(local_path, DO_BUCKET_NAME, s3_key)
                    
                    metadata_records.append({
                        "image_id": image_uuid,
                        "image_path": f"s3://{DO_BUCKET_NAME}/{s3_key}",
                        "label": label,
                        "ingestion_ts": datetime.utcnow(),
                        "data_split": "train" # Example assignment
                    })
                except Exception as e:
                    logging.error(f"Failed to upload {local_path}: {e}", exc_info=True)
                    # In a production system, you might add this to a retry queue.
        
        return metadata_records

    def ingest_data(self, local_dir: str):
        """
        Main ingestion workflow: upload images and append metadata to Iceberg.
        """
        logging.info(f"Starting ingestion process for directory: {local_dir}")
        metadata_records = self.upload_images_to_spaces(local_dir)
        
        if not metadata_records:
            logging.warning("No new images found or uploaded. Ingestion finished.")
            return

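        # PyArrow cannot infer a type from uuid.UUID objects, so pass the raw 16 bytes;
        # PyIceberg maps a UUID column to a 16-byte Arrow type. The naive UTC datetimes
        # already match TimestampType (a timestamp without time zone), so they stay as-is.
        records = [{**record, "image_id": record["image_id"].bytes} for record in metadata_records]

        try:
            # Check if table exists, create if not
            if self.table_identifier not in self.catalog.list_tables(ICEBERG_NAMESPACE):
                logging.info(f"Table '{'.'.join(self.table_identifier)}' does not exist. Creating...")
                self.catalog.create_table(
                    identifier=self.table_identifier, 
                    schema=IMAGE_TABLE_SCHEMA
                )
                logging.info("Table created successfully.")
            
            table = self.catalog.load_table(self.table_identifier)
            
            # table.append() takes a PyArrow table, not a pandas DataFrame. Building it
            # against the table's own schema keeps column types and nullability aligned.
            arrow_table = pa.Table.from_pylist(records, schema=table.schema().as_arrow())
            
            # Append data. This is the atomic operation that creates a new snapshot.
            table.append(arrow_table)
            logging.info(f"Successfully appended {arrow_table.num_rows} records to the Iceberg table.")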
            
            # Log the new snapshot details
            current_snapshot = table.current_snapshot()
            logging.info(f"New table state: Snapshot ID={current_snapshot.snapshot_id}, Records={current_snapshot.summary['total-records']}")

        except Exception as e:
            logging.error(f"Failed to append data to Iceberg table: {e}", exc_info=True)
            # Here, we would need a cleanup strategy for the uploaded S3 files
            # to avoid orphaned data.
            raise

if __name__ == "__main__":
    # To run this, create a directory structure like:
    # local_data_staging/
    #   batch1/
    #     class_a/
    #       image1.jpg
    #       image2.jpg
    #     class_b/
    #       image3.jpg
    #   batch2/
    #     class_a/
    #       image4.jpg
    #     class_c/
    #       image5.jpg
    
    ingestor = DataIngestor()
    
    # Simulate first ingestion
    first_batch_dir = "local_data_staging/batch1"
    if os.path.exists(first_batch_dir):
         ingestor.ingest_data(first_batch_dir)
    else:
        logging.warning(f"Directory not found: {first_batch_dir}. Skipping first ingestion.")

    # Simulate a second, later ingestion
    second_batch_dir = "local_data_staging/batch2"
    if os.path.exists(second_batch_dir):
        ingestor.ingest_data(second_batch_dir)
    else:
        logging.warning(f"Directory not found: {second_batch_dir}. Skipping second ingestion.")

After running this script with two separate batches of data, our Iceberg table now has a history. We can inspect this using pyiceberg to see the different snapshots, each representing a consistent state of our dataset. This is the foundation of our reproducibility.
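
A small helper for that inspection might look like the sketch below. It assumes only that the object returned by table.history() yields entries with snapshot_id and timestamp_ms attributes; the function name summarize_history is hypothetical.

```python
from datetime import datetime, timezone


def summarize_history(table) -> list[tuple[int, str]]:
    """Return (snapshot_id, ISO-8601 UTC commit time) for each history entry."""
    summary = []
    for entry in table.history():
        # Commit times are stored as epoch milliseconds.
        committed = datetime.fromtimestamp(entry.timestamp_ms / 1000, tz=timezone.utc)
        summary.append((entry.snapshot_id, committed.isoformat()))
    return summary
```

Passing it the table loaded with catalog.load_table((ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME)) gives one line per ingestion run, which is a convenient thing to record next to each trained model.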

The Custom Keras Data Loader: Bridging Iceberg and TensorFlow

The final and most critical piece is connecting this versioned dataset to our Keras training loop. Keras’s built-in utilities like image_dataset_from_directory are useless here; they operate on a filesystem view of the world. We need a custom data generator that understands how to:

  1. Load a specific Iceberg table snapshot.
  2. Read the metadata (image paths and labels) from that snapshot.
  3. Fetch the actual image data from DigitalOcean Spaces on-the-fly during training.
  4. Preprocess the images and yield them in batches to the model.

The keras.utils.Sequence class is the perfect tool for this. It provides a thread-safe, standardized way to create custom data generators for model.fit().

# data_loader.py
import logging
import io

import boto3
import numpy as np
import pandas as pd
from tensorflow import keras
from PIL import Image
from pyiceberg.catalog import load_catalog

from config import (
    ICEBERG_CATALOG_CONFIG,
    CATALOG_NAME,
    ICEBERG_NAMESPACE,
    ICEBERG_TABLE_NAME,
    DO_ENDPOINT_URL,
    DO_ACCESS_KEY,
    DO_SECRET_KEY,
    DO_BUCKET_NAME
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class IcebergDatasetSequence(keras.utils.Sequence):
    """
    A Keras Sequence for loading image data from a versioned Apache Iceberg table.
    """
    def __init__(
        self,
        batch_size: int,
        target_size: tuple[int, int],
        snapshot_id: int | None = None,
        as_of_timestamp: int | None = None,
        data_split: str = 'train'
    ):
        """
        Initializes the data loader for a specific snapshot of the Iceberg table.

        Args:
            batch_size: The size of batches to generate.
            target_size: The size to which images will be resized (height, width).
            snapshot_id: The specific Iceberg snapshot ID to use for training.
            as_of_timestamp: A timestamp to perform time-travel query.
            data_split: The data split to filter on (e.g., 'train', 'validation').
        """
        # Keras 3 expects Sequence subclasses to call the base initializer.
        super().__init__()
        self.batch_size = batch_size
        self.target_size = target_size
        self.data_split = data_split
        self.s3_client = boto3.client(
            's3',
            endpoint_url=DO_ENDPOINT_URL,
            aws_access_key_id=DO_ACCESS_KEY,
            aws_secret_access_key=DO_SECRET_KEY
        )

        try:
            catalog = load_catalog(CATALOG_NAME, **ICEBERG_CATALOG_CONFIG)
            table_identifier = (ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME)
            table = catalog.load_table(table_identifier)
            
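            # --- This is the core of reproducibility ---
            if snapshot_id is not None:
                logging.info(f"Loading data from specific snapshot_id: {snapshot_id}")
                scan = table.scan(snapshot_id=snapshot_id)
            elif as_of_timestamp is not None:
                # table.scan() has no timestamp argument, so resolve the timestamp
                # (epoch milliseconds) to the latest snapshot committed at or before it.
                logging.info(f"Loading data as of timestamp: {as_of_timestamp}")
                eligible = [entry for entry in table.history() if entry.timestamp_ms <= as_of_timestamp]
                if not eligible:
                    raise ValueError(f"No snapshot exists at or before {as_of_timestamp}.")
                resolved = max(eligible, key=lambda entry: entry.timestamp_ms)
                scan = table.scan(snapshot_id=resolved.snapshot_id)
            else:
                logging.info("Loading data from the latest snapshot.")
                scan = table.scan()
            
            # Read all metadata for the selected snapshot into a pandas DataFrame.
            # In a very large dataset, we might stream this.
            self.metadata_df = scan.to_pandas()
            
            # Filter for the desired data split and fix the row order, so the same
            # snapshot always yields the same sample sequence.
            self.metadata_df = self.metadata_df[self.metadata_df['data_split'] == self.data_split]
            self.metadata_df = self.metadata_df.sort_values('image_path').reset_index(drop=True)
            
            # Prepare labels. Sorting makes the label-to-index mapping deterministic.
            self.labels = sorted(self.metadata_df['label'].unique())
            self.label_map = {label: i for i, label in enumerate(self.labels)}
            self.num_classes = len(self.labels)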
            
            logging.info(f"Found {len(self.metadata_df)} samples for split '{self.data_split}' using snapshot.")
            
        except Exception as e:
            logging.error(f"Failed to initialize IcebergDatasetSequence: {e}", exc_info=True)
            raise

    def __len__(self):
        """Returns the number of batches per epoch."""
        return int(np.floor(len(self.metadata_df) / self.batch_size))

    def __getitem__(self, index):
        """Generates one batch of data."""
        start_index = index * self.batch_size
        end_index = (index + 1) * self.batch_size
        batch_metadata = self.metadata_df.iloc[start_index:end_index]
        
        # Initialize arrays for the batch
        X = np.empty((self.batch_size, *self.target_size, 3), dtype=np.float32)
        y = np.empty((self.batch_size), dtype=int)
        
        for i, row in enumerate(batch_metadata.itertuples()):
            try:
                # The path is in the format s3://bucket/key
                s3_key = row.image_path.split(f"s3://{DO_BUCKET_NAME}/")[1]
                
                # Download image from Spaces into memory
                response = self.s3_client.get_object(Bucket=DO_BUCKET_NAME, Key=s3_key)
                image_bytes = response['Body'].read()
                
                # Process the image
                img = Image.open(io.BytesIO(image_bytes)).convert('RGB')
                # PIL's resize() takes (width, height); target_size is (height, width).
                img = img.resize((self.target_size[1], self.target_size[0]))
                
                X[i,] = np.array(img) / 255.0  # Normalize to [0, 1]
                y[i] = self.label_map[row.label]
                
            except Exception as e:
                logging.warning(f"Could not load image {row.image_path}: {e}. Skipping.")
                # In production, we might replace this with a placeholder image.
                X[i,] = np.zeros((*self.target_size, 3))
                y[i] = 0 # Or some default label
        
        return X, keras.utils.to_categorical(y, num_classes=self.num_classes)
        
    def on_epoch_end(self):
        """Shuffle metadata at the end of each epoch to ensure training robustness."""
        self.metadata_df = self.metadata_df.sample(frac=1).reset_index(drop=True)
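
The as_of_timestamp argument is most useful when you know roughly when a model was trained but not which snapshot it used. Iceberg records commit times as epoch milliseconds, so a calendar date has to be converted first and then matched against the table history. The two helpers below are a sketch with hypothetical names; history stands for any sequence of entries exposing snapshot_id and timestamp_ms.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("Use a timezone-aware datetime to avoid local-time ambiguity.")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def snapshot_id_as_of(history, timestamp_ms: int) -> int:
    """Pick the latest snapshot committed at or before timestamp_ms."""
    eligible = [entry for entry in history if entry.timestamp_ms <= timestamp_ms]
    if not eligible:
        raise ValueError("No snapshot existed at that time.")
    return max(eligible, key=lambda entry: entry.timestamp_ms).snapshot_id
```

For example, to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc)) produces a value that can be passed as as_of_timestamp.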

Proving Reproducibility: The Training Loop

Now we can tie everything together in a training script. We will first find the snapshot IDs from our table’s history. Then, we will train a simple model twice: once on the first snapshot, and once on the latest snapshot. Finally, we will re-instantiate the data loader with the original snapshot ID to show that the first run can be repeated on exactly the same data.

# train.py
import logging
from tensorflow import keras
from tensorflow.keras import layers
from pyiceberg.catalog import load_catalog

from data_loader import IcebergDatasetSequence
from config import ICEBERG_CATALOG_CONFIG, CATALOG_NAME, ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_snapshot_ids() -> tuple[int, int]:
    """Retrieves the first and last snapshot IDs from the Iceberg table."""
    catalog = load_catalog(CATALOG_NAME, **ICEBERG_CATALOG_CONFIG)
    table_identifier = (ICEBERG_NAMESPACE, ICEBERG_TABLE_NAME)
    table = catalog.load_table(table_identifier)
    
    history = table.history()
    if len(history) < 2:
        raise RuntimeError("The table must have at least two snapshots to run this demo.")
        
    first_snapshot_id = history[0].snapshot_id
    latest_snapshot_id = history[-1].snapshot_id
    
    logging.info(f"Found first snapshot ID: {first_snapshot_id}")
    logging.info(f"Found latest snapshot ID: {latest_snapshot_id}")
    
    return first_snapshot_id, latest_snapshot_id

def build_model(num_classes: int) -> keras.Model:
    """Builds a simple CNN model for demonstration."""
    model = keras.Sequential([
        layers.Input(shape=(128, 128, 3)),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D(),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D(),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def main():
    BATCH_SIZE = 16
    TARGET_SIZE = (128, 128)

    try:
        first_snapshot_id, latest_snapshot_id = get_snapshot_ids()

        # --- Training Run 1: Using the first dataset snapshot ---
        logging.info("\n" + "="*50)
        logging.info(f"STARTING TRAINING on FIRST snapshot: {first_snapshot_id}")
        logging.info("="*50)
        
        train_seq_v1 = IcebergDatasetSequence(
            batch_size=BATCH_SIZE,
            target_size=TARGET_SIZE,
            snapshot_id=first_snapshot_id
        )
        model_v1 = build_model(train_seq_v1.num_classes)
        history_v1 = model_v1.fit(train_seq_v1, epochs=2)
        logging.info(f"Results for V1 (Snapshot {first_snapshot_id}): {history_v1.history}")

        # --- Training Run 2: Using the latest dataset snapshot ---
        logging.info("\n" + "="*50)
        logging.info(f"STARTING TRAINING on LATEST snapshot: {latest_snapshot_id}")
        logging.info("="*50)
        
        train_seq_v2 = IcebergDatasetSequence(
            batch_size=BATCH_SIZE,
            target_size=TARGET_SIZE,
            snapshot_id=latest_snapshot_id
        )
        model_v2 = build_model(train_seq_v2.num_classes)
        history_v2 = model_v2.fit(train_seq_v2, epochs=2)
        logging.info(f"Results for V2 (Snapshot {latest_snapshot_id}): {history_v2.history}")

        # --- Training Run 3: Reproducing the first run ---
        logging.info("\n" + "="*50)
        logging.info(f"REPRODUCING TRAINING on FIRST snapshot: {first_snapshot_id}")
        logging.info("="*50)
        
        # Re-instantiate the sequence with the same snapshot_id
        train_seq_v1_repro = IcebergDatasetSequence(
            batch_size=BATCH_SIZE,
            target_size=TARGET_SIZE,
            snapshot_id=first_snapshot_id
        )
        model_v1_repro = build_model(train_seq_v1_repro.num_classes)
        # For perfect reproducibility, random seeds in TensorFlow/Keras should also be fixed.
        # This demonstration focuses on the data aspect.
        history_v1_repro = model_v1_repro.fit(train_seq_v1_repro, epochs=2)
        logging.info(f"Results for REPRODUCED V1 (Snapshot {first_snapshot_id}): {history_v1_repro.history}")
        
    except Exception as e:
        logging.error(f"An error occurred during the training process: {e}", exc_info=True)

if __name__ == "__main__":
    main()

The output of this script demonstrates the system’s power. The training results on the first snapshot and the latest snapshot will likely differ due to the change in data. However, the results from the first run and the reproduction run will be based on the exact same dataset, validating our approach.
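
Pinning the snapshot fixes which data is read, but the order in which samples are drawn still depends on random shuffling. One way to make that order repeatable is to derive each epoch’s permutation from a fixed seed. The sketch below uses plain Python and a hypothetical function name; in the pipeline itself the same idea would apply to the shuffle in on_epoch_end, alongside fixing TensorFlow’s own seeds (for example with keras.utils.set_random_seed).

```python
import random


def epoch_order(num_samples: int, seed: int, epoch: int) -> list[int]:
    """Return a shuffled list of sample indices that depends only on (seed, epoch)."""
    # A separate generator per epoch keeps the order independent of global state.
    rng = random.Random(seed * 1_000_003 + epoch)
    order = list(range(num_samples))
    rng.shuffle(order)
    return order
```

Two runs that share the snapshot ID and the seed then see the samples in the same order, epoch by epoch.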

graph TD
    subgraph Local Environment
        A[Image Files batch1] --> B{ingest.py};
        C[Image Files batch2] --> B;
    end
    subgraph DigitalOcean
        B --> |1. Uploads Images| D[DO Spaces Bucket: raw_images/];
        B --> |2. Appends Metadata| E[Iceberg Table Metadata];
        E --> |Stored in| F[DO Droplet: iceberg_catalog.db];
        E --> |Points to Parquet files in| G[DO Spaces Bucket: table_metadata/];
    end
    subgraph Training Process
        H[train.py] --> I{IcebergDatasetSequence};
        I -->|Reads snapshot info| F;
        I -->|Reads metadata from| G;
        I -->|Fetches images from| D;
        I -->|Yields batches to| J[Keras Model];
    end
    style F fill:#f9f,stroke:#333,stroke-width:2px
    style J fill:#bbf,stroke:#333,stroke-width:2px

This architecture, while simple, is robust. The SQLite catalog is a known limitation and a single point of failure; in a multi-user or distributed setup, this must be upgraded to a production-grade catalog like one backed by PostgreSQL or a dedicated service like Project Nessie. The data loading in the IcebergDatasetSequence is also serial; performance could be dramatically improved by integrating it with tf.data and using map with num_parallel_calls to fetch and preprocess images in parallel. Furthermore, this system only versions raw data pointers. A more mature MLOps platform would extend this pipeline to include a feature extraction step, storing the computed embeddings directly in the Iceberg table, creating a true, versioned feature store that decouples model training from the computationally expensive feature generation process.
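
As a smaller step toward parallel loading, the per-image downloads inside one batch could be overlapped with a thread pool. This is a standard-library substitute, not the tf.data approach mentioned above, and the names are hypothetical: fetch_one stands for any function that downloads one object, such as a wrapper around the get_object call in the data loader.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def fetch_batch_parallel(
    keys: list[str],
    fetch_one: Callable[[str], bytes],
    max_workers: int = 8,
) -> list[bytes]:
    """Fetch all keys concurrently, returning results in the same order as keys."""
    if not keys:
        return []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(keys))) as pool:
        # Executor.map preserves input order, so images stay aligned with labels.
        return list(pool.map(fetch_one, keys))
```

Because downloads are I/O-bound, threads are usually enough to hide most of the network latency; an exception raised by any fetch_one call propagates when the results are collected.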

