The core problem is not generating features; it’s serving them reliably under two conflicting constraints. For a large-scale NLP service using a Transformer-based model, the inference endpoint requires feature access with p99 latencies under 10 milliseconds. Simultaneously, the ML training pipeline needs a versioned, auditable, and queryable historical record of these same features to prevent training-serving skew. This immediately implies a hybrid storage architecture: a low-latency online store for serving and a robust, scalable offline store for training and analytics. The true architectural challenge lies in the data synchronization mechanism between these two stores, a decision fundamentally governed by the trade-offs described in the CAP theorem.
Defining the Architectural Crossroads
We have a stream of incoming documents that need to be converted into dense vector embeddings by a Hugging Face Transformer model. These embeddings are the features.
- Online Requirement: An API needs to fetch an embedding for a given document ID with single-digit millisecond latency.
- Offline Requirement: A Spark job needs to access all embeddings generated over the last 6 months to retrain the downstream models. This process must be repeatable and tied to specific versions of the data.
Our technology choices are constrained: Apache Iceberg is mandated for the offline store due to its transactional guarantees and time-travel capabilities. Memcached is the designated online key-value store for its raw speed and simplicity. The question is how to populate both systems from the feature generation service.
Two primary architectural patterns emerge, each representing a different stance on the CAP theorem’s consistency-availability spectrum.
Solution A: The Tightly Coupled, CP-Leaning Architecture
The most direct approach is to treat the write to both stores as a single, atomic-like operation. The feature generation service would compute the embedding and then attempt to write it to Iceberg and Memcached in a synchronous, dual-write fashion.
graph TD
    A[Feature Generation Service] -->|1. Compute Embedding| B(Embedding Vector)
    subgraph "Synchronous Write Block"
        B -->|2a. Write to Iceberg| C[Apache Iceberg Table]
        B -->|2b. Write to Memcached| D[Memcached Cluster]
    end
    C --> E{Write Success?}
    D --> E
    E -->|On Failure: Rollback/Retry| A
    E -->|On Success| F[Acknowledge Upstream]
This design prioritizes Consistency (C) over Availability (A). The intent is that an embedding is either present in both systems or in neither, which narrows the window for data divergence, a critical factor in mitigating training-serving skew.
The primary drawback is its brittleness. The availability of the entire feature ingestion pipeline is capped by its least available component and, because every write needs both stores, sits closer to the product of their availabilities. If the Memcached cluster experiences a transient network partition or a node failure, the write fails. If the Iceberg commit to the metastore is slow due to contention, the entire process stalls. In a real-world project, this coupling creates a fragile system where a failure in the less critical caching layer can halt the ingestion of data into the permanent, source-of-truth offline store.
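To put rough numbers on this coupling, the sketch below multiplies component availabilities for a write path that needs every component up. It assumes independent failures, and the availability figures are illustrative assumptions, not measurements from any real deployment.

```python
# Illustrative availability arithmetic for a synchronous dual write.
# The component figures below are assumptions chosen for the example.
from math import prod


def serial_availability(components: dict[str, float]) -> float:
    """Availability of a path that needs every component up (independent failures)."""
    return prod(components.values())


def downtime_minutes_per_30_days(availability: float) -> float:
    """Expected unavailable minutes over a 30-day window."""
    return (1.0 - availability) * 30 * 24 * 60


components = {"iceberg_commit": 0.999, "memcached": 0.995}
combined = serial_availability(components)

print(f"combined availability: {combined:.6f}")
print(f"weakest component:     {min(components.values()):.6f}")
print(f"downtime per 30 days:  {downtime_minutes_per_30_days(combined):.1f} min")
```

The combined figure is never higher than the weakest component, which is why adding the cache to the synchronous path can only lower ingestion availability.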
A Python implementation sketch of this pattern reveals the operational complexity.
# WARNING: This is a conceptual example of a fragile, tightly coupled design.
# It is NOT the recommended approach.
import logging
from datetime import datetime

import torch
from pymemcache.client.base import Client as MemcachedClient
from pyspark.sql import SparkSession, Row
from transformers import AutoTokenizer, AutoModel

# --- Configuration ---
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
# pymemcache's base Client connects to a single (host, port) server.
MEMCACHED_SERVER = ('127.0.0.1', 11211)
ICEBERG_CATALOG = "my_catalog"
ICEBERG_TABLE = "nlp.doc_embeddings"


# --- Service Initialization ---
class FeatureGenerationService:
    def __init__(self):
        logging.info("Initializing FeatureGenerationService...")
        self.memcached_client = MemcachedClient(MEMCACHED_SERVER, connect_timeout=2, timeout=2)
        # In a real app, SparkSession would be managed more carefully.
        self.spark = SparkSession.builder \
            .config(f"spark.sql.catalog.{ICEBERG_CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
            .config(f"spark.sql.catalog.{ICEBERG_CATALOG}.type", "hive") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .getOrCreate()
        logging.info("Loading Sentence Transformer model...")
        self.tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
        self.model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
        logging.info("Service initialized.")

    def _mean_pooling(self, model_output, attention_mask):
        # Average the token embeddings, ignoring padding positions.
        token_embeddings = model_output[0]
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def generate_embedding(self, text: str) -> list[float]:
        encoded_input = self.tokenizer(text, padding=True, truncation=True, return_tensors='pt')
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        sentence_embedding = self._mean_pooling(model_output, encoded_input['attention_mask'])
        return sentence_embedding.squeeze().tolist()

    def process_and_store_document(self, doc_id: str, doc_text: str):
        """
        Processes a document and performs a dual write.
        This is the core of the CP-leaning, fragile approach.
        """
        logging.info(f"Processing document_id: {doc_id}")
        try:
            embedding = self.generate_embedding(doc_text)
            # The critical, tightly coupled block
            try:
                # 1. Write to Memcached first
                # The key must be bytes.
                # The value is serialized, here as a simple comma-separated string.
                # In production, use something more robust like msgpack or protobuf.
                serialized_embedding = ",".join(map(str, embedding))
                self.memcached_client.set(doc_id.encode('utf-8'), serialized_embedding.encode('utf-8'), expire=3600)
                logging.info(f"Successfully cached embedding for {doc_id} in Memcached.")

                # 2. Write to Iceberg
                embedding_row = Row(doc_id=doc_id, embedding=embedding, processed_at=datetime.utcnow())
                df = self.spark.createDataFrame([embedding_row])
                # Qualify the table with the catalog configured above.
                df.writeTo(f"{ICEBERG_CATALOG}.{ICEBERG_TABLE}").append()
                logging.info(f"Successfully wrote embedding for {doc_id} to Iceberg.")
            except Exception as e:
                logging.error(f"Dual write failed for {doc_id}: {e}. Attempting rollback.")
                # Rollback attempt: try to delete from Memcached if it was written.
                # This is error-prone. What if this delete call fails?
                self.memcached_client.delete(doc_id.encode('utf-8'), noreply=False)
                logging.warning(f"Attempted to roll back cache write for {doc_id}.")
                # Re-raise the exception to signal failure
                raise
        except Exception as e:
            logging.error(f"Failed to process document {doc_id}: {e}")
            # Here you would typically push to a dead-letter queue.
            return False
        return True
The pitfall here is the rollback logic. It’s non-transactional. A failure after the Memcached write but before the Iceberg write leaves orphaned data in the cache. A failure during the rollback leaves the system in an inconsistent state. This path leads to complex, often-unreliable compensation logic that attempts to mimic a two-phase commit, without the guarantees.
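The failure window can be made concrete with in-memory stand-ins. The sketch below is not the service above: `FakeCache`, `FlakyTable`, and `dual_write` are hypothetical names, and the failures are injected by hand. It shows that when the second write fails and the compensating delete fails too, the cache keeps an entry that the offline store never recorded.

```python
# Simulation of a non-transactional dual write with hand-injected failures.
# FakeCache, FlakyTable, and dual_write are hypothetical stand-ins.


class FakeCache:
    def __init__(self, fail_deletes: bool = False):
        self.data = {}
        self.fail_deletes = fail_deletes

    def set(self, key, value):
        self.data[key] = value

    def delete(self, key):
        if self.fail_deletes:
            raise ConnectionError("cache unreachable during rollback")
        self.data.pop(key, None)


class FlakyTable:
    def __init__(self, fail_appends: bool = False):
        self.rows = []
        self.fail_appends = fail_appends

    def append(self, row):
        if self.fail_appends:
            raise TimeoutError("commit timed out")
        self.rows.append(row)


def dual_write(cache, table, doc_id, embedding) -> bool:
    """Cache first, then table; on failure attempt a best-effort rollback."""
    cache.set(doc_id, embedding)
    try:
        table.append((doc_id, embedding))
    except Exception:
        try:
            cache.delete(doc_id)
        except Exception:
            pass  # The rollback itself failed: the cache entry is now orphaned.
        return False
    return True


def orphaned_keys(cache, table) -> set:
    """Keys served online that the offline store has no record of."""
    return set(cache.data) - {doc_id for doc_id, _ in table.rows}


cache = FakeCache(fail_deletes=True)
table = FlakyTable(fail_appends=True)
ok = dual_write(cache, table, "doc-001", [0.1, 0.2])
print(ok, orphaned_keys(cache, table))
```

With `fail_deletes=False` the same call leaves no orphan, which is the case the rollback logic quietly assumes.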
Solution B: The Decoupled, AP-Leaning Architecture
A more resilient architecture embraces eventual consistency and prioritizes availability (AP). In this model, the feature generation service has one responsibility: writing the feature to the durable, source-of-truth offline store (Apache Iceberg). This write path is simple and highly available.
A separate, asynchronous process is then responsible for replicating data from Iceberg to Memcached. This “Cache Warmer” or “Replicator” service periodically checks Iceberg for new data and updates the online store.
graph TD
    subgraph "Write Path (Highly Available)"
        A[Feature Generation Service] -->|1. Compute Embedding| B(Embedding Vector)
        B -->|2. Append to Iceberg| C[Apache Iceberg Table]
        C -->|3. Acknowledge| A
    end
    subgraph "Async Replication Path"
        D[Cache Warmer Service] -->|1. Poll for new snapshots| C
        C -->|2. Returns new data files| D
        D -->|3. Read new data| E[Data Files on S3/HDFS]
        D -->|4. Push to Memcached| F[Memcached Cluster]
    end
This design decouples the online and offline systems. A failure in the Memcached cluster or the replicator service does not stop new features from being written to Iceberg. The system gracefully degrades: new features might not be immediately available for online serving, but the core data is captured safely. The trade-off is a measurable replication lag. This is a classic AP system: it remains available for writes even if the replication mechanism is partitioned or down.
The critical implementation detail is how the Cache Warmer tracks its progress to avoid reprocessing data. Apache Iceberg’s snapshot model is well suited to this. Every committed write to an Iceberg table creates a new snapshot with a unique ID. The replicator only needs to store the ID of the last snapshot it processed and, on its next run, ask Iceberg for the data files added since that snapshot.
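The checkpoint logic can be sketched without touching Iceberg at all. The example below models the snapshot history as a plain, oldest-first list; `SnapshotEntry` and `pending_snapshots` are hypothetical names, and a linear history (no rollbacks or branches) is assumed. It also makes the expired-checkpoint case explicit, since a stored ID can disappear from the history once old snapshots are expired.

```python
# Checkpoint selection over an oldest-first snapshot history.
# SnapshotEntry and pending_snapshots are hypothetical; no Iceberg calls are made.
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class SnapshotEntry:
    snapshot_id: int
    timestamp_ms: int


def pending_snapshots(
    history: List[SnapshotEntry], last_processed_id: Optional[int]
) -> Tuple[List[SnapshotEntry], bool]:
    """Return (entries still to replicate, whether the checkpoint was found).

    A first run (no checkpoint) replicates the whole history. If a checkpoint
    was stored but is no longer in the history, the caller gets the full
    history back together with False, and must decide how to resynchronize.
    """
    if last_processed_id is None:
        return list(history), True
    for index, entry in enumerate(history):
        if entry.snapshot_id == last_processed_id:
            return list(history[index + 1:]), True
    return list(history), False


history = [
    SnapshotEntry(101, 1_700_000_000_000),
    SnapshotEntry(102, 1_700_000_005_000),
    SnapshotEntry(103, 1_700_000_010_000),
]
todo, found = pending_snapshots(history, last_processed_id=101)
print([entry.snapshot_id for entry in todo], found)
```

Returning the "checkpoint found" flag separately keeps the missing-checkpoint case from being silently treated as a normal incremental run.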
Final Choice and Rationale
For a system where a few seconds of feature staleness is preferable to a complete ingestion outage, Solution B is the pragmatic choice. In most real-world ML serving scenarios, the business impact of a 5-second delay in a feature being available online is negligible compared to the impact of the entire feature pipeline halting for 5 minutes because the caching tier is unavailable. The key is to make the replication lag a primary, observable metric. If that lag exceeds a defined SLO (e.g., 15 seconds), an alert should be triggered. This transforms an architectural weakness (eventual consistency) into a manageable operational parameter.
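Treating lag as a first-class metric can be as simple as measuring the age of the oldest snapshot that has not yet been replicated. The following is a minimal sketch under that definition; `replication_lag_seconds` and `breaches_slo` are hypothetical helpers, and the 15-second threshold is only the example SLO mentioned above.

```python
# Replication lag as the age of the oldest unreplicated commit.
# Function names are hypothetical; the threshold mirrors the example SLO in the text.
import time
from typing import Optional

LAG_SLO_SECONDS = 15.0


def replication_lag_seconds(
    oldest_pending_commit_ms: Optional[int], now_ms: Optional[int] = None
) -> float:
    """Seconds the oldest unreplicated snapshot has been waiting; 0.0 if caught up."""
    if oldest_pending_commit_ms is None:
        return 0.0
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # Clamp at zero so clock skew between writer and replicator never yields a negative lag.
    return max(0.0, (now_ms - oldest_pending_commit_ms) / 1000.0)


def breaches_slo(lag_seconds: float, slo_seconds: float = LAG_SLO_SECONDS) -> bool:
    return lag_seconds > slo_seconds


lag = replication_lag_seconds(oldest_pending_commit_ms=1_000, now_ms=6_000)
print(f"lag={lag:.1f}s alert={breaches_slo(lag)}")
```

In practice the value would be exported to whatever metrics system is already in use, with the alert rule defined there rather than in application code.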
Core Implementation of the AP-Leaning Architecture
Let’s build the core components for Solution B. This involves three parts: the Iceberg table definition, the feature generation service that writes to Iceberg, and the asynchronous replicator service.
1. Iceberg Table Definition
The services below use PyIceberg with a REST catalog, which decouples them from Spark for simple operations. The table itself is defined once with the SQL DDL below and stores the document ID, the embedding, and a timestamp.
-- DDL for creating the Iceberg table
CREATE TABLE nlp.doc_embeddings (
    doc_id STRING,
    embedding ARRAY<FLOAT>,
    processed_at TIMESTAMP
)
PARTITIONED BY (days(processed_at))
TBLPROPERTIES (
    'write.format.default'='parquet'
);
2. Production-Grade Feature Generation and Iceberg Writer
This service focuses solely on generating embeddings and writing them robustly to Iceberg. It will include batching for efficiency.
# feature_writer.py
import logging
import os
from datetime import datetime
from typing import List, Dict

import pyarrow as pa
import torch
from pyiceberg.catalog import load_catalog
from transformers import AutoTokenizer, AutoModel

# --- Configuration ---
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)

# Assumes catalog config is in environment variables or iceberg-defaults.yaml
CATALOG_NAME = os.environ.get("ICEBERG_CATALOG", "default")
ICEBERG_TABLE_NAME = "nlp.doc_embeddings"


class EmbeddingGenerator:
    """Encapsulates the Transformer model for generating embeddings."""

    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        logger.info(f"Loading model: {model_name}")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        logger.info(f"Model loaded on device: {self.device}")

    def _mean_pooling(self, model_output, attention_mask):
        # Average the token embeddings, ignoring padding positions.
        token_embeddings = model_output[0]
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def generate_batch(self, texts: List[str]) -> List[List[float]]:
        encoded_input = self.tokenizer(texts, padding=True, truncation=True, return_tensors='pt').to(self.device)
        with torch.no_grad():
            model_output = self.model(**encoded_input)
        sentence_embeddings = self._mean_pooling(model_output, encoded_input['attention_mask'])
        return sentence_embeddings.cpu().numpy().tolist()


class IcebergFeatureWriter:
    """Handles writing feature batches to Apache Iceberg."""

    def __init__(self, catalog_name: str, table_name: str):
        self.table_name = table_name
        logger.info(f"Initializing Iceberg catalog '{catalog_name}'...")
        self.catalog = load_catalog(catalog_name)
        try:
            self.table = self.catalog.load_table(table_name)
            logger.info(f"Successfully loaded Iceberg table: {table_name}")
            logger.info(f"Current table schema: {self.table.schema()}")
        except Exception as e:
            logger.error(f"Failed to load Iceberg table {table_name}. It may not exist. Error: {e}")
            raise

    def write(self, features: List[Dict]):
        """
        Writes a list of feature dictionaries to the Iceberg table.
        Each dict should be like: {'doc_id': str, 'embedding': list[float]}
        """
        if not features:
            logger.warning("No features provided to write.")
            return

        logger.info(f"Preparing to write {len(features)} records to Iceberg.")

        # Add timestamp to each record (note: this mutates the caller's dicts)
        now = datetime.utcnow()
        for f in features:
            f['processed_at'] = now

        # Convert to PyArrow Table for efficient writing.
        # Schema.as_arrow() is a method; it returns the equivalent pyarrow schema.
        arrow_schema = self.table.schema().as_arrow()
        try:
            arrow_table = pa.Table.from_pylist(features, schema=arrow_schema)
            logger.info("Successfully converted features to Arrow Table.")
        except Exception as e:
            logger.error(f"Schema mismatch or data conversion error: {e}")
            logger.error(f"Expected schema: {arrow_schema}")
            logger.error(f"Sample data record: {features[0] if features else 'N/A'}")
            raise

        try:
            # Append data to the Iceberg table
            self.table.append(arrow_table)
            logger.info(f"Successfully appended {len(features)} records. New snapshot ID: {self.table.current_snapshot().snapshot_id}")
        except Exception as e:
            logger.error(f"Failed to append data to Iceberg table {self.table_name}: {e}")
            raise


# --- Main execution logic ---
def main():
    """Example usage of the writer service."""
    documents_to_process = [
        {"doc_id": "doc-001", "text": "The CAP theorem is a fundamental concept in distributed systems."},
        {"doc_id": "doc-002", "text": "Apache Iceberg provides transactional capabilities for data lakes."},
        {"doc_id": "doc-003", "text": "Hugging Face Transformers simplify access to state-of-the-art NLP models."},
        {"doc_id": "doc-004", "text": "Memcached is a high-performance, distributed memory object caching system."},
    ]
    id_batch = [doc['doc_id'] for doc in documents_to_process]
    text_batch = [doc['text'] for doc in documents_to_process]

    generator = EmbeddingGenerator()
    writer = IcebergFeatureWriter(CATALOG_NAME, ICEBERG_TABLE_NAME)

    logger.info(f"Generating embeddings for batch of {len(text_batch)} documents.")
    embeddings = generator.generate_batch(text_batch)

    feature_records = [
        {"doc_id": doc_id, "embedding": emb}
        for doc_id, emb in zip(id_batch, embeddings)
    ]
    writer.write(feature_records)
    logger.info("Feature writing process complete.")


if __name__ == "__main__":
    # In a real system, this would be a long-running service consuming from a queue like Kafka/Pulsar.
    main()
3. Asynchronous Iceberg-to-Memcached Replicator
This is the most critical piece of the AP architecture. It needs to be robust, stateful, and performant. It will read the last processed snapshot ID from a local file for simplicity, though in production a more robust state store like etcd or a database would be used.
# cache_replicator.py
import logging
import os
import time
from typing import Iterator, Optional

import msgpack  # Using MessagePack for more efficient serialization than text
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.manifest import ManifestEntryStatus
from pymemcache.client.base import Client as MemcachedClient
from pymemcache.exceptions import MemcacheError

# --- Configuration ---
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger(__name__)

CATALOG_NAME = os.environ.get("ICEBERG_CATALOG", "default")
ICEBERG_TABLE_NAME = "nlp.doc_embeddings"
# pymemcache's base Client connects to a single (host, port) server.
MEMCACHED_SERVER = ('127.0.0.1', 11211)
STATE_FILE_PATH = "./replicator_state.txt"
POLL_INTERVAL_SECONDS = 10
MEMCACHED_TTL_SECONDS = 3600  # 1 hour
BATCH_SIZE = 1000  # Number of keys to set in Memcached at once


class ReplicatorState:
    """Manages persisting and retrieving the last processed snapshot ID."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def get_last_processed_snapshot_id(self) -> Optional[int]:
        if not os.path.exists(self.file_path):
            return None
        try:
            with open(self.file_path, 'r') as f:
                content = f.read().strip()
                if content:
                    return int(content)
        except (IOError, ValueError) as e:
            logger.error(f"Error reading state file {self.file_path}: {e}")
        return None

    def set_last_processed_snapshot_id(self, snapshot_id: int):
        try:
            with open(self.file_path, 'w') as f:
                f.write(str(snapshot_id))
        except IOError as e:
            logger.error(f"Error writing to state file {self.file_path}: {e}")
            raise


class IcebergToMemcachedReplicator:
    def __init__(self):
        logger.info("Initializing replicator...")
        self.state = ReplicatorState(STATE_FILE_PATH)
        self.memcached_client = MemcachedClient(MEMCACHED_SERVER, connect_timeout=5, timeout=5)
        self.catalog = load_catalog(CATALOG_NAME)
        self.table = self.catalog.load_table(ICEBERG_TABLE_NAME)
        logger.info("Replicator initialized.")

    def _added_data_files(self, snapshot_id: int) -> Iterator[str]:
        """Yields paths of data files added by one snapshot (append-only table assumed).

        This walks the snapshot's manifests; verify the manifest API against
        the PyIceberg version in use.
        """
        snapshot = self.table.snapshot_by_id(snapshot_id)
        if snapshot is None:
            return
        for manifest in snapshot.manifests(self.table.io):
            # Manifests carried over from earlier snapshots add nothing new.
            if manifest.added_snapshot_id != snapshot_id:
                continue
            for entry in manifest.fetch_manifest_entry(self.table.io, discard_deleted=True):
                if entry.status == ManifestEntryStatus.ADDED:
                    yield entry.data_file.file_path

    def _replicate_file(self, file_path: str) -> int:
        """Reads one Parquet data file and pushes its rows to Memcached in batches."""
        # Read through the table's FileIO so object-store paths (e.g. s3://) resolve.
        with self.table.io.new_input(file_path).open() as f:
            arrow_table = pq.read_table(f, columns=['doc_id', 'embedding'])
        # to_pylist() yields plain Python values, which msgpack can serialize
        # (NumPy arrays coming out of pandas cannot be packed directly).
        doc_ids = arrow_table.column('doc_id').to_pylist()
        embeddings = arrow_table.column('embedding').to_pylist()

        keys_cached = 0
        # A common mistake is to send one key at a time. Batching is crucial.
        batch = {}
        for doc_id, embedding in zip(doc_ids, embeddings):
            batch[doc_id.encode('utf-8')] = msgpack.packb(embedding)
            if len(batch) >= BATCH_SIZE:
                self._flush_batch_to_memcached(batch)
                keys_cached += len(batch)
                batch = {}
        if batch:  # Flush any remaining items
            self._flush_batch_to_memcached(batch)
            keys_cached += len(batch)
        return keys_cached

    def run_once(self):
        """Performs a single replication cycle."""
        last_processed_id = self.state.get_last_processed_snapshot_id()
        self.table.refresh()  # Refresh table metadata
        current_snapshot = self.table.current_snapshot()
        if current_snapshot is None:
            logger.info("Table is empty. Nothing to process.")
            return
        current_snapshot_id = current_snapshot.snapshot_id
        if last_processed_id == current_snapshot_id:
            logger.info(f"No new snapshots found. Current snapshot is {current_snapshot_id}.")
            return

        logger.info(f"New snapshots detected. Replicating from snapshot > {last_processed_id} to {current_snapshot_id}.")
        try:
            # The history log is ordered oldest first; each entry has snapshot_id and timestamp_ms.
            snapshot_log = self.table.history()
            new_entries = snapshot_log
            if last_processed_id is not None:
                start_index = None
                for i, entry in enumerate(snapshot_log):
                    if entry.snapshot_id == last_processed_id:
                        start_index = i
                        break
                if start_index is None:
                    logger.warning(f"Could not find snapshot {last_processed_id} in history, might be expired. Processing from current.")
                    # Fallback: process only the latest snapshot if history is lost
                    new_entries = snapshot_log[-1:]
                else:
                    new_entries = snapshot_log[start_index + 1:]

            total_keys_cached = 0
            failed_files = 0
            for entry in new_entries:
                logger.info(f"Processing snapshot {entry.snapshot_id} (committed at {entry.timestamp_ms})")
                for file_path in self._added_data_files(entry.snapshot_id):
                    logger.info(f"Reading data file: {file_path}")
                    try:
                        total_keys_cached += self._replicate_file(file_path)
                    except Exception as e:
                        logger.error(f"Failed to process file {file_path}: {e}")
                        # In production, you'd have a strategy for failed files (e.g., DLQ)
                        failed_files += 1

            if failed_files:
                # Keep the old checkpoint so the next cycle retries; cache sets are idempotent.
                logger.error(f"{failed_files} file(s) failed. State not advanced; will retry next cycle.")
                return

            # All processing was successful, so update the state
            self.state.set_last_processed_snapshot_id(current_snapshot_id)
            logger.info(f"Replication cycle complete. Cached {total_keys_cached} new keys. Advanced state to snapshot {current_snapshot_id}.")
        except Exception as e:
            logger.critical(f"An unhandled error occurred during replication cycle: {e}", exc_info=True)

    def _flush_batch_to_memcached(self, batch: dict):
        """Sets a batch of keys; raises so the caller can mark the file as failed."""
        try:
            failed_keys = self.memcached_client.set_many(batch, expire=MEMCACHED_TTL_SECONDS)
        except (MemcacheError, OSError) as e:
            logger.error(f"Memcached connection error during batch set: {e}")
            # This is a critical failure point. A robust system would have retries with backoff.
            raise
        if failed_keys:
            logger.error(f"Failed to set {len(failed_keys)} keys in Memcached: {failed_keys}")
            raise RuntimeError(f"{len(failed_keys)} keys were not stored in Memcached")

    def start_loop(self):
        """Runs the replication process in a continuous loop."""
        logger.info(f"Starting replication loop with a poll interval of {POLL_INTERVAL_SECONDS} seconds.")
        while True:
            try:
                self.run_once()
            except Exception as e:
                logger.critical(f"Replication loop caught unhandled exception: {e}", exc_info=True)
            time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    replicator = IcebergToMemcachedReplicator()
    replicator.start_loop()
Limitations and Future Iterations
This AP-leaning architecture provides high availability for writes but introduces an observable replication lag, which makes it unsuitable for systems requiring real-time consistency between the online and offline stores, such as certain financial transaction systems. The polling mechanism of the replicator is simple but adds up to one poll interval of latency on top of processing time; a more advanced implementation might use a notification service (e.g., S3 event notifications on manifest writes) to trigger replication as soon as a commit lands, reducing the lag to roughly the processing time itself.
Furthermore, the state management via a local file is a single point of failure and not suitable for a horizontally scaled replicator service. A production system would require a distributed consensus store like etcd or a database with transactional guarantees to manage the last processed snapshot ID. Finally, the current design does not handle feature updates or deletions gracefully in the cache; a full solution would need to process Iceberg’s delete_files to issue delete commands to Memcached, ensuring the cache does not serve stale data for removed entities.
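On the state-file concern raised above: short of moving the checkpoint to etcd or a database, the local-file variant can at least avoid torn or half-written checkpoints. The following is a minimal sketch, assuming the temporary file and the checkpoint live on the same filesystem so that `os.replace` swaps them atomically; `save_checkpoint` and `load_checkpoint` are hypothetical helper names, not part of the replicator code. It does nothing about the single point of failure itself.

```python
# Crash-safe checkpoint file: write to a temp file, fsync, then atomically rename.
# save_checkpoint and load_checkpoint are hypothetical helpers for illustration.
import os
import tempfile
from typing import Optional


def save_checkpoint(path: str, snapshot_id: int) -> None:
    """Persist the snapshot ID so readers see either the old or the new value."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".checkpoint-")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(snapshot_id))
            f.flush()
            os.fsync(f.fileno())  # Make sure the bytes reach disk before the rename.
        os.replace(tmp_path, path)  # Atomic swap on the same filesystem.
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def load_checkpoint(path: str) -> Optional[int]:
    """Return the stored snapshot ID, or None if no checkpoint exists yet."""
    try:
        with open(path, "r") as f:
            content = f.read().strip()
    except FileNotFoundError:
        return None
    return int(content) if content else None
```

A replicator would call `save_checkpoint` only after a cycle completes without failed files, and `load_checkpoint` at the start of the next one.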