The Spark UI reported success, the Hudi timeline showed a new commit every five minutes, and our downstream dashboards were populated. Yet, we were flying blind. When a business user reported that metrics for a specific campaign looked wrong, our investigation was a frantic scramble through disparate systems. We’d grep through application logs on our Alibaba Cloud ACK cluster, dive into the YARN logs for the Spark job on EMR, and manually inspect Hudi table partitions on OSS. The core problem was a lack of correlation. We could see that the pipeline ran, but we couldn’t trace the journey of a single business event from its origin in a Java microservice to its final resting place in a Hudi table. This disconnect made debugging data quality and latency issues an expensive, time-consuming ordeal.
Our initial concept was to treat data records like RPC calls. In a microservices world, we solve this correlation problem with distributed tracing. The goal became clear: extend distributed tracing principles into our batch and streaming data pipelines. We needed to propagate a unique trace context alongside the data itself, instrument our Spark processing logic to participate in this trace, and critically, persist this context within the data lakehouse. Furthermore, our pipeline’s operational logic—like data validation rules or sampling rates for this very observability system—was hardcoded. Changing a simple validation threshold required a full redeployment of the Spark application. This was operationally untenable. We needed a dynamic configuration system that could alter the pipeline’s behavior in real-time.
This led to an architectural decision converging on four key technologies:
- OpenTelemetry: The industry standard for observability. Its context propagation mechanisms were the foundation for our tracing solution.
- Apache Hudi: Our existing data lakehouse format. We needed to leverage its metadata capabilities to store the trace context.
- Nacos: Our choice for a dynamic configuration center. It would allow us to control the pipeline’s logic without redeployments.
- Alibaba Cloud: The underlying platform providing the compute (EMR, ACK), storage (OSS), and observability backend (SLS/Prometheus).
The path forward involved weaving these components into a cohesive system that provided record-level traceability and dynamic operational control.
Phase 1: Instrumenting the Data Source and Propagating Context
The journey begins at the source: a Spring Boot microservice running on Alibaba Cloud’s ACK (Container Service for Kubernetes). This service generates events that are published to a Kafka topic. The first critical step is to initiate a trace when an event is created and ensure its context is injected into the Kafka message.
In a real-world project, using the OpenTelemetry Java auto-instrumentation agent is often the path of least resistance. It automatically handles trace creation for incoming HTTP requests and other common framework interactions. Our focus, however, is on the outbound path: publishing to Kafka.
Here is the core logic within our EventProducerService. We manually create a new span, inject its context into the Kafka record's headers, and then publish it.
// File: com/example/dataprovider/service/EventProducerService.java
package com.example.dataprovider.service;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class EventProducerService {

    private static final Logger logger = LoggerFactory.getLogger(EventProducerService.class);

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Tracer tracer;
    private final OpenTelemetry openTelemetry;

    @Autowired
    public EventProducerService(KafkaTemplate<String, String> kafkaTemplate, OpenTelemetry openTelemetry) {
        this.kafkaTemplate = kafkaTemplate;
        this.openTelemetry = openTelemetry;
        // It's best practice to get the tracer with a specific name and version.
        this.tracer = openTelemetry.getTracer("com.example.dataprovider.producer", "1.0.0");
    }

    public void sendEvent(String eventPayload) {
        // 1. Start a new span for the event production process.
        Span producerSpan = tracer.spanBuilder("send-user-event").startSpan();

        // 2. Make the span current in the context.
        try (Scope scope = producerSpan.makeCurrent()) {
            producerSpan.setAttribute("event.payload.size", eventPayload.length());
            String eventId = UUID.randomUUID().toString();
            producerSpan.setAttribute("event.id", eventId);

            ProducerRecord<String, String> record = new ProducerRecord<>("user-events-topic", eventId, eventPayload);

            // 3. Inject the current trace context into the Kafka record headers.
            // This is the most critical step for context propagation.
            openTelemetry.getPropagators().getTextMapPropagator().inject(
                Context.current(), // The current context which holds our producerSpan
                record.headers(),  // The carrier, in this case Kafka headers
                (headers, key, value) -> headers.add(new RecordHeader(key, value.getBytes(StandardCharsets.UTF_8)))
            );
            logger.info("Injected trace context into Kafka headers. TraceId: {}", producerSpan.getSpanContext().getTraceId());

            // 4. The send is asynchronous, so the span is ended inside the callbacks.
            // Ending it in a finally block would close it before the broker responds,
            // and attributes set on an ended span are dropped.
            // addCallback is the ListenableFuture API of Spring Kafka 2.x; newer versions
            // return a CompletableFuture, where whenComplete plays the same role.
            kafkaTemplate.send(record).addCallback(
                result -> {
                    logger.info("Successfully sent event with id: {}", eventId);
                    producerSpan.setAttribute("kafka.partition", result.getRecordMetadata().partition());
                    producerSpan.setAttribute("kafka.offset", result.getRecordMetadata().offset());
                    producerSpan.end();
                },
                ex -> {
                    logger.error("Failed to send event", ex);
                    producerSpan.recordException(ex);
                    producerSpan.setStatus(StatusCode.ERROR, ex.getMessage());
                    producerSpan.end();
                }
            );
        } catch (RuntimeException e) {
            // send() can also fail synchronously, before any callback is registered.
            producerSpan.recordException(e);
            producerSpan.setStatus(StatusCode.ERROR, e.getMessage());
            producerSpan.end();
            throw e;
        }
    }
}
The key piece of code is the call to openTelemetry.getPropagators().getTextMapPropagator().inject(...). This uses the configured propagator (typically W3C Trace Context) to serialize the current traceId and spanId into string key-value pairs (traceparent, tracestate) and adds them as headers to the Kafka message. This is how the trace context physically travels alongside the data to the next stage.
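To make the header format concrete, here is a minimal sketch of what a W3C traceparent value looks like and how it can be taken apart. It is not part of the producer service: the TraceParent class is a hypothetical helper written with the JDK only, it handles version 00 only, and in the real pipeline the OpenTelemetry propagator does this work.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses and formats a version-00 W3C traceparent value. */
final class TraceParent {
    // Layout: version "00" - 32 hex trace id - 16 hex span id - 2 hex trace flags
    private static final Pattern FORMAT =
        Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    final String traceId;
    final String spanId;
    final boolean sampled;

    TraceParent(String traceId, String spanId, boolean sampled) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.sampled = sampled;
    }

    static TraceParent parse(String header) {
        Matcher m = FORMAT.matcher(header);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a version-00 traceparent: " + header);
        }
        // The lowest bit of the flags byte is the "sampled" flag.
        boolean sampled = (Integer.parseInt(m.group(3), 16) & 0x01) == 1;
        return new TraceParent(m.group(1), m.group(2), sampled);
    }

    /** Re-serializes the value; only the sampled flag is preserved in this sketch. */
    String format() {
        return "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
    }
}
```

The 32-character trace ID in the second field is the value that later ends up in the Hudi table, so it can be pasted from a Kafka header straight into a query.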
Phase 2: Dynamic Pipeline Control with Nacos
Before building the Spark consumer, we addressed the static configuration problem. Our Spark job needed to fetch properties like the OpenTelemetry collector endpoint, sampling ratio, and business-level data quality rules from a central location.
We set up a Nacos server and created a configuration with dataId spark-hudi-pipeline.properties in the DEFAULT_GROUP.
# Nacos Configuration: spark-hudi-pipeline.properties
# Observability Settings
otel.exporter.otlp.endpoint=http://otel-collector.observability.svc.cluster.local:4317
otel.traces.sampler=parentbased_traceidratio
otel.traces.sampler.arg=0.2
# Data Quality Settings
data.quality.validation.enabled=true
data.quality.min.event.timestamp.epoch=1672531200
# Hudi Sink Settings
hudi.table.path=oss://my-datalake-bucket/hudi_user_events
hudi.table.name=user_events
hudi.table.type=COPY_ON_WRITE
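The sampler settings above ask for roughly 20% of new traces to be kept, while the parent-based wrapper reuses the parent's decision whenever a record already carries one. A ratio sampler makes its decision deterministically from the trace ID, so every service that sees the same ID reaches the same verdict. The sketch below illustrates that idea with the JDK only; RatioSamplerSketch is a hypothetical class, and the details are an approximation rather than the SDK's exact code.

```java
/** Hypothetical sketch of a deterministic, trace-ID-based ratio sampler. */
final class RatioSamplerSketch {
    private final boolean alwaysSample;
    private final long upperBound;

    RatioSamplerSketch(double ratio) {
        if (ratio < 0.0 || ratio > 1.0) {
            throw new IllegalArgumentException("ratio must be within [0.0, 1.0]");
        }
        this.alwaysSample = ratio >= 1.0;
        // Scale the ratio onto the range of non-negative longs.
        this.upperBound = (long) (ratio * Long.MAX_VALUE);
    }

    /** traceIdHex is the 32-character lowercase hex trace ID. */
    boolean shouldSample(String traceIdHex) {
        if (alwaysSample) {
            return true;
        }
        // Use the low 64 bits (last 16 hex characters) as the pseudo-random part.
        long low = Long.parseUnsignedLong(traceIdHex.substring(16), 16);
        long magnitude = (low == Long.MIN_VALUE) ? Long.MAX_VALUE : Math.abs(low);
        return magnitude < upperBound;
    }
}
```

Because the decision depends only on the trace ID, lowering otel.traces.sampler.arg in Nacos changes which traces are kept without producing half-sampled traces.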
The Spark application needs a robust way to fetch and refresh this configuration. A common mistake is to initialize the Nacos client on each executor, which is inefficient and can overwhelm the Nacos server. The correct pattern is to initialize a singleton client on the Spark driver, fetch the configuration, and then broadcast it to all executors.
Here’s a Scala object that manages the Nacos configuration, including a listener to handle dynamic updates.
// File: com/example/sparkhudi/config/NacosConfigManager.scala
package com.example.sparkhudi.config

import com.alibaba.nacos.api.NacosFactory
import com.alibaba.nacos.api.config.ConfigService
import com.alibaba.nacos.api.config.listener.Listener
import com.alibaba.nacos.api.exception.NacosException
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

import java.util.Properties
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReference

object NacosConfigManager {

  private val logger = LoggerFactory.getLogger(getClass)

  // Use AtomicReference to hold the broadcasted properties for thread-safe updates.
  private val configBroadcast: AtomicReference[Broadcast[Properties]] = new AtomicReference()

  @volatile private var configService: ConfigService = _

  private val NACOS_SERVER_ADDR = "nacos-server.prod.internal:8848"
  private val NACOS_DATA_ID = "spark-hudi-pipeline.properties"
  private val NACOS_GROUP = "DEFAULT_GROUP"
  private val NACOS_TIMEOUT = 5000L

  def initialize(spark: SparkSession): Unit = {
    if (configService == null) {
      synchronized {
        if (configService == null) {
          try {
            val properties = new Properties()
            properties.put("serverAddr", NACOS_SERVER_ADDR)
            // It is recommended to specify the namespace in a production environment.
            // properties.put("namespace", "your-namespace-id")
            configService = NacosFactory.createConfigService(properties)

            val initialConfigStr = configService.getConfig(NACOS_DATA_ID, NACOS_GROUP, NACOS_TIMEOUT)
            updateAndBroadcastConfig(spark, initialConfigStr)

            // Add a listener to handle dynamic configuration changes.
            configService.addListener(NACOS_DATA_ID, NACOS_GROUP, new Listener {
              override def getExecutor: Executor = null // Use default executor

              override def receiveConfigInfo(configInfo: String): Unit = {
                logger.warn("Nacos configuration updated. Re-broadcasting to executors.")
                updateAndBroadcastConfig(spark, configInfo)
              }
            })
            logger.info("NacosConfigManager initialized and listener registered.")
          } catch {
            case e: NacosException =>
              logger.error("Failed to initialize Nacos ConfigService", e)
              throw new RuntimeException("Cannot start Spark job without Nacos config", e)
          }
        }
      }
    }
  }

  private def updateAndBroadcastConfig(spark: SparkSession, configStr: String): Unit = {
    val props = new Properties()
    if (configStr != null && !configStr.isEmpty) {
      props.load(new java.io.StringReader(configStr))
    }

    // Before broadcasting new properties, unpersist the old one to free up memory.
    val oldBroadcast = configBroadcast.get()
    if (oldBroadcast != null) {
      oldBroadcast.unpersist()
    }

    // Broadcast the new properties.
    val newBroadcast = spark.sparkContext.broadcast(props)
    configBroadcast.set(newBroadcast)
    logger.info(s"Successfully broadcasted new configuration with ${props.size()} properties.")
  }

  // Returns the latest configuration. Call this on the driver: a Scala object is a
  // per-JVM singleton, so on an executor this reference is never populated. Pass the
  // values that tasks need into the task closure instead.
  def getConfig: Properties = {
    val broadcast = configBroadcast.get()
    if (broadcast == null) {
      throw new IllegalStateException("NacosConfigManager has not been initialized on the driver.")
    }
    broadcast.value
  }
}
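This manager is initialized once on the driver at the start of the Spark job. The updateAndBroadcastConfig method is key; it ensures that when a config is changed in Nacos, the listener on the driver picks it up, creates a new broadcast variable, and replaces the old one. Note that the AtomicReference lives in the driver JVM, so executor-side code sees new values only through whatever the driver captures into the task closure when it schedules the next micro-batch. A change made in Nacos therefore takes effect at the next batch boundary, not in the middle of a running batch.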
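The swap-and-snapshot behaviour described above can be illustrated without Spark or Nacos. The sketch below is a JDK-only model under stated assumptions: ConfigSnapshots, onConfigChange and flag are hypothetical names, and onConfigChange stands in for the listener callback. It shows that a batch which took its snapshot before an update keeps a consistent view, while the next batch sees the new value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of a driver-side holder for the latest configuration. */
final class ConfigSnapshots {
    private final AtomicReference<Properties> current = new AtomicReference<>(new Properties());

    /** Stand-in for the config listener callback; receives the raw properties text. */
    void onConfigChange(String rawText) {
        Properties parsed = new Properties();
        try {
            parsed.load(new StringReader(rawText == null ? "" : rawText));
        } catch (IOException e) {
            throw new IllegalArgumentException("Unparseable configuration", e);
        }
        // Publish a fully built object in one atomic step; readers never see a half-loaded config.
        current.set(parsed);
    }

    /** Each micro-batch takes one snapshot at its start and uses it throughout. */
    Properties snapshot() {
        return current.get();
    }

    /** Typed accessor with a default, so a missing key cannot fail a batch. */
    static boolean flag(Properties props, String key, boolean defaultValue) {
        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
    }
}
```

Building a new Properties object for every update, instead of mutating the one in place, is what keeps an in-flight batch from observing a mix of old and new settings.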
Phase 3: The Trace-Aware Spark-Hudi Job
This is the heart of the solution. The Spark job must:
- Read from Kafka.
- Extract the OpenTelemetry context from each message’s headers.
- Continue the trace by creating new spans for the processing logic.
- Use the dynamically loaded Nacos configuration.
- Add the trace_id to the data itself.
- Write the data to Hudi, injecting trace metadata into the Hudi commit.
A major pitfall in instrumenting Spark is that OpenTelemetry's context is typically stored in a ThreadLocal, which does not survive the serialization and network transfer between Spark's driver and executors, or between stages. We must handle context propagation manually.
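The ThreadLocal limitation is easy to reproduce in miniature. The following sketch uses only the JDK and a second thread as a stand-in for an executor; ThreadLocalBoundaryDemo and its field names are hypothetical. A value stored in a ThreadLocal is invisible to the worker, while the same value carried explicitly in a map (as the Kafka headers do) arrives intact.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo: implicit ThreadLocal context versus an explicit carrier. */
final class ThreadLocalBoundaryDemo {
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    /** Returns { value seen via ThreadLocal, value seen via the carrier } on the worker thread. */
    static String[] run() {
        CURRENT_TRACE.set("4bf92f3577b34da6a3ce929d0e0e4736");

        // Explicit carrier: plain data that travels with the work item.
        Map<String, String> carrier = new HashMap<>();
        carrier.put("traceparent", "00-" + CURRENT_TRACE.get() + "-00f067aa0ba902b7-01");

        Callable<String> readImplicit = () -> CURRENT_TRACE.get();
        Callable<String> readExplicit = () -> carrier.get("traceparent").split("-")[1];

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            String implicit = worker.submit(readImplicit).get(); // null: different thread
            String explicit = worker.submit(readExplicit).get(); // the trace ID
            return new String[] { implicit, explicit };
        } catch (Exception e) {
            throw new IllegalStateException("Worker task failed", e);
        } finally {
            worker.shutdown();
            CURRENT_TRACE.remove();
        }
    }
}
```

Between a Spark driver and its executors the boundary is a process and network hop instead of a thread hop, but the consequence is the same: only data that is serialized with the record survives.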
Below is the structured Spark application.
// File: com/example/sparkhudi/pipeline/TraceableHudiPipeline.scala
package com.example.sparkhudi.pipeline

import com.example.sparkhudi.config.NacosConfigManager
import com.example.sparkhudi.observability.OtelSparkHelper
import io.opentelemetry.api.trace.StatusCode
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.nio.charset.StandardCharsets
import java.util.Properties

object TraceableHudiPipeline {

  private val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TraceableHudiPipeline")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Register custom classes for Kryo if needed
      .getOrCreate()

    // Initialize Nacos and broadcast the initial configuration.
    NacosConfigManager.initialize(spark)

    // Initialize OpenTelemetry on the driver.
    // The configuration for the exporter endpoint comes from Nacos.
    val driverConfig = NacosConfigManager.getConfig
    OtelSparkHelper.initialize(driverConfig.getProperty("otel.exporter.otlp.endpoint"))

    val kafkaSourceDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka-broker.prod.internal:9092")
      .option("subscribe", "user-events-topic")
      .option("startingOffsets", "latest")
      // Without this option the Kafka source does not expose a `headers` column.
      .option("includeHeaders", "true")
      .load()

    val query = kafkaSourceDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        logger.info(s"Processing batch ID: $batchId")
        if (!batchDF.isEmpty) {
          processBatch(spark, batchDF, batchId)
        }
      }
      .start()

    query.awaitTermination()
  }

  def processBatch(spark: SparkSession, df: DataFrame, batchId: Long): Unit = {
    import spark.implicits._ // Encoders for the tuples produced by mapPartitions

    // Each batch is processed inside a new driver-side span.
    val tracer = OtelSparkHelper.getTracer
    val batchSpan = tracer.spanBuilder("process-kafka-batch").startSpan()
    batchSpan.setAttribute("spark.batch.id", batchId)

    try {
      // The configuration is read on the driver, once per batch. NacosConfigManager is
      // only initialized in the driver JVM, so the task closure below captures these
      // plain values instead of calling it from an executor.
      val config = NacosConfigManager.getConfig
      val otlpEndpoint = config.getProperty("otel.exporter.otlp.endpoint")
      val validationEnabled = config.getProperty("data.quality.validation.enabled", "false").toBoolean
      batchSpan.setAttribute("config.hudi.table.path", config.getProperty("hudi.table.path"))
      batchSpan.setAttribute("config.data.quality.validation.enabled", validationEnabled)

      val processedDF = df
        .select(
          col("key").cast("string"),
          col("value").cast("string"),
          // The `headers` column contains our trace context.
          col("headers")
        )
        .repartition(16) // Example of a transformation
        .mapPartitions { iterator =>
          // This code runs on each executor.
          // initialize() is idempotent, so OTEL is set up once per executor JVM.
          OtelSparkHelper.initialize(otlpEndpoint)
          val localTracer = OtelSparkHelper.getTracer

          iterator.flatMap { row =>
            // Records published without headers have a null `headers` column.
            val headers = Option(row.getSeq[Row](2)).getOrElse(Seq.empty)
            val headerMap = headers
              .filter(h => !h.isNullAt(1))
              .map(h => h.getString(0) -> new String(h.getAs[Array[Byte]](1), StandardCharsets.UTF_8))
              .toMap

            // 1. Manually extract the context from Kafka headers.
            val parentContext = OtelSparkHelper.extractContextFromHeaders(headerMap)

            // 2. Create a new span for processing this single record, linked to the parent.
            val recordSpan = localTracer.spanBuilder("process-single-record")
              .setParent(parentContext)
              .startSpan()
            val traceId = recordSpan.getSpanContext.getTraceId

            try {
              // Simulate some work and apply validation based on Nacos config.
              val payload = row.getString(1)
              recordSpan.setAttribute("payload.length", payload.length.toLong)
              if (validationEnabled) {
                // Example validation logic...
                if (payload.contains("invalid")) {
                  throw new RuntimeException("Invalid payload detected")
                }
              }
              recordSpan.setStatus(StatusCode.OK)
              // Return the processed data along with the trace_id
              Some((row.getString(0), payload, traceId))
            } catch {
              case e: Exception =>
                recordSpan.recordException(e)
                recordSpan.setStatus(StatusCode.ERROR, e.getMessage)
                // In a real scenario, you might send this to a dead-letter queue.
                // For now, the record is dropped (flatMap discards None).
                None
            } finally {
              recordSpan.end()
            }
          }
        }
        .toDF("event_id", "payload", "_trace_id") // Add trace_id as a new column
        // Here you would parse the JSON payload into more columns.
        // For brevity, we'll keep it simple.
        .withColumn("ts", current_timestamp())
        .withColumn("partition_path", date_format(col("ts"), "yyyy/MM/dd"))
        // count() and the Hudi write are two actions; without caching, every record
        // would be processed twice and emit duplicate spans.
        .persist()

      try {
        batchSpan.setAttribute("record.count.processed", processedDF.count())
        writeToHudi(processedDF, config, traceId = batchSpan.getSpanContext.getTraceId)
      } finally {
        processedDF.unpersist()
      }
    } catch {
      case e: Exception =>
        logger.error(s"Error processing batch $batchId", e)
        batchSpan.recordException(e)
        batchSpan.setStatus(StatusCode.ERROR, e.getMessage)
        throw e
    } finally {
      batchSpan.end()
    }
  }

  def writeToHudi(df: DataFrame, config: Properties, traceId: String): Unit = {
    val hudiOptions = Map[String, String](
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "event_id",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
      HoodieWriteConfig.TBL_NAME.key -> config.getProperty("hudi.table.name"),
      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.TABLE_TYPE.key -> config.getProperty("hudi.table.type"),
      // This is crucial: write options whose key starts with this prefix are copied
      // into the Hudi commit metadata.
      "hoodie.datasource.write.commitmeta.key.prefix" -> "trace.",
      "trace.batch_trace_id" -> traceId,
      "trace.source" -> "TraceableHudiPipeline"
    )

    df.write
      .format("hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save(config.getProperty("hudi.table.path"))
  }
}
The OtelSparkHelper object abstracts the complexity of initializing OpenTelemetry and handling context propagation.
// File: com/example/sparkhudi/observability/OtelSparkHelper.scala
package com.example.sparkhudi.observability

import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.{ContextPropagators, TextMapGetter}
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor

import scala.collection.JavaConverters._

object OtelSparkHelper {

  // @volatile keeps the double-checked locking below safe. Each JVM (the driver and
  // every executor) holds its own instance of this object.
  @volatile private var openTelemetry: OpenTelemetry = _
  @volatile private var tracer: Tracer = _

  def initialize(endpoint: String): Unit = {
    if (openTelemetry == null) {
      synchronized {
        if (openTelemetry == null) {
          val spanExporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint(endpoint)
            .build()

          val spanProcessor = BatchSpanProcessor.builder(spanExporter).build()

          val tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(spanProcessor)
            .build()

          val sdk = OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            // The SDK builder defaults to no-op propagators, so W3C Trace Context must be
            // registered explicitly; otherwise extract() always returns the root context.
            .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .buildAndRegisterGlobal()

          // Publish the tracer before openTelemetry so readers never see a half-initialized helper.
          tracer = sdk.getTracer("com.example.sparkhudi.pipeline", "1.0.0")
          openTelemetry = sdk
        }
      }
    }
  }

  def getTracer: Tracer = {
    if (tracer == null) throw new IllegalStateException("OtelSparkHelper not initialized.")
    tracer
  }

  // Define a TextMapGetter for Scala's Map[String, String]
  private val getter: TextMapGetter[Map[String, String]] = new TextMapGetter[Map[String, String]] {
    override def keys(carrier: Map[String, String]): java.lang.Iterable[String] = carrier.keys.asJava

    override def get(carrier: Map[String, String], key: String): String =
      if (carrier == null) null else carrier.get(key).orNull
  }

  def extractContextFromHeaders(headers: Map[String, String]): Context = {
    if (openTelemetry == null) {
      // This can happen on an executor before initialization.
      return Context.root()
    }
    openTelemetry.getPropagators.getTextMapPropagator.extract(Context.root(), headers, getter)
  }
}
This implementation achieves two critical traceability goals:
- Record-level Column: The _trace_id column is now part of our Hudi table schema. We can directly query the table for a specific trace: SELECT * FROM user_events WHERE _trace_id = '...'.
- Batch-level Metadata: By setting Hudi's commit-metadata key prefix (hoodie.datasource.write.commitmeta.key.prefix), every write option whose key starts with that prefix, such as trace.batch_trace_id, is copied into the commit metadata under the .hoodie directory. This allows us to understand the context of an entire write operation without querying the data itself.
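The prefix mechanism behind the batch-level metadata is simple to picture. The sketch below models it with the JDK only; CommitMetadataSketch and extraMetadata are hypothetical names, and this is an illustration of the selection rule, not Hudi's own code. Given the full map of write options, only the entries whose key starts with the configured prefix end up attached to the commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of prefix-based selection of commit metadata entries. */
final class CommitMetadataSketch {

    /** Returns the write options whose key starts with the given prefix, sorted by key. */
    static Map<String, String> extraMetadata(Map<String, String> writeOptions, String prefix) {
        Map<String, String> selected = new TreeMap<>();
        for (Map.Entry<String, String> option : writeOptions.entrySet()) {
            if (option.getKey().startsWith(prefix)) {
                selected.put(option.getKey(), option.getValue());
            }
        }
        return selected;
    }

    /** Example input resembling the options used by the pipeline's Hudi writer. */
    static Map<String, String> exampleOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("hoodie.table.name", "user_events");
        options.put("trace.batch_trace_id", "4bf92f3577b34da6a3ce929d0e0e4736");
        options.put("trace.source", "TraceableHudiPipeline");
        return options;
    }
}
```

Choosing a distinctive prefix such as trace. keeps ordinary table options out of the commit metadata and makes the tracing entries easy to find when inspecting the timeline.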
Phase 4: Validating the End-to-End Trace
With the pipeline running, the final step is to validate the result. A user reports an issue with an event payload. Our first step is to query the Hudi table on Alibaba Cloud EMR using Spark SQL or Presto/Trino.
SELECT event_id, ts, _trace_id
FROM user_events
WHERE payload LIKE '%problematic_data%'
LIMIT 1;
This query returns the _trace_id for the problematic record. We then take this ID and query our tracing backend (e.g., Jaeger, or Alibaba Cloud SLS which supports OpenTelemetry traces). The result is a complete, end-to-end flame graph showing the entire lifecycle of that single piece of data.
graph TD
    A[HTTP POST /event] --> B{Producer Service Span};
    B --> C{Kafka Header Injection};
    subgraph Kafka
        C --> D[user-events-topic];
    end
    subgraph Spark on EMR
        D --> E{Spark Batch Span};
        E --> F{Record Processing Span 1};
        E --> G{Record Processing Span 2};
        E --> H[...];
        G --> I{Hudi Write};
    end
    subgraph Hudi on OSS
        I --> J[Commit: 20231027113500];
    end
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px
    style G fill:#ccf,stroke:#333,stroke-width:2px
This visualization instantly reveals bottlenecks, errors, and the exact path taken. If Record Processing Span 2 shows high latency, we know the issue is within our Spark transformation logic for that specific record. If the time gap between Producer Service Span and Spark Batch Span is large, we have a Kafka lag issue. This level of granular insight was previously impossible.
The solution isn't without its own set of trade-offs and remaining challenges. Adding a _trace_id column to every record introduces storage overhead, which can be significant for tables with trillions of rows. A potential optimization could involve storing this mapping in a separate, highly-compressed index rather than in the main table. Furthermore, manual context propagation within Spark is complex and somewhat brittle. As the OpenTelemetry Spark instrumentation project matures, we expect to replace our manual mapPartitions logic with a more native, agent-based approach. Lastly, our current dynamic configuration is pull-based with a listener; for near-instantaneous updates, a push-based mechanism might be explored, though this would increase system complexity.
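To put a rough number on the storage overhead: a trace ID is 16 bytes, which becomes 32 characters when stored as a hex string. The sketch below, using the JDK only, converts the hex form to its binary form and estimates the raw column size before any encoding or compression. TraceIdStorage and its methods are hypothetical names, and the figures are back-of-the-envelope estimates, not measurements from our tables.

```java
/** Hypothetical helper for estimating the raw size of a trace ID column. */
final class TraceIdStorage {

    /** Converts a 32-character hex trace ID into its 16-byte binary form. */
    static byte[] toBinary(String traceIdHex) {
        if (traceIdHex.length() != 32) {
            throw new IllegalArgumentException("Expected 32 hex characters");
        }
        byte[] out = new byte[16];
        for (int i = 0; i < 16; i++) {
            out[i] = (byte) Integer.parseInt(traceIdHex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    /** Raw bytes for the column, ignoring file-format encoding and compression. */
    static long rawBytes(long rows, int bytesPerId) {
        return Math.multiplyExact(rows, (long) bytesPerId);
    }
}
```

For one trillion rows this gives about 32 TB as hex strings and about 16 TB as binary, before compression. Since trace IDs are effectively random they tend to compress poorly, so storing them in binary form is one inexpensive way to halve the cost.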