The latency graph for our primary Flask monolith was starting to look like a seismograph during an earthquake. Specifically, every request touching the user configuration and feature flagging module caused a noticeable p99 latency spike. The root cause was clear after profiling: a series of complex, inefficient queries generated by our ORM against a rapidly growing PostgreSQL table. In a real-world project, a full rewrite is often a fantasy. The directive was to alleviate the pain without destabilizing the entire system. The chosen path was surgical: extract the read-heavy logic into a high-performance Rust sidecar, leaving the existing Python write path largely intact but augmenting it with a write-through caching strategy.
Our initial concept was a simple service co-located with the main Flask application. It would expose an HTTP API, maintain its own local, persistent copy of the configuration data, and serve read requests in microseconds instead of milliseconds. This approach avoids introducing network latency to a remote cache like Redis and minimizes operational overhead by not requiring another server to manage.
Technology selection was driven by pragmatism. Rust was the non-negotiable choice for the service itself, offering performance, reliability, and a fantastic ecosystem. For the web framework, Axum stood out. Its integration with the Tokio ecosystem, its composable middleware system via Tower, and its ergonomic API make it a solid choice for production services. For persistence, we needed something faster and simpler than a full SQL database. LevelDB, an embedded key-value store, was a perfect fit. It’s file-based, incredibly fast for point reads, and its operational footprint is virtually zero.
A critical, forward-looking decision was to structure the core logic as a gRPC service using Tonic, even if the initial consumer was the Flask app over HTTP. This decouples the business logic from the transport layer. Axum would simply act as an HTTP-to-gRPC gateway. When we inevitably build more Rust services, they can communicate with this component directly over high-performance gRPC without needing to go through a slower JSON/HTTP layer.
The architecture crystallized into the following flow:
graph TD
    subgraph Python Monolith
        A[Flask Application]
    end
    subgraph Rust Sidecar
        B[Axum HTTP Server]
        C[Tonic gRPC Service Logic]
        D[LevelDB Instance]
    end
    subgraph Main Database
        E[PostgreSQL]
    end
    subgraph User Traffic
        F[Read Request]
        G[Write Request]
    end

    F -- GET /config/user/123 --> A
    A -- HTTP GET --> B
    B -- Calls internal fn --> C
    C -- Reads from --> D
    D -- Returns value --> C
    C -- Returns value --> B
    B -- HTTP 200 OK --> A
    A -- Returns config --> F

    G -- POST /config/user/123 --> A
    A -- 1. ORM Write --> E
    A -- 2. HTTP POST (Write-through) --> B
    B -- Calls internal fn --> C
    C -- Writes to --> D
This design isolates the performance-critical read path while ensuring data is synchronized (with some caveats) during writes.
Project Structure and Dependencies
We start by setting up a Rust workspace to properly separate concerns. The core logic (Tonic service, database interaction) will live in a library crate (config_core), and the Axum server will be in a binary crate (config_server).
Cargo.toml at the workspace root:
[workspace]
members = [
"config_core",
"config_server",
]
The core library’s dependencies, in config_core/Cargo.toml:
[package]
name = "config_core"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread", "sync"] }
tonic = "0.9.2"
prost = "0.11.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
leveldb = "0.8"  # high-level bindings over LevelDB (pulls in leveldb-sys)
thiserror = "1.0"
tracing = "0.1"
[build-dependencies]
tonic-build = "0.9.2"
And the server’s dependencies, in config_server/Cargo.toml:
[package]
name = "config_server"
version = "0.1.0"
edition = "2021"
[dependencies]
config_core = { path = "../config_core" }
# main.rs wraps payloads in tonic::Request before calling the service, so the server needs tonic too.
tonic = "0.9.2"
tokio = { version = "1.28.2", features = ["full"] }
axum = "0.6.18"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
dotenvy = "0.15"
Defining the Service Contract with Protobuf
The foundation of our service is the contract. Defining it with Protobuf ensures clear, language-agnostic types for requests and responses.
config_core/proto/config.proto:
syntax = "proto3";
package config;
// The core configuration service definition.
service ConfigService {
// Retrieves a configuration for a specific entity (e.g., user, tenant).
rpc GetConfig (GetConfigRequest) returns (GetConfigResponse);
// Sets or updates a configuration for a specific entity.
rpc SetConfig (SetConfigRequest) returns (SetConfigResponse);
}
message Config {
bool feature_a_enabled = 1;
string theme = 2;
int32 items_per_page = 3;
}
message GetConfigRequest {
string entity_id = 1;
}
message GetConfigResponse {
string entity_id = 1;
optional Config config = 2; // optional means it might not exist
}
message SetConfigRequest {
string entity_id = 1;
Config config = 2;
}
message SetConfigResponse {
string entity_id = 1;
bool success = 2;
}
We need a build script to compile this proto file into Rust code. Because the database layer serializes the generated Config type with serde_json, the build script also attaches serde derives to it.
config_core/build.rs:
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(true)
        // The db layer stores Config as JSON, so derive serde traits on the generated type.
        .type_attribute("config.Config", "#[derive(serde::Serialize, serde::Deserialize)]")
        .compile(&["proto/config.proto"], &["proto/"])?;
    Ok(())
}
Implementing the Core Logic and Database Layer
With the contract defined, we implement the service. A key challenge is managing access to LevelDB from asynchronous handlers. The obvious approach, sharing the database handle behind an Arc<Mutex>, serializes every operation at the lock and invites contention; a common mistake is to hold that lock for the entire duration of a request. A more robust pattern in a tokio environment is the actor model: a dedicated asynchronous task owns the database handle and processes requests from a channel.
This isolates all database operations to a single task, serializing access explicitly and keeping locks out of the web handlers.
config_core/src/db.rs:
use leveldb::database::Database;
use leveldb::kv::KV;
use leveldb::options::{Options, ReadOptions, WriteOptions};
use std::path::Path;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
// The prost-generated types are included under the service module (see service.rs).
use crate::service::config::{Config, GetConfigRequest, SetConfigRequest};
#[derive(Debug, Error)]
pub enum DbError {
#[error("LevelDB operation failed: {0}")]
LevelDb(#[from] leveldb::error::Error),
#[error("Serialization failed: {0}")]
Serialization(#[from] serde_json::Error),
#[error("Entity not found: {0}")]
NotFound(String),
#[error("Internal channel closed")]
ChannelClosed,
}
// Commands sent to the database actor.
#[derive(Debug)]
pub enum DbCommand {
Get {
key: String,
responder: oneshot::Sender<Result<Config, DbError>>,
},
Set {
key: String,
value: Config,
responder: oneshot::Sender<Result<(), DbError>>,
},
}
// The actor itself.
struct DbActor {
receiver: mpsc::Receiver<DbCommand>,
db: Database,
}
impl DbActor {
fn new(receiver: mpsc::Receiver<DbCommand>, db_path: &Path) -> Result<Self, DbError> {
let mut options = Options::new();
options.create_if_missing = true;
let database = Database::open(db_path, options)?;
Ok(DbActor {
receiver,
db: database,
})
}
async fn run(&mut self) {
while let Some(cmd) = self.receiver.recv().await {
match cmd {
DbCommand::Get { key, responder } => {
let res = self.handle_get(key);
// The responder may have been dropped if the client timed out.
// We don't care about the result of the send.
let _ = responder.send(res);
}
DbCommand::Set {
key,
value,
responder,
} => {
let res = self.handle_set(key, value);
let _ = responder.send(res);
}
}
}
}
fn handle_get(&self, key: String) -> Result<Config, DbError> {
let read_opts = ReadOptions::new();
let key_bytes = key.as_bytes();
match self.db.get(read_opts, key_bytes) {
Ok(Some(value_bytes)) => {
// In a real-world project, you'd want more robust error handling
// for malformed data. Here we assume what's in the DB is valid JSON.
let config: Config = serde_json::from_slice(&value_bytes)?;
Ok(config)
}
Ok(None) => Err(DbError::NotFound(key)),
Err(e) => Err(DbError::from(e)),
}
}
fn handle_set(&self, key: String, value: Config) -> Result<(), DbError> {
let write_opts = WriteOptions::new();
let key_bytes = key.as_bytes();
let value_bytes = serde_json::to_vec(&value)?;
self.db.put(write_opts, key_bytes, &value_bytes)?;
Ok(())
}
}
// The handle that application code uses to interact with the actor.
#[derive(Clone)]
pub struct DbHandle {
sender: mpsc::Sender<DbCommand>,
}
impl DbHandle {
pub fn new(db_path: &Path) -> Result<Self, DbError> {
let (sender, receiver) = mpsc::channel(128); // Buffer size is tunable
let mut actor = DbActor::new(receiver, db_path)?;
// Spawn the actor task. It will run in the background.
tokio::spawn(async move {
actor.run().await;
});
Ok(Self { sender })
}
pub async fn get_config(&self, req: GetConfigRequest) -> Result<Config, DbError> {
let (tx, rx) = oneshot::channel();
let cmd = DbCommand::Get {
key: req.entity_id,
responder: tx,
};
self.sender.send(cmd).await.map_err(|_| DbError::ChannelClosed)?;
rx.await.map_err(|_| DbError::ChannelClosed)?
}
pub async fn set_config(&self, req: SetConfigRequest) -> Result<(), DbError> {
let (tx, rx) = oneshot::channel();
let cmd = DbCommand::Set {
key: req.entity_id,
            // prost generates message fields as Option; fall back to defaults if the payload omits it.
            value: req.config.unwrap_or_default(),
responder: tx,
};
self.sender.send(cmd).await.map_err(|_| DbError::ChannelClosed)?;
rx.await.map_err(|_| DbError::ChannelClosed)?
}
}
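Before wiring the handle into Tonic, a quick round-trip test keeps the actor honest. A minimal sketch, appended to db.rs, assuming a tempfile dev-dependency for a throwaway database directory:
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service::config::{Config, GetConfigRequest, SetConfigRequest};

    #[tokio::test]
    async fn set_then_get_round_trip() {
        // tempfile::tempdir() creates an isolated directory that is removed on drop.
        let dir = tempfile::tempdir().expect("temp dir");
        let handle = DbHandle::new(dir.path()).expect("db actor should start");

        handle
            .set_config(SetConfigRequest {
                entity_id: "user-123".to_string(),
                config: Some(Config {
                    feature_a_enabled: true,
                    theme: "dark".to_string(),
                    items_per_page: 25,
                }),
            })
            .await
            .expect("set should succeed");

        let fetched = handle
            .get_config(GetConfigRequest {
                entity_id: "user-123".to_string(),
            })
            .await
            .expect("get should succeed");
        assert_eq!(fetched.theme, "dark");
        assert!(fetched.feature_a_enabled);
    }
}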
Now, we implement the Tonic service trait using this DbHandle.
config_core/src/service.rs:
use tonic::{Request, Response, Status};
use crate::db::{DbHandle, DbError};
// Include the generated gRPC server code
pub mod config {
tonic::include_proto!("config");
}
use config::config_service_server::{ConfigService, ConfigServiceServer};
use config::{
GetConfigRequest, GetConfigResponse, SetConfigRequest, SetConfigResponse,
};
// Clone is cheap (DbHandle is just an mpsc sender) and lets the Axum state hold the service by value.
#[derive(Clone)]
pub struct MyConfigService {
db: DbHandle,
}
impl MyConfigService {
pub fn new(db: DbHandle) -> Self {
Self { db }
}
}
#[tonic::async_trait]
impl ConfigService for MyConfigService {
async fn get_config(
&self,
request: Request<GetConfigRequest>,
) -> Result<Response<GetConfigResponse>, Status> {
let req = request.into_inner();
let entity_id = req.entity_id.clone();
match self.db.get_config(req).await {
Ok(config) => {
let response = GetConfigResponse {
entity_id,
config: Some(config),
};
Ok(Response::new(response))
}
Err(DbError::NotFound(_)) => {
// It's not an error to not find a config; it just doesn't exist.
// The proto uses `optional` for this exact reason.
let response = GetConfigResponse {
entity_id,
config: None,
};
Ok(Response::new(response))
}
Err(e) => {
// For any other error, we return an internal error status.
// A common mistake is to leak implementation details in error messages.
tracing::error!("Failed to get config: {:?}", e);
Err(Status::internal("Failed to retrieve configuration"))
}
}
}
async fn set_config(
&self,
request: Request<SetConfigRequest>,
) -> Result<Response<SetConfigResponse>, Status> {
let req = request.into_inner();
let entity_id = req.entity_id.clone();
match self.db.set_config(req).await {
Ok(()) => {
let response = SetConfigResponse {
entity_id,
success: true,
};
Ok(Response::new(response))
}
Err(e) => {
tracing::error!("Failed to set config: {:?}", e);
Err(Status::internal("Failed to persist configuration"))
}
}
}
}
// A simple public interface for our library
pub fn create_service(db_handle: DbHandle) -> ConfigServiceServer<MyConfigService> {
let service = MyConfigService::new(db_handle);
ConfigServiceServer::new(service)
}
This completes our core library. It’s self-contained, testable, and completely unaware of Axum or HTTP.
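The crate root simply wires these modules together; a minimal config_core/src/lib.rs (not shown above, so the exact shape is an assumption) looks like this:
// Crate root: expose the database actor and the Tonic service implementation
// so the server binary can use config_core::db and config_core::service.
pub mod db;
pub mod service;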
Building the Axum HTTP Gateway
Now we expose this logic over a REST-like API for our Python application. The binary crate config_server handles this.
First, let’s define the JSON payloads that Axum will handle. They mirror the gRPC structures.
config_server/src/http_models.rs:
use serde::{Deserialize, Serialize};
// This mirrors the protobuf Config message for JSON serialization.
#[derive(Serialize, Deserialize, Clone)]
pub struct ConfigJson {
pub feature_a_enabled: bool,
pub theme: String,
pub items_per_page: i32,
}
// Axum handler for setting a new config will use this.
#[derive(Deserialize)]
pub struct SetConfigRequestJson {
pub entity_id: String,
pub config: ConfigJson,
}
// The response for a GET request.
#[derive(Serialize)]
pub struct GetConfigResponseJson {
pub entity_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<ConfigJson>,
}
Now, the server itself. We need to create the DbHandle, instantiate our MyConfigService, and then build Axum handlers that call the service methods. The state (MyConfigService) is shared with handlers using Axum’s State extractor.
config_server/src/main.rs:
use axum::{
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Json},
routing::{get, post},
Router,
};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::env;
use dotenvy::dotenv;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use config_core::db::DbHandle;
// The generated ConfigService trait must be in scope to call get_config/set_config on the service.
use config_core::service::config::config_service_server::ConfigService;
use config_core::service::{config::*, MyConfigService};
mod http_models;
use http_models::{SetConfigRequestJson, GetConfigResponseJson, ConfigJson};
// Define our application state that will be shared across handlers.
// The Tonic service itself holds the database handle.
#[derive(Clone)]
struct AppState {
config_service: MyConfigService,
}
#[tokio::main]
async fn main() {
dotenv().ok(); // Load .env file
// In a real-world project, logging configuration is critical.
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "config_server=info,tower_http=info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let db_path_str = env::var("DATABASE_PATH").expect("DATABASE_PATH must be set");
let db_path = PathBuf::from(db_path_str);
tracing::info!("Opening database at: {:?}", db_path);
let db_handle = DbHandle::new(&db_path)
.expect("Failed to initialize database actor");
let app_state = AppState {
config_service: MyConfigService::new(db_handle),
};
let app = Router::new()
.route("/config/:entity_id", get(get_config_handler))
.route("/config", post(set_config_handler))
.with_state(app_state);
let addr_str = env::var("LISTEN_ADDR").unwrap_or_else(|_| "127.0.0.1:3000".to_string());
let addr: SocketAddr = addr_str.parse().expect("Invalid LISTEN_ADDR format");
tracing::info!("Listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
// GET /config/:entity_id
async fn get_config_handler(
State(state): State<AppState>,
Path(entity_id): Path<String>,
) -> impl IntoResponse {
let request = tonic::Request::new(GetConfigRequest { entity_id });
match state.config_service.get_config(request).await {
Ok(response) => {
let res_inner = response.into_inner();
let json_response = GetConfigResponseJson {
entity_id: res_inner.entity_id,
config: res_inner.config.map(|c| ConfigJson {
feature_a_enabled: c.feature_a_enabled,
theme: c.theme,
items_per_page: c.items_per_page,
}),
};
            // The arms build different Json<T> payloads, so convert each into a
            // Response to give the match a single concrete type.
            (StatusCode::OK, Json(json_response)).into_response()
        }
        Err(status) => {
            tracing::error!("gRPC call failed: {}", status.message());
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({"error": "Internal server error"})),
            )
                .into_response()
        }
}
}
// POST /config
async fn set_config_handler(
State(state): State<AppState>,
Json(payload): Json<SetConfigRequestJson>,
) -> impl IntoResponse {
let request = tonic::Request::new(SetConfigRequest {
entity_id: payload.entity_id,
config: Some(Config {
feature_a_enabled: payload.config.feature_a_enabled,
theme: payload.config.theme,
items_per_page: payload.config.items_per_page,
}),
});
match state.config_service.set_config(request).await {
Ok(_) => (StatusCode::OK, Json(serde_json::json!({"success": true}))),
Err(status) => {
tracing::error!("gRPC call failed: {}", status.message());
(StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": "Failed to set config"})))
}
}
}
We can run this with a simple .env file:
DATABASE_PATH=./config_db
LISTEN_ADDR=127.0.0.1:3000
Running cargo run inside config_server will start the service.
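Because the core logic already implements the Tonic service trait, exposing a native gRPC endpoint for future Rust consumers is a small addition. Here is a minimal sketch of such an entry point; the choice of port and running it alongside the Axum gateway are assumptions, and it is not wired into main() above:
// Hypothetical gRPC entry point reusing the same DbHandle-backed service.
use std::net::SocketAddr;

use config_core::db::DbHandle;
use config_core::service::create_service;

async fn serve_grpc(db_handle: DbHandle, addr: SocketAddr) -> Result<(), tonic::transport::Error> {
    tonic::transport::Server::builder()
        .add_service(create_service(db_handle))
        .serve(addr)
        .await
}
A future Rust service would then talk to this port with the tonic-generated client directly, instead of going through the JSON/HTTP gateway.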
Integrating with the Legacy Flask Application
The final piece is to modify the Python code. We’ll create a simple client to communicate with our Rust sidecar. The critical change is in the view functions that previously used the ORM for reads.
flask_app/sidecar_client.py:
import os
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Dict, Any
class ConfigSidecarClient:
"""
A client for the Rust configuration sidecar service.
Includes connection pooling, timeouts, and retries for production readiness.
"""
def __init__(self):
base_url = os.environ.get("CONFIG_SIDECAR_URL", "http://127.0.0.1:3000")
self.get_url = f"{base_url}/config/{{entity_id}}"
self.set_url = f"{base_url}/config"
self.session = requests.Session()
# A common pitfall is not configuring retries and timeouts.
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries, pool_connections=10, pool_maxsize=10)
self.session.mount("http://", adapter)
self.timeout = (0.5, 2.0) # (connect_timeout, read_timeout)
def get_config(self, entity_id: str) -> Optional[Dict[str, Any]]:
try:
url = self.get_url.format(entity_id=entity_id)
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status() # Raise exception for 4xx/5xx status codes
data = response.json()
return data.get("config")
except requests.exceptions.RequestException as e:
# In a real system, you'd have structured logging here.
print(f"Error fetching config for {entity_id}: {e}")
return None # Fail safe: if the sidecar is down, return no config
def set_config(self, entity_id: str, config: Dict[str, Any]) -> bool:
try:
payload = {"entity_id": entity_id, "config": config}
response = self.session.post(self.set_url, json=payload, timeout=self.timeout)
response.raise_for_status()
return response.json().get("success", False)
except requests.exceptions.RequestException as e:
print(f"Error setting config for {entity_id}: {e}")
return False
# Instantiate a global client
sidecar_client = ConfigSidecarClient()
Now, we modify the Flask views.
flask_app/app.py:
from flask import Flask, jsonify, request
# from .models import db, UserConfig # The old way with SQLAlchemy ORM
from .sidecar_client import sidecar_client
app = Flask(__name__)
# ... other Flask app setup ...
@app.route("/user/<string:user_id>/config", methods=["GET"])
def get_user_config(user_id: str):
# --- The Old Way ---
# config_obj = UserConfig.query.filter_by(user_id=user_id).first()
# if not config_obj:
# return jsonify({"error": "Not Found"}), 404
# return jsonify(config_obj.to_dict())
# --- The New Way ---
config = sidecar_client.get_config(entity_id=user_id)
if config is None:
        # Note: with this client, "config not found" and "sidecar unavailable" both surface
        # as None. Here we might decide to fall back to the DB as a resilience strategy,
        # but for this example, we'll assume the sidecar is authoritative for reads.
return jsonify({"error": "Could not retrieve configuration"}), 503
return jsonify(config)
@app.route("/user/<string:user_id>/config", methods=["POST"])
def update_user_config(user_id: str):
data = request.get_json()
if not data:
return jsonify({"error": "Invalid payload"}), 400
# In a real-world project, this would be a single database transaction.
# The pitfall here is the lack of atomicity between the two systems.
try:
# 1. Write to the primary database via the ORM (code omitted for brevity)
# success_db = update_config_in_postgres(user_id, data)
# if not success_db:
# raise Exception("Failed to update master DB")
print(f"Imagine writing to PostgreSQL for user {user_id} here...")
# 2. Write-through to the Rust sidecar
success_sidecar = sidecar_client.set_config(entity_id=user_id, config=data)
if not success_sidecar:
# This is a problem. The DB is updated but the cache is stale.
# Production code needs a reconciliation or retry mechanism.
print(f"WARNING: Failed to update sidecar cache for {user_id}")
return jsonify({"status": "updated"}), 200
except Exception as e:
print(f"Failed to update user config: {e}")
return jsonify({"error": "Update failed"}), 500
The final result is a system where the read path, which accounts for >99% of traffic to this module, is now served by a highly efficient Rust service reading from a local LevelDB store. The performance improvement is dramatic, and we’ve laid the groundwork for a broader migration to a more robust microservices architecture using Tonic for inter-service communication.
The primary limitation of this design is the consistency model. The write-through cache approach introduces a dual-write problem. If the primary database write succeeds but the subsequent call to the Rust sidecar fails, the cache becomes stale. This implementation accepts that risk for simplicity, but a more resilient system might use a transactional outbox pattern or a Change Data Capture (CDC) pipeline from PostgreSQL to update the sidecar’s state, guaranteeing eventual consistency without coupling the write paths. Furthermore, the current single-node LevelDB store is a single point of failure and a bottleneck if the service needs to be scaled horizontally; evolving this would mean replacing LevelDB with a distributed key-value store.