The prototype worked flawlessly. A Keras model, a convolutional recurrent neural network, was achieving respectable accuracy in identifying emotions from 10-second audio clips. Trained on a curated 50GB dataset, the entire process completed overnight on a single V100 GPU. The problem started when the project was greenlit for production. The new requirement was to train this model on our historical archive of customer service calls—over 100 terabytes of raw audio files sitting dormant in our primary HDFS cluster.
A single-node training script, even with a powerful GPU, was no longer feasible. The initial attempt to mount HDFS and stream data to the training box failed spectacularly. The network I/O became the bottleneck, and the GPU utilization flatlined below 10%. The training time estimate blew past three months. It was clear the paradigm had to shift from moving massive data to the compute, to moving the compute to the data. Our existing infrastructure was a large Hadoop cluster managed by YARN, primarily used for Spark ETL jobs. The challenge was to leverage this investment for a distributed, data-parallel Keras training workload without procuring a dedicated, high-performance computing cluster.
Our data science team had developed a complex Keras model with custom layers. Rewriting it in Spark ML was not an option due to both the complexity and the team’s skillset. The directive was clear: make Keras work, at scale, on YARN. The initial thought was to use a high-level framework like TensorFlowOnSpark, but it felt like a black box and had spotty support for the latest TensorFlow versions. In a production environment, control and debuggability are paramount. This led to the decision to build a more transparent orchestration layer using skein, a tool for managing long-running applications on YARN. This approach gives us direct control over the Python environment, container specifications, and the communication protocol for the training workers.
The architecture settled on a multi-stage pipeline:
- A one-time, massively parallel data preprocessing job using PySpark to convert 100TB of raw audio into a more manageable set of TFRecords containing extracted features.
- A custom Python-based YARN application launcher that packages the entire Keras/TensorFlow environment, requests GPU-enabled containers from YARN, and injects the necessary configuration for distributed training.
- A Keras training script, executed by each YARN container, that leverages TensorFlow’s MultiWorkerMirroredStrategy to perform data-parallel training, reading its shard of TFRecords directly from HDFS.
Stage 1: Preprocessing Audio at Scale with PySpark
Before any training can begin, raw audio must be converted into numerical features. For this task, Mel-Frequency Cepstral Coefficients (MFCCs) are a standard choice. Processing 100TB of audio serially is a non-starter. This is a classic “embarrassingly parallel” problem, perfectly suited for a Map-style operation in Spark.
The goal is to create a Spark job that reads audio file paths, distributes the processing across the YARN cluster, extracts MFCCs from each file, and writes the results out as sharded TFRecords. Using TFRecords is critical for efficient data loading in the subsequent TensorFlow training phase.
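As a quick sanity check before scaling out, the feature extraction can be exercised locally on a single clip; here is a minimal sketch (the file path is a placeholder) that confirms the feature shape the rest of the pipeline assumes:
import librosa
import numpy as np

# Load up to 10 s of one call at 16 kHz and extract 40 MFCCs (path is a placeholder)
waveform, sr = librosa.load("sample_call.wav", sr=16000, mono=True, duration=10)
waveform = np.pad(waveform, (0, 16000 * 10 - len(waveform)))  # pad short clips to 10 s
mfccs = librosa.feature.mfcc(y=waveform, sr=sr, n_mfcc=40)
print(mfccs.shape)  # (40, 313) with librosa's default hop_length of 512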
A common mistake here is to assume every file fits comfortably in executor memory. Very long recordings can cause OutOfMemory errors and would need chunked reading, while the ten-second clips used for training can be decoded entirely in memory. The librosa library is excellent for this.
Here is the production-grade PySpark script. It includes configuration, proper UDF definition, and error handling for corrupt audio files.
# audio_preprocess_spark.py
import os
import io
import logging
import traceback
import librosa
import numpy as np
import tensorflow as tf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, input_file_name
from pyspark.sql.types import StringType, BinaryType, StructType, StructField, ArrayType, FloatType
# --- Configuration ---
# In a real project, this would come from a config file or YARN parameters.
APP_NAME = "AudioEmotion_Preprocessing"
HDFS_INPUT_PATH = "hdfs:///raw_audio/customer_service_calls"  # base directory; .wav files are matched recursively below
HDFS_OUTPUT_PATH = "hdfs:///processed_features/emotion_audio_tfrecords"
NUM_OUTPUT_SHARDS = 2048 # Should be a multiple of the number of training workers
SAMPLE_RATE = 16000
DURATION_S = 10
N_MFCC = 40
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))):
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _float_feature(value):
"""Returns a float_list from a float / double."""
return tf.train.Feature(float_list=tf.train.FloatList(value=value))
def create_tf_example(mfccs, file_path, label):
"""
Creates a tf.train.Example proto from audio features and metadata.
"""
# In a real scenario, the label would be extracted from the file path or a metadata DB.
# Here, we'll use a placeholder label.
feature = {
'file_path': _bytes_feature(file_path.encode('utf-8')),
'label': _bytes_feature(label.encode('utf-8')),
'mfcc': _float_feature(mfccs.flatten().tolist()),
}
return tf.train.Example(features=tf.train.Features(feature=feature))
def process_audio_to_mfcc(audio_binary_content: bytes, file_path: str) -> bytes:
"""
Processes a single audio file's binary content into MFCC features.
Returns None if processing fails.
"""
try:
# Use io.BytesIO to treat byte array as a file
audio_io = io.BytesIO(audio_binary_content)
# Load audio data. librosa can handle this in memory.
# For extremely large files, chunking would be needed.
waveform, sr = librosa.load(audio_io, sr=SAMPLE_RATE, mono=True)
# Pad or truncate to a fixed length
target_len = int(DURATION_S * sr)
if len(waveform) > target_len:
waveform = waveform[:target_len]
else:
waveform = np.pad(waveform, (0, target_len - len(waveform)), 'constant')
# Extract MFCCs
mfccs = librosa.feature.mfcc(y=waveform, sr=sr, n_mfcc=N_MFCC)
        # Normalize (the epsilon guards against silent clips with zero variance)
        mfccs_normalized = (mfccs - np.mean(mfccs)) / (np.std(mfccs) + 1e-8)
        # Cast explicitly so the training job can decode the bytes as float32
        return mfccs_normalized.astype(np.float32).tobytes()
except Exception as e:
logger.error(f"Failed to process {file_path}: {e}\n{traceback.format_exc()}")
return None
# UDF to wrap the processing logic
process_udf = udf(process_audio_to_mfcc, BinaryType())
def main():
spark = SparkSession.builder \
.appName(APP_NAME) \
.config("spark.sql.files.maxPartitionBytes", "1g") \
.getOrCreate()
sc = spark.sparkContext
logger.info(f"Spark Session created. Version: {sc.version}")
    # Read the raw audio files. The 'binaryFile' data source reads each file as a
    # row containing (path, modificationTime, length, content) columns.
    # A common pitfall is using `spark.read.text`, which won't work for binary data.
audio_df = spark.read.format("binaryFile") \
.option("pathGlobFilter", "*.wav") \
.option("recursiveFileLookup", "true") \
.load(HDFS_INPUT_PATH) \
.select(col("path"), col("content"))
logger.info(f"Found {audio_df.count()} audio files to process.")
# Apply the UDF to process each file. This is the distributed part.
# We add the original file path to the dataframe for traceability.
features_df = audio_df.withColumn("mfcc_bytes", process_udf(col("content"), col("path")))
    # Filter out files that failed processing.
    # NOTE: don't call count() here; it would run the expensive UDF over the full
    # dataset a second time before the save below. Use a Spark accumulator if you
    # need a failure metric in production.
    valid_features_df = features_df.filter(col("mfcc_bytes").isNotNull())
# Convert the DataFrame rows into serialized TFRecord Examples.
# The map operation is another distributed transformation.
def to_tf_example_str(row):
# Placeholder for label extraction logic
label = "neutral"
mfccs = np.frombuffer(row.mfcc_bytes, dtype=np.float32).reshape(N_MFCC, -1)
tf_example = create_tf_example(mfccs, row.path, label)
return tf_example.SerializeToString()
tf_example_rdd = valid_features_df.rdd.map(to_tf_example_str)
    # Save the RDD as TFRecords. This action triggers the computation.
    # `repartition` controls the number of output files (shards).
    # The TFRecord output format comes from the tensorflow-hadoop connector jar,
    # which must be shipped via --jars. It expects a pair RDD, so each serialized
    # Example is mapped to a (bytes, None) key/value pair.
    tf_example_rdd.map(lambda x: (bytes(x), None)) \
        .repartition(NUM_OUTPUT_SHARDS) \
        .saveAsNewAPIHadoopFile(
            HDFS_OUTPUT_PATH,
            "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
            keyClass="org.apache.hadoop.io.BytesWritable",
            valueClass="org.apache.hadoop.io.NullWritable"
        )
logger.info(f"TFRecord generation complete. Output at: {HDFS_OUTPUT_PATH}")
spark.stop()
if __name__ == "__main__":
main()
To run this on the cluster, you’d submit it with spark-submit, ensuring all dependencies (librosa, tensorflow, numpy) are available to the executors, typically by shipping a conda environment.
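For example, the submission might look like the following, assuming the environment tarball and the tensorflow-hadoop connector jar (the exact artifact name varies by version) sit in the working directory on an edge node:
# $ spark-submit --master yarn --deploy-mode cluster \
#     --archives environment.tar.gz#environment \
#     --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
#     --conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python \
#     --jars tensorflow-hadoop.jar \
#     audio_preprocess_spark.py
The --archives flag unpacks the conda-pack'd environment next to each executor, and the PYSPARK_PYTHON settings point Spark at the Python interpreter inside it.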
Stage 2: Orchestrating Keras on YARN with skein
With the data prepared, the next challenge is launching the distributed Keras training job. A YARN application consists of an ApplicationMaster (AM) that negotiates resources with the ResourceManager, and worker containers that perform the actual task. skein
provides a Pythonic interface to define and launch these applications.
The launcher script has several key responsibilities:
- Environment Packaging: It must package the Python environment, including all dependencies, into a tarball that YARN can distribute to the containers. conda-pack is excellent for this.
- Resource Specification: It must tell YARN how many workers are needed, and what resources each requires (e.g., 8 CPUs, 32GB RAM, 1 GPU).
- Cluster Configuration: It must start each worker with the correct TF_CONFIG environment variable. This JSON string tells each TensorFlow instance its role (e.g., 'worker'), its index (task_id), and the network addresses of all other workers in the cluster. This is how they find each other; an example is shown right after this list.
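For concreteness, this is roughly what the launcher serializes into TF_CONFIG; worker index 1 of a four-worker job is shown, and the hostnames and ports are illustrative placeholders:
import json
import os

# Illustrative TF_CONFIG for worker index 1 of a 4-worker job.
# The hostnames/ports are placeholders; the launcher below builds the real list.
tf_config = {
    "cluster": {
        "worker": [
            "node-a1.cluster:9000", "node-a2.cluster:9001",
            "node-a3.cluster:9002", "node-a4.cluster:9003",
        ]
    },
    "task": {"type": "worker", "index": 1},
}
os.environ["TF_CONFIG"] = json.dumps(tf_config)  # what each container sees at startup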
Here is the launcher script. It’s designed to be run from an edge node of the Hadoop cluster.
# train_launcher.py
import skein
import json
import logging
import time
from datetime import timedelta
# --- Configuration ---
APP_NAME = "KerasEmotionTrainer"
PYTHON_ENV_PATH = "./environment.tar.gz" # Pre-packaged conda env
WORKER_SCRIPT_PATH = "train_worker.py" # The Keras training script
HDFS_TFRECORDS_PATH = "hdfs:///processed_features/emotion_audio_tfrecords"
NUM_WORKERS = 4
WORKER_VCORES = 8
WORKER_MEMORY_MB = 32768
WORKER_GPUS = 1
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def build_service_spec(worker_addresses):
"""
Builds the YARN service specification for a single worker.
This includes the command to run, environment variables, and resource requests.
"""
# Create the TF_CONFIG object
# The launcher will fill in the 'task' part for each specific worker.
base_tf_config = {
"cluster": {
"worker": worker_addresses
},
"task": {} # To be filled per-worker
}
# Use a dictionary to define services for skein
services = {}
for i in range(NUM_WORKERS):
# Create a unique TF_CONFIG for this worker
worker_tf_config = base_tf_config.copy()
worker_tf_config["task"] = {"type": "worker", "index": i}
# The command to execute inside the YARN container
command = (
# The packaged environment has a bin/python
f"source environment/bin/activate && "
f"export TF_CONFIG='{json.dumps(worker_tf_config)}' && "
f"python {WORKER_SCRIPT_PATH} --data_path={HDFS_TFRECORDS_PATH}"
)
services[f"worker_{i}"] = skein.Service(
resources=skein.Resources(
vcores=WORKER_VCORES,
memory=f"{WORKER_MEMORY_MB} MiB",
gpus=WORKER_GPUS
),
files={
"environment": PYTHON_ENV_PATH,
WORKER_SCRIPT_PATH: WORKER_SCRIPT_PATH
},
script=command
)
return services
def main():
    # Worker discovery: TF_CONFIG needs the host:port of every worker, but YARN decides
    # at runtime which nodes the containers land on. Hardcoded addresses like the ones
    # below only work if containers are pinned to known hosts (e.g., via node labels);
    # in production, have each worker register its actual address through skein's
    # key-value store (or another service-discovery mechanism). Fixed ports also risk
    # collisions with other workloads.
    worker_addresses = [f"node-a{i+1}.cluster:{9000+i}" for i in range(NUM_WORKERS)]
# Build the Application Specification
app_spec = skein.ApplicationSpec(
name=APP_NAME,
queue="gpu_queue", # Specify a YARN queue with GPU resources
services=build_service_spec(worker_addresses)
)
# Connect to the YARN cluster
# Assumes HADOOP_CONF_DIR is set correctly.
try:
client = skein.Client()
except skein.exceptions.SkeinConfigurationError as e:
logger.error(f"Skein configuration error. Is HADOOP_CONF_DIR set? Details: {e}")
return
logger.info("Submitting YARN application...")
try:
app_client = client.submit_and_connect(app_spec)
logger.info(f"Application submitted. ID: {app_client.id}")
        # Monitor the application until it reaches a terminal state.
        # Status is polled from the ResourceManager via an application report.
        while True:
            report = client.application_report(app_client.id)
            logger.info(f"App Status: {report.state} | Final Status: {report.final_status}")
            if report.state in ('FINISHED', 'FAILED', 'KILLED'):
                break
            time.sleep(30)
        # Per-container logs can be pulled after completion, e.g. with the YARN CLI:
        #   yarn logs -applicationId <application_id>
except Exception as e:
logger.error(f"An error occurred while managing the YARN application: {e}")
finally:
logger.info("Application finished.")
if __name__ == "__main__":
# Before running this script, you must package the environment:
# $ conda create -n emotion_env python=3.8 tensorflow-gpu=2.8 keras numpy -c conda-forge
# $ conda activate emotion_env
# $ conda pack -o environment.tar.gz
main()
Stage 3: The Distributed Keras Training Script
This is the code that runs inside each YARN container. Its job is to set up the distributed strategy, define the model, create an efficient HDFS data pipeline, and run the training loop. TensorFlow’s MultiWorkerMirroredStrategy handles the complex parts of all-reduce gradient synchronization under the hood, as long as TF_CONFIG is set correctly.
A critical piece is the data loading pipeline. Reading from HDFS can be slow if not done correctly. The tf.data API provides the tools to build a performant pipeline. We will use tf.data.Dataset.list_files to find the TFRecord shards, and since each worker knows its index, it can be configured to read only a subset of the files, ensuring no data is processed twice in one epoch.
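Before launching the full distributed job, it is worth parsing one shard locally to confirm the schema the Spark job wrote; a minimal smoke test follows (the shard filename is a placeholder, and the feature spec mirrors the preprocessing script):
import tensorflow as tf

# Hypothetical smoke test: read one record from one TFRecord shard and check the schema.
SHARD = "hdfs:///processed_features/emotion_audio_tfrecords/part-r-00000"  # placeholder name
feature_spec = {
    "file_path": tf.io.FixedLenFeature([], tf.string),
    "label": tf.io.FixedLenFeature([], tf.string),
    "mfcc": tf.io.FixedLenFeature([40 * 313], tf.float32),  # N_MFCC * TIME_STEPS
}

for raw_record in tf.data.TFRecordDataset(SHARD).take(1):
    example = tf.io.parse_single_example(raw_record, feature_spec)
    print(example["mfcc"].shape)  # expect (12520,)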
# train_worker.py
import os
import json
import logging
import argparse
import tensorflow as tf
# --- Configuration ---
HDFS_MODEL_CHECKPOINT_PATH = "hdfs:///models/emotion_classifier/checkpoints/cp-{epoch:04d}.ckpt"
HDFS_FINAL_MODEL_PATH = "hdfs:///models/emotion_classifier/final_model"
BATCH_SIZE_PER_REPLICA = 64
EPOCHS = 20
N_MFCC = 40 # Must match preprocessing
TIME_STEPS = 313 # 10 s at 16 kHz with librosa's default hop_length of 512: 1 + 160000 // 512 = 313 frames
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def build_model():
"""Defines the Keras model architecture."""
# A simple Conv1D + GRU model for demonstration
model = tf.keras.Sequential([
tf.keras.layers.Input(shape=(N_MFCC, TIME_STEPS)),
tf.keras.layers.Reshape((N_MFCC, TIME_STEPS, 1)),
tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu'),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
# Need to reshape to feed into RNN
tf.keras.layers.Reshape((-1, 64)), # Flatten spatial dims
tf.keras.layers.GRU(128, return_sequences=True),
tf.keras.layers.GRU(64),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(8, activation='softmax') # Assuming 8 emotion classes
])
return model
def create_dataset(data_path, global_batch_size):
"""
Creates a distributed tf.data.Dataset pipeline that reads from HDFS.
"""
# Feature description must match what was written in the Spark job
feature_description = {
'file_path': tf.io.FixedLenFeature([], tf.string),
'label': tf.io.FixedLenFeature([], tf.string),
'mfcc': tf.io.FixedLenFeature([N_MFCC * TIME_STEPS], tf.float32),
}
def _parse_function(example_proto):
parsed = tf.io.parse_single_example(example_proto, feature_description)
mfcc = tf.reshape(parsed['mfcc'], [N_MFCC, TIME_STEPS])
# This is where you would map string labels to integer indices
# For now, a dummy integer label
label_int = tf.constant(0, dtype=tf.int32)
return mfcc, label_int
# List all TFRecord files in the HDFS directory
file_pattern = os.path.join(data_path, "part-*")
dataset = tf.data.Dataset.list_files(file_pattern)
    # Crucial for distributed training: each worker must process a unique subset of the
    # data. With FILE auto-sharding, the distribution strategy assigns whole TFRecord
    # shards to workers based on TF_CONFIG (which is why NUM_OUTPUT_SHARDS in the Spark
    # job should be a multiple of the worker count), so no record is read twice per epoch.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.FILE
dataset = dataset.with_options(options)
# Use interleave for parallel reading of files.
dataset = dataset.interleave(
lambda x: tf.data.TFRecordDataset(x, compression_type=''),
cycle_length=tf.data.AUTOTUNE,
num_parallel_calls=tf.data.AUTOTUNE
)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.batch(global_batch_size)
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
return dataset
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--data_path", type=str, required=True, help="HDFS path to TFRecords")
args = parser.parse_args()
# Set up the distributed strategy
# TF_CONFIG is automatically read from the environment variable.
try:
strategy = tf.distribute.MultiWorkerMirroredStrategy()
num_workers = strategy.num_replicas_in_sync
logger.info(f"MultiWorkerMirroredStrategy initialized. Number of workers: {num_workers}")
except Exception as e:
logger.error(f"Failed to initialize distribution strategy: {e}")
raise
global_batch_size = BATCH_SIZE_PER_REPLICA * num_workers
    # Build the input pipeline. The dataset does not need to be created inside
    # strategy.scope(); Keras shards and distributes it during model.fit using the
    # auto-shard options set in create_dataset().
    dataset = create_dataset(args.data_path, global_batch_size)
# Build and compile the model within the strategy's scope.
# This ensures variables are mirrored across all workers.
with strategy.scope():
model = build_model()
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
logger.info("Model compiled successfully.")
model.summary()
    # Callbacks must account for the distributed setup. Here only the chief (index 0)
    # writes checkpoints. Caveat: the TensorFlow multi-worker guide recommends that all
    # workers save, with non-chief workers writing to temporary paths, because saving
    # can involve collective ops; if training hangs at checkpoint time, adopt that pattern.
    task_id = json.loads(os.environ['TF_CONFIG'])['task']['index']
callbacks = []
if task_id == 0:
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
filepath=HDFS_MODEL_CHECKPOINT_PATH,
save_weights_only=True,
verbose=1
)
callbacks.append(checkpoint_callback)
logger.info("Starting model training...")
model.fit(
dataset,
epochs=EPOCHS,
callbacks=callbacks,
verbose=2 # Verbose=1 can be messy with multiple workers logging
)
    # Only the chief persists the final model (same caveat as above: the TF guide has
    # all workers call save(), with non-chief workers writing to temporary directories).
    if task_id == 0:
logger.info(f"Training complete. Saving final model to {HDFS_FINAL_MODEL_PATH}")
model.save(HDFS_FINAL_MODEL_PATH)
if __name__ == '__main__':
main()
This system, while requiring more setup than an off-the-shelf solution, provides a robust and transparent pipeline for large-scale Keras training on existing Hadoop infrastructure. It directly tackles the I/O bottleneck by collocating compute and data and leverages established frameworks like Spark, YARN, and TensorFlow’s native distribution strategies in a cohesive manner.
The architecture is not without its limitations. The dependency management via conda-pack is effective but can lead to large environment archives and requires manual updates. A more mature solution might involve using Docker containers on YARN, which provides better isolation and reproducibility, though it introduces its own operational overhead. Furthermore, this setup is designed for batch training. For online learning or near-real-time model updates, a different architecture leveraging streaming technologies like Kafka and Flink would be more appropriate. Hyperparameter tuning also remains a manual process; orchestrating a distributed hyperparameter search on top of this framework would be a logical next step, potentially using a tool like KerasTuner with a custom Tuner subclass that can launch YARN applications.