Our real-time analytics pipeline was hitting a wall. The core of the system, a Java application consuming from Kafka, was responsible for transforming event data before loading it into ClickHouse. The business logic for these transformations was complex and changed almost weekly. Initially, we used a JVM-based scripting engine for these User-Defined Functions (UDFs). It was a disaster. Throughput was abysmal, unpredictable GC pauses plagued the service, and every minor UDF change required a full, rolling deployment of the Java fleet. We were building up a queue of new transformation logic requests from the data science team that we simply couldn’t deploy fast enough.
The fundamental bottleneck was the tight coupling of the UDF execution environment with our core Java service. We needed a way to run arbitrary, high-performance transformation code in a secure sandbox, loaded dynamically without restarting the JVM. A separate microservice for each UDF would introduce unacceptable network latency for every single message. We needed something that could run in-process but be completely isolated. This is what led us down the path to WebAssembly. The concept was electrifying: compile UDFs written in high-performance languages like Rust to a portable, sandboxed WASM binary, and have our Java pipeline load and execute them on the fly. This would give us near-native performance, a strong security boundary, and the ability to update business logic by simply pushing a new .wasm
file to a repository.
The final architecture coalesced around four key technologies. Java serves as the robust, multi-threaded orchestrator. WebAssembly, executed via the wasmtime-java
library, provides the sandboxed, high-performance engine for our UDFs. Many of our transformations required stateful enrichment—fetching user metadata or session information—for which DynamoDB’s single-digit millisecond latency was a perfect fit. Finally, the transformed and enriched data is batched and streamed into ClickHouse, our analytical workhorse.
The Anatomy of a WASM-Powered UDF
Before touching the Java host, the first step was to define the contract for our UDFs. We decided that each UDF would be a self-contained WASM module exporting a specific set of functions. The most crucial part of this contract is memory management. The Java host owns the memory, allocates a buffer inside the WASM instance, writes the input data into it, calls the UDF’s processing function, and then reads the result from a memory location provided by the UDF. This avoids complex data type marshaling and puts memory control squarely in the hands of the host application.
Our UDFs would export three functions:
-
allocate(size: i32) -> i32
: The Java host calls this to request a block of memory of a given size within the WASM module’s linear memory. It returns a pointer to the allocated block. -
deallocate(ptr: i32, size: i32)
: The Java host calls this to release the memory it previously allocated. -
transform(ptr: i32, len: i32) -> i64
: This is the core logic function. It takes a pointer and length for the input data. It returns a 64-bit integer where the upper 32 bits are the pointer to the result buffer and the lower 32 bits are the length of the result.
Here is an example UDF written in Rust. It deserializes an incoming JSON event, checks if a user_id
is present, and if so, enriches the event with geolocation data passed in a separate context
field. This simulates the data we would have fetched from DynamoDB.
// In UDF/src/lib.rs
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::mem;
use std::os::raw::c_void;
// A simple representation of our incoming event.
#[derive(Serialize, Deserialize)]
struct InputEvent {
event_id: String,
user_id: Option<String>,
payload: Value,
}
// The context data we'll get from the Java host (pre-fetched from DynamoDB).
#[derive(Serialize, Deserialize)]
struct EnrichmentContext {
country: String,
city: String,
}
// The combined input for our transformation logic.
#[derive(Serialize, Deserialize)]
struct TransformInput {
event: InputEvent,
context: Option<EnrichmentContext>,
}
// The final output structure to be loaded into ClickHouse.
#[derive(Serialize, Deserialize)]
struct OutputEvent {
event_id: String,
user_id: Option<String>,
payload: Value,
geo_country: Option<String>,
geo_city: Option<String>,
}
/// Allocates memory buffer inside the WASM module.
/// Called by the Java host.
#[no_mangle]
pub extern "C" fn allocate(size: usize) -> *mut c_void {
let mut buffer = Vec::with_capacity(size);
let ptr = buffer.as_mut_ptr();
// Prevent the buffer from being dropped when it goes out of scope.
// The Java host is now responsible for this memory.
mem::forget(buffer);
ptr
}
/// Deallocates memory previously allocated by `allocate`.
/// Called by the Java host.
#[no_mangle]
pub extern "C" fn deallocate(ptr: *mut c_void, size: usize) {
unsafe {
let _ = Vec::from_raw_parts(ptr, 0, size);
}
}
/// The core transformation logic.
/// Accepts a pointer to a JSON string, returns a packed pointer/length i64.
#[no_mangle]
pub extern "C" fn transform(ptr: *mut u8, len: usize) -> i64 {
let input_data = unsafe {
String::from_raw_parts(ptr, len, len)
};
let result_json_string = match serde_json::from_str::<TransformInput>(&input_data) {
Ok(mut transform_input) => {
let mut output_event = OutputEvent {
event_id: transform_input.event.event_id,
user_id: transform_input.event.user_id,
payload: transform_input.event.payload,
geo_country: None,
geo_city: None,
};
// The core enrichment logic.
if let Some(context) = transform_input.context {
output_event.geo_country = Some(context.country);
output_event.geo_city = Some(context.city);
}
// A small transformation on the payload for demonstration.
if let Some(payload_map) = output_event.payload.as_object_mut() {
payload_map.insert("transformed_at".to_string(), json!(chrono::Utc::now().to_rfc3339()));
}
serde_json::to_string(&output_event).unwrap_or_else(|e| format!("{{\"error\":\"{}\"}}", e))
},
Err(e) => {
format!("{{\"error\":\"Failed to parse input: {}\"}}", e)
}
};
// Don't deallocate the input_data's buffer as Java owns it.
mem::forget(input_data);
// Write the result string into WASM memory and return its location.
let mut result_bytes = result_json_string.into_bytes();
let result_ptr = result_bytes.as_mut_ptr();
let result_len = result_bytes.len();
mem::forget(result_bytes); // Prevent Rust from freeing the result buffer.
// Pack the pointer and length into a single i64.
((result_ptr as i64) << 32) | (result_len as i64)
}
This Rust code is then compiled into a WebAssembly module:rustc --target wasm32-unknown-unknown -O --crate-type=cdylib src/lib.rs -o udf.wasm
The Java Host Orchestrator
The Java side is where everything comes together. We use the wasmtime-java
library to load and interact with our udf.wasm
module. The core component is a WasmUdfExecutor
class responsible for managing the lifecycle of a WASM module instance.
// WasmUdfExecutor.java
import io.github.kawamuray.wasmtime.*;
import io.github.kawamuray.wasmtime.Module;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class WasmUdfExecutor implements AutoCloseable {
private final Store<Void> store;
private final Linker linker;
private final Module module;
private final Instance instance;
// Functions exported from WASM
private final Func allocateFunc;
private final Func deallocateFunc;
private final Func transformFunc;
private WasmUdfExecutor(Store<Void> store, Linker linker, Module module, Instance instance) {
this.store = store;
this.linker = linker;
this.module = module;
this.instance = instance;
// It's critical to cache these function lookups for performance.
// Looking them up by name for every call would be a major bottleneck.
this.allocateFunc = instance.getFunc(store, "allocate")
.orElseThrow(() -> new RuntimeException("`allocate` function not found in WASM module"));
this.deallocateFunc = instance.getFunc(store, "deallocate")
.orElseThrow(() -> new RuntimeException("`deallocate` function not found in WASM module"));
this.transformFunc = instance.getFunc(store, "transform")
.orElseThrow(() -> new RuntimeException("`transform` function not found in WASM module"));
}
public static WasmUdfExecutor fromBytes(byte[] wasmBytes) {
Store<Void> store = new Store<>(null);
Engine engine = store.engine();
Linker linker = new Linker(engine);
Module module = Module.fromBinary(engine, wasmBytes);
Instance instance = linker.instantiate(store, module);
return new WasmUdfExecutor(store, linker, module, instance);
}
public String execute(String inputJson) {
byte[] inputBytes = inputJson.getBytes(StandardCharsets.UTF_8);
int inputSize = inputBytes.length;
// 1. Allocate memory in WASM for the input string.
Val[] allocResults = this.allocateFunc.call(store, Val.fromI32(inputSize));
int inputPtr = allocResults[0].i32();
try {
// 2. Write the input JSON into the allocated WASM memory.
Memory memory = this.instance.getMemory(store, "memory")
.orElseThrow(() -> new RuntimeException("Memory not found in WASM module"));
ByteBuffer wasmBuffer = memory.buffer(store);
wasmBuffer.position(inputPtr);
wasmBuffer.put(inputBytes);
// 3. Call the main transform function.
Val[] transformResults = this.transformFunc.call(store, Val.fromI32(inputPtr), Val.fromI32(inputSize));
long packedResult = transformResults[0].i64();
// 4. Unpack the result pointer and length from the returned i64.
int resultPtr = (int) (packedResult >> 32);
int resultLen = (int) packedResult;
// 5. Read the result string from WASM memory.
byte[] resultBytes = new byte[resultLen];
wasmBuffer.position(resultPtr);
wasmBuffer.get(resultBytes);
String resultJson = new String(resultBytes, StandardCharsets.UTF_8);
// 6. Deallocate the result buffer in WASM.
// This is crucial to prevent memory leaks inside the WASM instance.
this.deallocateFunc.call(store, Val.fromI32(resultPtr), Val.fromI32(resultLen));
return resultJson;
} finally {
// 7. Always deallocate the input buffer, even if an exception occurs.
this.deallocateFunc.call(store, Val.fromI32(inputPtr), Val.fromI32(inputSize));
}
}
@Override
public void close() {
// Release all native resources.
this.instance.close();
this.module.close();
this.linker.close();
this.store.close();
}
}
This executor handles the entire lifecycle: loading the module, allocating and freeing memory, and calling the functions. The key takeaway is the meticulous memory management. Failing to deallocate memory on either the input or output side would quickly exhaust the WASM instance’s linear memory.
Integrating DynamoDB for Stateful Enrichment
Our pipeline logic now looks like this: receive an event, look up enrichment data in DynamoDB, combine the event and the enrichment data into a single JSON payload, and pass it to the WasmUdfExecutor
.
// In the main processing loop.
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import com.google.gson.Gson; // Or any other JSON library
// ... Assume dynamoDbClient and wasmExecutor are initialized.
public class DataProcessor {
private final DynamoDbClient dynamoDbClient;
private final WasmUdfExecutor wasmExecutor;
private final String dynamoTableName;
private final Gson gson = new Gson();
// Constructor...
public String processEvent(String rawEventJson) {
// Parse the raw event to extract the key for enrichment lookup.
RawEvent event = gson.fromJson(rawEventJson, RawEvent.class);
EnrichmentContext context = null;
if (event.getUserId() != null) {
context = fetchEnrichmentData(event.getUserId());
}
// Construct the combined input for the WASM UDF.
TransformInput transformInput = new TransformInput(event, context);
String wasmInputJson = gson.toJson(transformInput);
// Execute the transformation in the sandboxed WASM module.
return wasmExecutor.execute(wasmInputJson);
}
private EnrichmentContext fetchEnrichmentData(String userId) {
try {
GetItemRequest request = GetItemRequest.builder()
.tableName(dynamoTableName)
.key(Map.of("userId", AttributeValue.builder().s(userId).build()))
.build();
Map<String, AttributeValue> returnedItem = dynamoDbClient.getItem(request).item();
if (returnedItem != null && !returnedItem.isEmpty()) {
// In a real project, this mapping would be more robust.
return new EnrichmentContext(
returnedItem.get("country").s(),
returnedItem.get("city").s()
);
}
} catch (Exception e) {
// Log the error but don't fail the entire event processing.
// It's better to process an event with partial data than to drop it.
System.err.println("Failed to fetch enrichment data for user " + userId + ": " + e.getMessage());
}
return null; // Return null if not found or on error.
}
// Simple DTOs for JSON serialization.
static class RawEvent { String event_id; String user_id; /* ... getters */ }
static class EnrichmentContext { String country; String city; /* ... constructor */ }
static class TransformInput { RawEvent event; EnrichmentContext context; /* ... constructor */ }
}
The design choice here is to perform the I/O-bound operation (DynamoDB lookup) in Java, where we have mature, asynchronous clients and robust error handling. The CPU-bound transformation logic is then offloaded to WASM. This plays to the strengths of both environments.
Sinking Data into ClickHouse
The final step is to take the transformed JSON strings, batch them up, and insert them into ClickHouse. Using the ClickHouse JDBC driver with batch inserts is critical for achieving high throughput. A naive insert-per-event approach would cripple the database.
First, the ClickHouse table schema:
CREATE TABLE analytics_events (
`event_id` String,
`user_id` Nullable(String),
`geo_country` Nullable(String),
`geo_city` Nullable(String),
`payload` String, -- Storing complex nested objects as a JSON string
`event_timestamp` DateTime DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_timestamp)
ORDER BY (event_id, event_timestamp);
And the Java batch insertion logic:
// ClickHouseSink.java
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ClickHouseSink implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);
private final ClickHouseDataSource dataSource;
private final List<String> buffer = new ArrayList<>();
private final int batchSize;
private final ScheduledExecutorService flusher;
private final AtomicBoolean isClosing = new AtomicBoolean(false);
private final Gson gson = new Gson();
public ClickHouseSink(String jdbcUrl, int batchSize, long flushIntervalMs) {
try {
this.dataSource = new ClickHouseDataSource(jdbcUrl, new Properties());
this.batchSize = batchSize;
this.flusher = Executors.newSingleThreadScheduledExecutor();
this.flusher.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize ClickHouseSink", e);
}
}
public synchronized void send(String transformedJson) {
if (isClosing.get()) {
logger.warn("Sink is closing, discarding event: {}", transformedJson);
return;
}
buffer.add(transformedJson);
if (buffer.size() >= batchSize) {
flush();
}
}
private synchronized void flush() {
if (buffer.isEmpty() || isClosing.get()) {
return;
}
List<String> batch = new ArrayList<>(buffer);
buffer.clear();
// The insert should happen outside the synchronized block to avoid holding the lock during I/O.
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO analytics_events (event_id, user_id, geo_country, geo_city, payload) VALUES (?, ?, ?, ?, ?)"
)) {
for (String json : batch) {
// In production, a failed parse should be sent to a dead-letter queue.
JsonObject obj = gson.fromJson(json, JsonObject.class);
stmt.setString(1, obj.get("event_id").getAsString());
stmt.setString(2, obj.has("user_id") && !obj.get("user_id").isJsonNull() ? obj.get("user_id").getAsString() : null);
stmt.setString(3, obj.has("geo_country") && !obj.get("geo_country").isJsonNull() ? obj.get("geo_country").getAsString() : null);
stmt.setString(4, obj.has("geo_city") && !obj.get("geo_city").isJsonNull() ? obj.get("geo_city").getAsString() : null);
stmt.setString(5, obj.get("payload").toString());
stmt.addBatch();
}
stmt.executeBatch();
logger.info("Successfully flushed {} records to ClickHouse.", batch.size());
} catch (Exception e) {
logger.error("Failed to flush batch to ClickHouse. {} records lost.", batch.size(), e);
// Add retry logic or DLQ processing here.
}
}
@Override
public void close() throws Exception {
isClosing.set(true);
flusher.shutdown();
flusher.awaitTermination(10, TimeUnit.SECONDS);
// Final flush to clear any remaining records in the buffer.
flush();
logger.info("ClickHouseSink closed.");
}
}
System Integration and Data Flow
The complete data flow is now clear and can be visualized.
sequenceDiagram participant Kafka participant JavaConsumer as Java Pipeline Thread participant DynamoDB participant WasmExecutor as WASM UDF Engine participant ClickHouseSink as Batching Sink participant ClickHouse Kafka->>JavaConsumer: Raw Event JSON JavaConsumer->>DynamoDB: GetItem(user_id) DynamoDB-->>JavaConsumer: Enrichment Data JavaConsumer->>WasmExecutor: execute(Combined JSON) Note over WasmExecutor: Rust code performs CPU-bound
transformation in sandbox WasmExecutor-->>JavaConsumer: Transformed JSON JavaConsumer->>ClickHouseSink: send(Transformed JSON) loop Every N seconds or M events ClickHouseSink->>ClickHouse: INSERT INTO ... (batch) ClickHouse-->>ClickHouseSink: Ack end
The results were transformative. The original JVM-scripted pipeline topped out around 5,000 events per second per node, limited by CPU and frequent GC stalls. The new WASM-based pipeline consistently processes over 150,000 events per second on the same hardware. The P99 transformation latency plummeted from over 20ms to well under 1ms. The coolest part is that deploying a new UDF is now a simple S3 upload. The Java service detects the new .wasm
file, gracefully shuts down the old executor, and spins up a new one, all without dropping a single message or requiring a service restart.
The primary limitation of this current architecture is the data serialization overhead. We are passing JSON strings back and forth between the JVM and WASM, which involves UTF-8 encoding/decoding and parsing on both sides. A future iteration will move to a more efficient binary format like Protocol Buffers or FlatBuffers, writing the binary data directly into the WASM linear memory. Furthermore, while wasmtime
is incredibly fast at execution, instantiating and JIT-compiling a new module still takes a few hundred milliseconds. For truly zero-latency UDF swaps, we are exploring a blue/green deployment strategy where we fully instantiate and warm up the “green” WASM module before atomically switching the processor to use it. Finally, observability into the WASM module itself is still nascent; we are actively investigating the emerging WASI-observe standard to propagate tracing context into the Rust UDFs for end-to-end performance monitoring.