The initial system response time was unacceptable: 45 seconds for a user-initiated parameter change to reflect in a complex geospatial visualization. The architecture was conventional. A Flutter mobile client sent a request to a Spring Boot backend. The backend would configure and launch an Apache Spark job to process terabytes of raw geospatial data, perform a series of transformations, and then aggregate the results into a final, compact JSON payload for the client to render. The bottleneck wasn’t the initial data crunching in Spark—that part was massively parallel and fast. The problem was a final, stateful aggregation stage that was computationally expensive and stubbornly serial, running on a single Spark executor and delaying the entire response.
Our first attempts at optimization focused on the Spark job itself, but the nature of the algorithm resisted further parallelization. The next logical step was to stream intermediate results to the client and let the device handle the final aggregation. This failed immediately. The intermediate dataset, while smaller than the source, was still hundreds of megabytes of raw coordinates and values. Processing this volume in Dart on the Flutter UI thread resulted in a completely frozen interface, a worse user experience than the original 45-second wait. This is a classic distributed computing problem: a workload that doesn’t fit neatly into either a pure server-side or client-side execution model. The solution required creating a hybrid compute pipeline, selectively offloading the problematic serial computation to the client but executing it outside the confines of the Dart VM for performance.
This led to a final architecture:
- Apache Spark: Performs the initial, massively parallel ETL and filtering. It no longer produces a final result but instead writes batches of intermediate data points to a durable message queue.
- Spring Boot: Acts as the orchestrator. It exposes a gRPC service, initiates the Spark job, and then acts as a proxy, streaming the intermediate results from the message queue to the client.
- gRPC: Provides the high-performance, bidirectional streaming channel between the Spring backend and the Flutter client, essential for handling the large volume of intermediate binary data efficiently.
- WebAssembly (WASM): The serial aggregation algorithm, originally written in Scala for Spark, was rewritten in Rust for its performance and safety guarantees and compiled to WASM. This is our portable, high-performance compute engine.
- Flutter: The client initiates the request. It spawns a separate Isolate to handle the incoming gRPC stream and pipes the data directly into the WASM module. This ensures the main UI thread remains responsive. The Isolate then sends the final, aggregated visualization data back to the UI for rendering.
sequenceDiagram
    participant Flutter UI
    participant Flutter Isolate
    participant WASM Engine
    participant Spring gRPC Service
    participant Spark Cluster
    Flutter UI->>+Spring gRPC Service: initiateSimulation(params)
    Spring gRPC Service->>+Spark Cluster: submitJob(params)
    Spark Cluster-->>-Spring gRPC Service: JobAccepted (JobID)
    Note over Spark Cluster: Parallel processing of raw data, writes intermediate chunks to a queue.
    Spring gRPC Service-->>-Flutter UI: streamSimulationResults() [gRPC Stream]
    Flutter UI->>+Flutter Isolate: spawn(gRPC_Stream_Handle)
    Flutter Isolate->>Spring gRPC Service: Start consuming stream
    loop Data Stream
        Spring gRPC Service-->>Flutter Isolate: IntermediateDataChunk
        Flutter Isolate->>+WASM Engine: processChunk(IntermediateDataChunk)
        WASM Engine-->>-Flutter Isolate: ProcessedPartialResult
    end
    Spring gRPC Service-->>Flutter Isolate: StreamEnd
    Flutter Isolate->>Flutter Isolate: Final Aggregation
    Flutter Isolate-->>-Flutter UI: FinalVisualizationPayload
    Flutter UI->>Flutter UI: Render visualization
The gRPC Contract: Defining the Data Flow
The foundation of this architecture is the Protobuf contract. A common mistake is to define a simple RPC call that returns a large list. For this to work, we must use server-side streaming. The client sends a single request, and the server replies with a stream of messages.
The `simulation.proto` file defines this contract. We need a message for the request parameters and, more importantly, a message for the intermediate data chunks. In our case, these are geospatial points.
syntax = "proto3";

package simulation;

option java_package = "com.example.hybridcompute.grpc";
option java_multiple_files = true;

// The service definition for our simulation engine.
service SimulationService {
  // Initiates a simulation and streams back intermediate results.
  // This is a server-streaming RPC.
  rpc StreamSimulationResults(SimulationRequest) returns (stream GeoDataChunk);
}

// Contains parameters to configure the Spark job.
message SimulationRequest {
  string region = 1;
  int64 start_timestamp = 2;
  int64 end_timestamp = 3;
  double sensitivity_parameter = 4;
}

// Represents a single point in our intermediate dataset.
// Using fixed-size types is crucial for performance.
message GeoPoint {
  double latitude = 1;
  double longitude = 2;
  float value = 3; // Use float if precision allows, saves bandwidth.
}

// A chunk of intermediate data points.
// We stream chunks, not individual points, to reduce overhead.
message GeoDataChunk {
  repeated GeoPoint points = 1;
  int32 chunk_sequence_id = 2; // Important for ordering on the client.
  bool is_last_chunk = 3; // Signal for stream termination.
}
The key design choices here are `stream GeoDataChunk` in the service definition and batching points into chunks. Streaming individual points would create excessive network and serialization overhead. The `is_last_chunk` boolean is a simple but effective mechanism to signal the end of the stream at the application layer, which is more explicit than relying solely on the gRPC `onCompleted` event.
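A quick way to sanity-check these semantics is a throwaway JVM client built against the generated stubs. The sketch below is illustrative only and not part of the production code: the host, port (9090 is simply the grpc-spring-boot-starter default), and request values are placeholder assumptions.

```java
// Minimal consumer of the server-streaming contract, useful for smoke-testing the
// backend before the Flutter client exists. Host, port, and request values are
// assumptions; adjust them to your deployment.
import com.example.hybridcompute.grpc.GeoDataChunk;
import com.example.hybridcompute.grpc.SimulationRequest;
import com.example.hybridcompute.grpc.SimulationServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Iterator;

public class StreamSmokeTest {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
                .usePlaintext()
                .build();
        try {
            SimulationServiceGrpc.SimulationServiceBlockingStub stub =
                    SimulationServiceGrpc.newBlockingStub(channel);

            SimulationRequest request = SimulationRequest.newBuilder()
                    .setRegion("eu-central")
                    .setStartTimestamp(1700000000L)
                    .setEndTimestamp(1700086400L)
                    .setSensitivityParameter(0.75)
                    .build();

            // Server-streaming RPC: the blocking stub exposes the stream as an Iterator.
            Iterator<GeoDataChunk> chunks = stub.streamSimulationResults(request);
            long totalPoints = 0;
            while (chunks.hasNext()) {
                GeoDataChunk chunk = chunks.next();
                totalPoints += chunk.getPointsCount();
                if (chunk.getIsLastChunk()) {
                    System.out.printf("Last chunk #%d received, %d points total%n",
                            chunk.getChunkSequenceId(), totalPoints);
                }
            }
        } finally {
            channel.shutdownNow();
        }
    }
}
```

Because this is a server-streaming call, the blocking stub hands back a lazy `Iterator`, so a consumer starts processing chunks as soon as the server emits them rather than waiting for the full payload.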
The Spring Boot Orchestrator
The Spring Boot application serves as the system’s brain. It needs dependencies for gRPC, Spark interaction, and potentially a message queue client like Kafka or Pulsar if the intermediate data needs to be durable.
`build.gradle.kts`:
plugins {
    // ...
}

dependencies {
    // gRPC dependencies
    implementation("net.devh:grpc-spring-boot-starter:2.14.0.RELEASE")
    implementation("io.grpc:grpc-protobuf")
    implementation("io.grpc:grpc-stub")

    // Spark interaction
    implementation("org.apache.spark:spark-core_2.12:3.4.1")
    implementation("org.apache.spark:spark-sql_2.12:3.4.1")
    // Using SparkLauncher for simplicity to submit jobs to a standalone cluster
    implementation("org.apache.spark:spark-launcher_2.12:3.4.1")

    // For compiling the .proto file
    protobuf(files("src/main/proto/simulation.proto"))
}
The gRPC service implementation is where the orchestration happens, and it must not block. A common pitfall is to tie up the gRPC thread while waiting for the Spark job to complete. Instead, we hand the work to a background worker that submits the job and then streams the results back from the output location (e.g., a directory on HDFS or a Kafka topic).
package com.example.hybridcompute.service;

import com.example.hybridcompute.grpc.*;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@GrpcService
public class SimulationServiceImpl extends SimulationServiceGrpc.SimulationServiceImplBase {

    private static final Logger logger = LoggerFactory.getLogger(SimulationServiceImpl.class);
    private static final int CHUNK_SIZE = 4096; // Number of points per gRPC message

    @Value("${spark.home}")
    private String sparkHome;

    @Value("${spark.master.url}")
    private String sparkMasterUrl;

    @Value("${spark.app.jar.path}")
    private String appJarPath;

    @Value("${spark.app.main-class}")
    private String mainClass;

    @Value("${spark.output.base-path}")
    private String outputPathBase;
    @Override
    public void streamSimulationResults(SimulationRequest request, StreamObserver<GeoDataChunk> responseObserver) {
        String uniqueJobId = UUID.randomUUID().toString();
        Path jobOutputPath = Paths.get(outputPathBase, uniqueJobId);
        logger.info("Received simulation request for region {}. Job ID: {}", request.getRegion(), uniqueJobId);

        try {
            Files.createDirectories(jobOutputPath);
        } catch (IOException e) {
            logger.error("Failed to create output directory for job {}", uniqueJobId, e);
            responseObserver.onError(io.grpc.Status.INTERNAL
                    .withDescription("Failed to prepare job environment.")
                    .asRuntimeException());
            return;
        }

        // We run the Spark job asynchronously to avoid blocking the gRPC thread.
        // In a production system, this would be submitted to a proper job scheduler like YARN or Kubernetes.
        new Thread(() -> {
            try {
                runSparkJob(request, jobOutputPath.toString());
                streamResultsFromFile(jobOutputPath, responseObserver);
            } catch (IOException | InterruptedException e) {
                logger.error("Spark job execution or result streaming failed for job {}", uniqueJobId, e);
                // Ensure we notify the client of the failure.
                if (!isClientCancelled(responseObserver)) {
                    responseObserver.onError(io.grpc.Status.INTERNAL
                            .withDescription("Backend computation failed: " + e.getMessage())
                            .asRuntimeException());
                }
            } finally {
                // Cleanup job output directory
                // ... implementation omitted for brevity
            }
        }).start();
    }
    private void runSparkJob(SimulationRequest request, String outputPath) throws IOException, InterruptedException {
        SparkLauncher launcher = new SparkLauncher()
                .setSparkHome(sparkHome)
                .setAppResource(appJarPath)
                .setMainClass(mainClass)
                .setMaster(sparkMasterUrl)
                .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
                .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
                .setConf(SparkLauncher.EXECUTOR_CORES, "2")
                // Pass parameters to the Spark application
                .addAppArgs(
                        "--region", request.getRegion(),
                        "--start-ts", String.valueOf(request.getStartTimestamp()),
                        "--end-ts", String.valueOf(request.getEndTimestamp()),
                        "--output-path", outputPath
                );

        logger.info("Launching Spark job...");
        Process spark = launcher.launch();

        // In a real project, you'd use a more robust process management and logging solution.
        new Thread(new InputStreamReaderRunnable(spark.getInputStream(), "SparkAppInfo")).start();
        new Thread(new InputStreamReaderRunnable(spark.getErrorStream(), "SparkAppError")).start();

        int exitCode = spark.waitFor();
        if (exitCode != 0) {
            throw new IOException("Spark job failed with exit code: " + exitCode);
        }
        logger.info("Spark job completed successfully.");
    }
    private void streamResultsFromFile(Path jobOutputPath, StreamObserver<GeoDataChunk> responseObserver) {
        // This simulates reading from a file where Spark wrote its output.
        // A production system would use a distributed queue.
        // The output is expected to be a CSV: lat,lon,value
        Path resultFile = jobOutputPath.resolve("part-00000");
        logger.info("Streaming results from {}", resultFile);

        try (BufferedReader reader = new BufferedReader(new FileReader(resultFile.toFile()))) {
            String line;
            List<GeoPoint> points = new ArrayList<>(CHUNK_SIZE);
            int sequenceId = 0;

            while ((line = reader.readLine()) != null) {
                if (isClientCancelled(responseObserver)) {
                    logger.warn("Client cancelled the stream. Aborting result sending.");
                    return;
                }

                String[] parts = line.split(",");
                if (parts.length == 3) {
                    GeoPoint point = GeoPoint.newBuilder()
                            .setLatitude(Double.parseDouble(parts[0]))
                            .setLongitude(Double.parseDouble(parts[1]))
                            .setValue(Float.parseFloat(parts[2]))
                            .build();
                    points.add(point);
                }

                if (points.size() >= CHUNK_SIZE) {
                    sendChunk(responseObserver, points, ++sequenceId, false);
                    points.clear();
                }
            }

            // Send any remaining points in the last chunk
            if (!points.isEmpty()) {
                sendChunk(responseObserver, points, ++sequenceId, true);
            } else {
                // If the last chunk was exactly CHUNK_SIZE, send an empty final chunk.
                sendChunk(responseObserver, new ArrayList<>(), ++sequenceId, true);
            }

            responseObserver.onCompleted();
            logger.info("Finished streaming all results.");
        } catch (IOException e) {
            logger.error("Error reading Spark output file.", e);
            if (!isClientCancelled(responseObserver)) {
                responseObserver.onError(e);
            }
        }
    }

    private void sendChunk(StreamObserver<GeoDataChunk> observer, List<GeoPoint> points, int seqId, boolean isLast) {
        GeoDataChunk chunk = GeoDataChunk.newBuilder()
                .addAllPoints(points)
                .setChunkSequenceId(seqId)
                .setIsLastChunk(isLast)
                .build();
        observer.onNext(chunk);
    }

    private boolean isClientCancelled(StreamObserver<?> observer) {
        // A naive check. In real gRPC, you'd handle CancellationException.
        // This is a simplified check for clarity.
        return ((io.grpc.stub.ServerCallStreamObserver<?>) observer).isCancelled();
    }
}
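One note on the listing above: `InputStreamReaderRunnable` is not a gRPC or Spark class; it is presumably a small custom helper that drains the launched process's stdout and stderr so the child process cannot stall on a full pipe buffer. A minimal version, shown here only as an assumed sketch, could look like this:

```java
// Hypothetical helper assumed by the service above: drains a process stream and
// forwards each line to the application log so the child process never blocks
// on a full output buffer.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InputStreamReaderRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(InputStreamReaderRunnable.class);

    private final InputStream stream;
    private final String name;

    public InputStreamReaderRunnable(InputStream stream, String name) {
        this.stream = stream;
        this.name = name;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                logger.info("[{}] {}", name, line);
            }
        } catch (IOException e) {
            logger.warn("Stream reader '{}' terminated: {}", name, e.getMessage());
        }
    }
}
```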
This implementation uses `SparkLauncher` for simplicity. In a production environment, you would likely use a more robust submission path, such as Apache Livy's REST API or a Kubernetes operator. The key takeaway is the decoupling: the gRPC service's responsibility is to manage the job lifecycle and stream the data, not to execute the business logic itself.
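If you do switch to Livy, submission becomes a plain HTTP call to its `/batches` endpoint and the launcher-and-process plumbing disappears. The sketch below is a minimal illustration under assumptions, not the project's code: the Livy URL, JAR path, and Spark main class are placeholders to adapt to your deployment.

```java
// Minimal sketch of submitting the same job through Apache Livy's /batches REST API
// instead of SparkLauncher. Assumes a Livy server at http://livy:8998 and that the
// application JAR is readable by the Livy server; names and paths are placeholders.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LivyBatchSubmitter {

    private final HttpClient http = HttpClient.newHttpClient();

    public String submitBatch(String region, long startTs, long endTs, String outputPath) throws Exception {
        // Livy expects a JSON body describing the batch: main resource, class, and args.
        String body = """
                {
                  "file": "/jobs/simulation-spark-job.jar",
                  "className": "com.example.hybridcompute.spark.SimulationJob",
                  "args": ["--region", "%s", "--start-ts", "%d", "--end-ts", "%d", "--output-path", "%s"]
                }
                """.formatted(region, startTs, endTs, outputPath);

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://livy:8998/batches"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 201) {
            throw new IllegalStateException("Livy rejected the batch: " + response.body());
        }
        // The response JSON carries the batch id; poll GET /batches/{id}/state for progress.
        return response.body();
    }
}
```

The practical win is that the orchestrator no longer holds a child process open; it just tracks a batch id and polls for state, which fits the job-lifecycle role described above.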
The WASM Compute Module in Rust
The serial aggregation logic is a perfect candidate for Rust due to its performance, lack of a garbage collector, and excellent WASM tooling. The logic here would mirror the original Scala code from the Spark job. Let’s assume the task is to generate a heatmap grid from the raw points.
`src/lib.rs` in a Rust library crate:
```rust
// use wasm_bindgen::prelude::*;
// A simplified representation of the data structures.
// In a real project, these would be generated from the .proto file
// using something like 'prost'. For this example, we define them manually.
#[derive(Clone, Copy, Debug)]
pub struct GeoPoint {
    pub latitude: f64,
    pub longitude: f64,
    pub value: f32,
}

// Represents the output grid cell for the heatmap.
#[derive(Clone, Copy, Debug)]
pub struct GridCell {
    pub sum_values: f32,
    pub point_count: u32,
}

// The main processing engine. It is stateful.
pub struct HeatmapAggregator {
    grid: Vec<GridCell>,
    grid_width: usize,
    grid_height: usize,
    min_lat: f64,
    max_lon: f64,
    cell_size: f64,
}

// This is not using wasm_bindgen for simplicity, to show a pure computation core.
// The FFI boundary would be handled by the Flutter WASM runner.
impl HeatmapAggregator {
    pub fn new(
        grid_width: usize,
        grid_height: usize,
        min_lat: f64,
        max_lon: f64,
        cell_size: f64,
    ) -> Self {
        Self {
            grid: vec