Our interactive analysis platform faced a critical bottleneck. Users on our native SwiftUI client were triggering complex, long-running financial computations that were handled by a monolithic Python backend. The synchronous nature of these requests led to frozen UIs and frequent timeouts. Initial attempts to scale by simply adding more monolith instances were costly and ineffective due to state management issues. More importantly, when a job failed, tracing its path from a user’s tap on an iPad in New York to a specific Python worker process in our Virginia data center was an exercise in pure guesswork, stitching together disconnected logs from different systems.
The core pain point was clear: we needed a non-blocking, scalable, and fully observable architecture for our computational jobs. This meant decomposing the system and selecting the right tool for each part of the problem, even if it meant managing a heterogeneous stack.
Our redesign settled on a multi-language, micro-services-oriented approach. For the client, SwiftUI remained the right choice for its native performance. The orchestration layer, responsible for accepting requests and managing job lifecycles, needed to be extremely fast and reliable; we chose Axum (Rust) for its performance and type-safety guarantees. The heavy-lifting computational work would remain in Python, where our quant analysts live, but would be parallelized using Dask. The communication channel between the Axum orchestrator and the Dask workers required a fast, simple message broker; Redis was the pragmatic choice, serving as both a job queue and a results store. Finally, to solve the observability crisis, Fluentd was selected as a universal logging layer to aggregate, parse, and unify log streams from Swift, Rust, and Python into a single, traceable narrative for each job.
The initial architectural concept looked like this:
sequenceDiagram
    participant SwiftUI_Client as SwiftUI Client
    participant Axum_API as Axum API (Rust)
    participant Redis
    participant Dask_Workers as Dask Workers (Python)
    participant Fluentd

    SwiftUI_Client ->>+ Axum_API: POST /submit (payload, trace_id)
    Note over Axum_API: Generates job_id
    Axum_API ->> Axum_API: Log: "Job submitted" (trace_id)
    Axum_API -->> Fluentd: [Rust Log]
    Axum_API ->>+ Redis: RPUSH dask_task_queue (job_id, payload, trace_id)
    Redis -->>- Axum_API: OK
    Axum_API -->>- SwiftUI_Client: { "job_id": "xyz" }

    loop Poll for result
        SwiftUI_Client ->>+ Axum_API: GET /status/xyz
        Axum_API ->>+ Redis: HGETALL result:xyz
        Redis -->>- Axum_API: Result or NIL
        Axum_API -->>- SwiftUI_Client: { "status": "...", "result": "..." }
    end

    Dask_Workers ->>+ Redis: BLPOP dask_task_queue
    Redis -->>- Dask_Workers: (job_id, payload, trace_id)
    Note over Dask_Workers: Begin Computation
    Dask_Workers ->> Dask_Workers: Log: "Starting work" (trace_id)
    Dask_Workers -->> Fluentd: [Python Log]
    Dask_Workers ->>+ Redis: HSET result:xyz { "status": "complete", ... }
    Redis -->>- Dask_Workers: OK
    Note over Dask_Workers: Computation Finished
    Dask_Workers ->> Dask_Workers: Log: "Finished work" (trace_id)
    Dask_Workers -->> Fluentd: [Python Log]
This design decouples the user-facing API from the computational backend, immediately solving the UI freezing issue. The key to making it production-ready, however, lies in the implementation details, particularly in robust error handling and the rigorous enforcement of a shared trace_id across every component.
The Axum Orchestration Layer in Rust
The Axum service is the central nervous system. Its responsibilities are minimal but critical: validate incoming requests, enqueue jobs into Redis, provide a status-check endpoint, and, most importantly, produce high-quality, structured JSON logs.
Here’s the core server implementation. A common mistake in production services is sloppy state and connection management; we use Arc for shared state (like the Redis connection pool), and each handler maps its failures to an explicit HTTP status code and message.
main.rs:
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use redis::Commands;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::net::SocketAddr;
use tokio;
use tower_http::trace::TraceLayer;
use tracing::{error, info, instrument, Level};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
// Shared application state, including the Redis connection pool.
#[derive(Clone)]
struct AppState {
redis_pool: Arc<r2d2::Pool<redis::Client>>,
}
// Request body for submitting a new job.
#[derive(Deserialize)]
struct JobRequest {
// In a real project, this would be a more complex struct.
computation_params: String,
trace_id: String,
}
// Response after submitting a job.
#[derive(Serialize)]
struct JobResponse {
job_id: String,
}
// Response for a status check.
#[derive(Serialize, Clone)]
struct JobStatus {
status: String,
result: Option<String>,
}
#[tokio::main]
async fn main() {
// Setup structured logging (JSON format) for Fluentd to parse.
tracing_subscriber::registry()
.with(fmt::layer().json())
.with(tracing_subscriber::filter::LevelFilter::from_level(Level::INFO))
.init();
// In a real-world project, this URL comes from configuration.
let redis_client = redis::Client::open("redis://127.0.0.1:6379").expect("Invalid Redis URL");
let pool = r2d2::Pool::builder()
.build(redis_client)
.expect("Failed to create Redis pool");
let app_state = AppState {
redis_pool: Arc::new(pool),
};
let app = Router::new()
.route("/submit", post(submit_job))
.route("/status/:job_id", get(get_job_status))
.with_state(app_state)
// Add a tracing layer for HTTP request logging.
.layer(TraceLayer::new_for_http());
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
#[instrument(skip(state), fields(trace_id = %payload.trace_id, job_id))]
async fn submit_job(
State(state): State<AppState>,
Json(payload): Json<JobRequest>,
) -> Result<Json<JobResponse>, (StatusCode, String)> {
let job_id = Uuid::new_v4().to_string();
tracing::Span::current().record("job_id", &job_id.as_str());
info!("Received new job submission");
let task_payload = serde_json::to_string(&serde_json::json!({
"job_id": job_id,
"trace_id": payload.trace_id,
"params": payload.computation_params,
}))
.map_err(|e| {
error!("Failed to serialize task payload: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error".to_string())
})?;
let mut conn = state.redis_pool.get().map_err(|e| {
error!("Failed to get Redis connection from pool: {}", e);
(StatusCode::SERVICE_UNAVAILABLE, "Database connection error".to_string())
})?;
conn.rpush::<&str, String, ()>("dask_task_queue", task_payload)
.map_err(|e| {
error!("Failed to RPUSH to Redis queue: {}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to enqueue job".to_string())
})?;
info!("Job successfully enqueued");
Ok(Json(JobResponse { job_id }))
}
#[instrument(skip(state), fields(job_id = %job_id))]
async fn get_job_status(
State(state): State<AppState>,
Path(job_id): Path<String>,
) -> Result<Json<JobStatus>, (StatusCode, String)> {
info!("Checking job status");
let mut conn = state.redis_pool.get().map_err(|e| {
error!("Failed to get Redis connection from pool: {}", e);
(StatusCode::SERVICE_UNAVAILABLE, "Database connection error".to_string())
})?;
let result_key = format!("result:{}", job_id);
// Using HGETALL to retrieve all fields of the hash.
// In a real project, you might just get specific fields.
let result: Result<std::collections::HashMap<String, String>, _> = conn.hgetall(&result_key);
match result {
Ok(map) if !map.is_empty() => {
let status = map.get("status").cloned().unwrap_or_else(|| "unknown".to_string());
let result_data = map.get("data").cloned();
info!("Job status found: {}", status);
Ok(Json(JobStatus { status, result: result_data }))
},
Ok(_) => {
// Key doesn't exist yet, which means the job is pending or unknown.
info!("Job not found in results cache, assuming pending");
Ok(Json(JobStatus { status: "pending".to_string(), result: None }))
},
Err(e) => {
error!("Redis error when fetching job status for key {}: {}", result_key, e);
Err((StatusCode::INTERNAL_SERVER_ERROR, "Error fetching job status".to_string()))
}
}
}
The key takeaway here is the tracing setup. By using fmt::layer().json(), all log output from our service is automatically formatted as JSON, which is trivial for Fluentd to ingest. The #[instrument] macro automatically adds key context like job_id and trace_id to the log entries’ structured data, which is invaluable for filtering and analysis later.
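For reference, the code above targets the pre-0.7 axum API (axum::Server was removed in 0.7 in favor of axum::serve), and it leans on a few feature flags that are easy to miss, notably redis's r2d2 pooling and tracing-subscriber's json output. A sketch of the assumed dependency set follows; the versions are illustrative, not prescriptive.
Cargo.toml (sketch):
[dependencies]
axum = "0.6"
tokio = { version = "1", features = ["full"] }
redis = { version = "0.23", features = ["r2d2"] }
r2d2 = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tower-http = { version = "0.4", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json"] }
uuid = { version = "1", features = ["v4"] }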
The Dask Computational Worker in Python
The Python worker is the powerhouse. Its job is to listen to the Redis queue, execute the computation, and write the result back. It must be robust to malformed data and computational errors, and, crucially, it must adopt the trace_id from the incoming task for its own logging.
dask_worker.py:
import redis
import json
import logging
import time
import os
import sys
from dask.distributed import Client, LocalCluster
# A common pitfall is not configuring logging correctly, leading to lost context.
# We create a custom JSON formatter to ensure logs match the Rust service's format.
class JsonFormatter(logging.Formatter):
def format(self, record):
log_record = {
"timestamp": self.formatTime(record, self.datefmt),
"level": record.levelname,
"message": record.getMessage(),
"target": record.name,
}
# Add extra context if it exists, especially our trace_id and job_id.
if hasattr(record, 'extra_context'):
log_record.update(record.extra_context)
return json.dumps(log_record)
def setup_logging():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
# Avoid adding duplicate handlers if this function is called multiple times.
if not logger.handlers:
logger.addHandler(handler)
return logger
# This is a placeholder for a real, complex computation.
# It simulates work and can fail to demonstrate error handling.
def perform_heavy_computation(params: str, dask_client: Client) -> dict:
# In a real application, you'd use the dask_client to submit
# parallelizable tasks.
# e.g., futures = dask_client.map(some_func, some_data)
# results = dask_client.gather(futures)
time.sleep(5) # Simulate a 5-second computation
if "fail" in params:
raise ValueError("Simulated computation failure")
return {"result_value": f"computed_from_{params}", "points_processed": 10000}
def main():
logger = setup_logging()
# In production, this would connect to a remote Dask cluster.
# For this example, we'll spin up a local one.
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
dask_client = Client(cluster)
logger.info("Dask worker started. Connecting to Dask scheduler at %s", dask_client.scheduler.address)
# A robust implementation would use a connection pool.
redis_host = os.environ.get("REDIS_HOST", "127.0.0.1")
redis_conn = redis.Redis(host=redis_host, port=6379, db=0)
logger.info("Connected to Redis at %s:6379", redis_host)
queue_name = "dask_task_queue"
while True:
try:
# BLPOP is a blocking pop, it will wait for an item to appear.
# A timeout of 0 means wait forever.
_, task_data_raw = redis_conn.blpop(queue_name)
task_data = json.loads(task_data_raw)
job_id = task_data.get("job_id")
trace_id = task_data.get("trace_id")
params = task_data.get("params")
# This is the critical part for unified logging.
# We create a context dictionary for all subsequent logs for this job.
log_context = {"job_id": job_id, "trace_id": trace_id}
logger.info(
"Dequeued job",
extra={'extra_context': log_context}
)
result_key = f"result:{job_id}"
try:
# Store an initial 'processing' status.
redis_conn.hset(result_key, mapping={
"status": "processing",
"worker_id": "worker-1" # In prod, use hostname or container ID
})
result = perform_heavy_computation(params, dask_client)
# On success, store the result and set status to complete.
redis_conn.hset(result_key, mapping={
"status": "complete",
"data": json.dumps(result),
"finished_at": time.time()
})
logger.info(
"Job completed successfully",
extra={'extra_context': log_context}
)
except Exception as e:
error_message = f"Computation failed: {str(e)}"
logger.error(
error_message,
exc_info=True,
extra={'extra_context': log_context}
)
# On failure, it's crucial to write an error state back.
# Otherwise, the client will poll forever.
redis_conn.hset(result_key, mapping={
"status": "failed",
"error": error_message
})
except json.JSONDecodeError:
logger.error("Failed to decode task from queue: %s", task_data_raw)
except redis.exceptions.ConnectionError as e:
logger.error("Redis connection error: %s. Retrying in 5 seconds...", e)
time.sleep(5)
except KeyboardInterrupt:
logger.info("Shutting down worker...")
break
if __name__ == "__main__":
main()
This worker script is designed for resilience. It handles JSON parsing errors, computation failures, and Redis connection issues. The JsonFormatter and the extra dictionary passed to the logger are the linchpin of the observability strategy, ensuring Python logs are structurally identical to the Rust logs.
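One refinement worth considering, sketched here rather than used in the worker above: a logging.LoggerAdapter can bind the job context once instead of threading extra={'extra_context': ...} through every call. The JobLogger name below is hypothetical; it works with the existing JsonFormatter unchanged because the adapter still surfaces the context as record.extra_context.
import logging

class JobLogger(logging.LoggerAdapter):
    """Binds job_id/trace_id once; every subsequent log call carries the context."""

    def process(self, msg, kwargs):
        # Surface the bound context as record.extra_context so the existing
        # JsonFormatter picks it up without modification.
        kwargs.setdefault("extra", {})["extra_context"] = self.extra
        return msg, kwargs

# Usage inside the worker loop, replacing the repeated extra=... arguments:
# job_logger = JobLogger(logging.getLogger(), {"job_id": job_id, "trace_id": trace_id})
# job_logger.info("Dequeued job")
# job_logger.error("Computation failed", exc_info=True)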
The Fluentd Aggregation Configuration
Fluentd is the invisible hero that ties everything together. Its configuration defines the data pipelines: where to listen for logs, how to parse and transform them, and where to send the unified stream.
fluent.conf:
# This configuration listens for logs from Axum (file), Dask (stdout),
# and SwiftUI (TCP), then unifies them to a single output stream.
# Source 1: Axum service logs from a file.
# In a containerized world, this would likely be stdout as well.
<source>
@type tail
path /var/log/axum_service/app.log
pos_file /var/log/fluentd/axum_service.pos
tag rust.axum.service
<parse>
@type json
</parse>
</source>
# Source 2: Dask worker logs from Docker container stdout.
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# Assuming Dask workers run in Docker with --log-driver=fluentd
# The tag will be automatically set to the container name.
# We'll use a filter to unify tags later.
# Source 3: SwiftUI client logs sent over TCP
# This requires a logging framework in Swift capable of sending JSON over TCP.
<source>
@type tcp
tag swift.swiftui.client
port 5170
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
# Filter 1: Unify the log structure and promote trace_id.
# This ensures that no matter where the log came from, it has a consistent schema.
<filter **>
  @type record_transformer
  # enable_ruby is required for the expressions below (is_a?, JSON.parse, etc.).
  enable_ruby true
  <record>
    # If the log message is a JSON string, re-parse it and promote the nested
    # trace_id / job_id to the top level. record_transformer cannot reference a
    # field computed earlier in the same <record> block, so each expression
    # parses the message itself and falls back to any existing top-level field.
    trace_id ${msg = record['message']; parsed = (msg.is_a?(String) && msg.start_with?('{')) ? (JSON.parse(msg) rescue {}) : {}; parsed['fields'] ? parsed['fields']['trace_id'] : record['trace_id']}
    job_id ${msg = record['message']; parsed = (msg.is_a?(String) && msg.start_with?('{')) ? (JSON.parse(msg) rescue {}) : {}; parsed['fields'] ? parsed['fields']['job_id'] : record['job_id']}
    hostname "#{Socket.gethostname}"
    service ${tag_parts[1]} # e.g., 'axum', 'dask'
    level ${record['level']}
    message ${record['message']}
  </record>
</filter>
# Output: Send all unified logs to stdout for demonstration.
# In a real system, this would be an Elasticsearch, OpenSearch, or S3 output.
<match **>
@type stdout
</match>
With this configuration, a single user action in the SwiftUI app generates a trace_id that flows through the system. We can now filter our aggregated log store (e.g., in Kibana or Grafana) for trace_id: "some-unique-id" and see the exact lifecycle of that request:
- A log from swift.swiftui.client showing the job submission initiated by the user.
- A log from rust.axum.service showing the /submit endpoint being hit.
- A log from rust.axum.service confirming the job was enqueued to Redis.
- A log from the Dask worker container showing it dequeued the job.
- Multiple logs from the Dask worker detailing the computation progress.
- A final log from the Dask worker indicating success or failure and the write-back to Redis.
This unified view is a massive leap from grepping through separate, un-correlated log files on different machines.
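Swapping the stdout output for a real log store only changes the match block. A hedged sketch for Elasticsearch via fluent-plugin-elasticsearch is below; the host and index prefix are hypothetical and would come from your environment.
<match **>
  @type elasticsearch
  # Hypothetical host; point this at your Elasticsearch or OpenSearch endpoint.
  host elasticsearch.internal
  port 9200
  # Write daily indices named unified-jobs-YYYY.MM.DD for easy retention policies.
  logstash_format true
  logstash_prefix unified-jobs
  <buffer>
    @type file
    path /var/log/fluentd/buffer/es
    flush_interval 5s
  </buffer>
</match>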
The SwiftUI Client (Conceptual Implementation)
While a full SwiftUI implementation is extensive, the core logic in the ViewModel is straightforward. It needs to generate a trace_id, submit the job, and then poll for the result.
ComputationViewModel.swift:
import Foundation
import Combine
import os // For logging
class ComputationViewModel: ObservableObject {
@Published var jobId: String?
@Published var jobStatus: String = "Idle"
@Published var finalResult: String?
private var cancellables = Set<AnyCancellable>()
// In a real app, this logger would be configured to forward to Fluentd over TCP.
private let logger = Logger(subsystem: "com.myapp.ios", category: "Computation")
func submitComputation(params: String) {
let traceId = UUID().uuidString
jobStatus = "Submitting..."
logger.info("Initiating computation. trace_id=\(traceId)")
guard let url = URL(string: "http://127.0.0.1:3000/submit") else { return }
var request = URLRequest(url: url)
request.httpMethod = "POST"
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
let body = ["computation_params": params, "trace_id": traceId]
request.httpBody = try? JSONEncoder().encode(body)
URLSession.shared.dataTaskPublisher(for: request)
.map(\.data)
.decode(type: JobSubmissionResponse.self, decoder: JSONDecoder())
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { completion in
if case .failure(let error) = completion {
self.jobStatus = "Submission Failed: \(error.localizedDescription)"
self.logger.error("Job submission failed. trace_id=\(traceId), error=\(error.localizedDescription)")
}
}, receiveValue: { response in
self.jobId = response.job_id
self.jobStatus = "Submitted, awaiting result..."
self.logger.info("Job submitted successfully. trace_id=\(traceId), job_id=\(response.job_id)")
self.startPolling()
})
.store(in: &cancellables)
}
private func startPolling() {
guard let jobId = self.jobId else { return }
// A common mistake is using a simple Timer that doesn't handle backgrounding well.
// For robust polling, a more sophisticated solution is needed.
Timer.publish(every: 2.0, on: .main, in: .common)
.autoconnect()
.flatMap { _ -> URLSession.DataTaskPublisher in
guard let url = URL(string: "http://127.0.0.1:3000/status/\(jobId)") else {
// This is a programming error, should ideally not happen.
return URLSession.shared.dataTaskPublisher(for: URLRequest(url: URL(string: "about:blank")!))
}
return URLSession.shared.dataTaskPublisher(for: url)
}
.map(\.data)
.decode(type: JobStatusResponse.self, decoder: JSONDecoder())
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { _ in }, receiveValue: { statusResponse in
self.jobStatus = statusResponse.status
if statusResponse.status == "complete" || statusResponse.status == "failed" {
self.finalResult = statusResponse.result ?? "N/A"
self.cancellables.removeAll() // Stop polling
}
})
.store(in: &cancellables)
}
}
// Codable structs for API communication
struct JobSubmissionResponse: Codable {
let job_id: String
}
struct JobStatusResponse: Codable {
let status: String
let result: String?
}
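For completeness, a minimal view that drives this ViewModel might look like the following sketch; the view name, button label, and params value are illustrative.
import SwiftUI

struct ComputationView: View {
    @StateObject private var viewModel = ComputationViewModel()

    var body: some View {
        VStack(spacing: 16) {
            Text("Status: \(viewModel.jobStatus)")
            if let result = viewModel.finalResult {
                Text(result).font(.caption)
            }
            Button("Run Analysis") {
                // Placeholder params; a real screen would gather user input.
                viewModel.submitComputation(params: "portfolio-risk")
            }
        }
        .padding()
    }
}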
This architecture, while composed of many disparate parts, achieves our goals. It is asynchronous, individually scalable (we can add more Dask workers without touching the Axum API), and, thanks to the disciplined use of a trace_id and Fluentd, fully observable from end to end.
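Before turning to the limitations, it is worth noting how little is needed to exercise the whole pipeline without the iOS client. A small script along these lines (a sketch; it assumes the Axum service, Redis, and at least one worker are running locally, and that the requests package is available) submits a job and polls exactly as the SwiftUI client does:
import json
import time
import uuid

import requests  # assumed to be installed; any HTTP client works

BASE_URL = "http://127.0.0.1:3000"

def run_job(params: str) -> dict:
    trace_id = str(uuid.uuid4())
    resp = requests.post(
        f"{BASE_URL}/submit",
        json={"computation_params": params, "trace_id": trace_id},
        timeout=10,
    )
    resp.raise_for_status()
    job_id = resp.json()["job_id"]
    print(f"submitted job_id={job_id} trace_id={trace_id}")

    # Poll /status until the worker reports a terminal state.
    while True:
        status = requests.get(f"{BASE_URL}/status/{job_id}", timeout=10).json()
        if status["status"] in ("complete", "failed"):
            return status
        time.sleep(2)

if __name__ == "__main__":
    print(json.dumps(run_job("demo-params"), indent=2))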
The current implementation, however, is not without its limitations. The client relies on HTTP polling, which is inefficient; a more advanced design would have the Axum service push status updates to the client over WebSockets or Server-Sent Events. The Redis queue is simple but lacks the durability guarantees and dead-letter queueing of a more robust message broker like RabbitMQ; for jobs that absolutely cannot be lost, that would be a necessary upgrade. Finally, manually propagating the trace_id is functional but brittle. The next iteration of this system should be built on a proper distributed tracing framework like OpenTelemetry, which automates context propagation and provides much richer observability, including performance timings for each step in the chain.