Our core problem was a familiar one in organizations with deep technical history: a high-performance, computationally intensive C++ engine, critical to the business, that operated as a command-line black box. It would take a complex configuration file, churn for anywhere from 30 minutes to 8 hours, and output a terabyte of raw binary data. The “reporting” process involved a data scientist manually running post-processing scripts and pasting charts into a PowerPoint deck. This was the primary bottleneck for an entire department. The mandate was to make this process self-service, on-demand, and present the results as rich, interactive, web-based reports.
The initial impulse to simply wrap the C++ executable in a Flask API on a massive VM was dismissed. This approach solves the immediate problem but creates a monolithic, fragile system with no scalability. A single long-running job would hog all resources, and a VM failure would kill all in-progress work. We needed a distributed, resilient, and observable system. The final architecture, born of necessity and a fair bit of debate, ended up being a heterogeneous mix of technologies, each chosen for a specific strength. The C++ engine was immutable. The output had to be a static site, produced by a static site generator (SSG), for performance, security, and hosting simplicity. The control plane needed to handle potentially thousands of concurrent job states and provide real-time feedback to users. This led us down a path combining C++, a Phoenix web dashboard, Azure Functions as stateless glue, and HashiCorp Consul as the system’s central nervous system.
Here is the high-level data flow we settled on:
sequenceDiagram
    participant User
    participant PhoenixUI as Phoenix UI (Elixir)
    participant AFuncStart as Azure Function (Start Job)
    participant Consul
    participant CppWorker as C++ Worker Node
    participant AFuncBuild as Azure Function (Build SSG)
    participant BlobStorage as Azure Blob Storage

    User->>PhoenixUI: Submit job request (with config)
    PhoenixUI->>AFuncStart: Enqueue job message (e.g., Service Bus)
    activate AFuncStart
    AFuncStart->>Consul: Find healthy CppWorker service
    Consul-->>AFuncStart: List of healthy workers
    AFuncStart->>Consul: Write job details to KV (jobs/job-123)
    deactivate AFuncStart

    CppWorker->>Consul: Long poll on KV for new jobs
    Consul-->>CppWorker: Job-123 details available
    activate CppWorker
    CppWorker->>Consul: Update job status: "running"
    CppWorker-->>CppWorker: Execute long-running C++ process...
    loop Real-time Progress
        CppWorker->>Consul: Update KV with progress %
        Consul-->>PhoenixUI: (via Blocking Query) Notify of change
        PhoenixUI-->>User: Push progress via Phoenix Channels
    end
    CppWorker->>Consul: Update job status: "complete"
    deactivate CppWorker

    Consul->>AFuncBuild: (via event/webhook) Trigger SSG build
    activate AFuncBuild
    AFuncBuild->>CppWorker: Fetch raw output data
    AFuncBuild-->>AFuncBuild: Run SSG (e.g., Hugo)
    AFuncBuild->>BlobStorage: Upload generated static site
    AFuncBuild->>Consul: Update job status: "published"
    deactivate AFuncBuild

    Consul-->>PhoenixUI: Notify of "published" state
    PhoenixUI-->>User: Provide link to the static report
The C++ Worker: Becoming a Distributed Citizen
The first challenge was to take our isolated C++ application and turn it into a well-behaved node in a distributed system. It couldn’t just be a dumb executable anymore; it needed to announce its presence, report its health, and communicate its state. We decided against embedding a full-fledged web server into the C++ code, as that would add significant complexity and dependencies. Instead, we opted to have the worker communicate directly with a local Consul agent, which is a standard pattern in Consul-based ecosystems.
The worker’s lifecycle became:
- Start up and run as a daemon.
- Register itself as a service in Consul, providing a health check.
- Enter a loop, performing a blocking query on a specific key in Consul’s Key-Value (KV) store to wait for new jobs.
- Once a job is received, update its status in the KV store to “running”.
- Execute the core C++ engine logic.
- Periodically update its progress in the KV store.
- Upon completion, update its status to “complete” and go back to waiting for another job.
Here is a simplified but functional representation of the C++ worker’s main logic. For interacting with the Consul agent’s HTTP API, we use the cpr library, a modern C++ wrapper for libcurl.

CMakeLists.txt for the C++ Worker:
cmake_minimum_required(VERSION 3.15)
project(CppWorker)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Find the CPR (HTTP) and nlohmann/json libraries.
# Assumes both are installed system-wide or via a package manager like vcpkg.
find_package(cpr REQUIRED)
find_package(nlohmann_json REQUIRED)

add_executable(worker main.cpp)
target_link_libraries(worker PRIVATE cpr::cpr nlohmann_json::nlohmann_json)
main.cpp - The Worker Application:
#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>
#include <cpr/cpr.h>
#include <nlohmann/json.hpp>
// Use nlohmann/json for easy JSON parsing
using json = nlohmann::json;
const std::string CONSUL_AGENT_ADDR = "http://127.0.0.1:8500";
const std::string SERVICE_ID = "cpp-worker-01";
const std::string SERVICE_NAME = "cpp-computation-engine";
const int SERVICE_PORT = 9090; // Dummy port for service definition
// Registers the worker service with the local Consul agent.
void register_service() {
json service_def = {
{"ID", SERVICE_ID},
{"Name", SERVICE_NAME},
{"Port", SERVICE_PORT},
{"Check", {
{"DeregisterCriticalServiceAfter", "90s"},
{"Args", {"/bin/sh", "-c", "echo 'health check OK'"}}, // A real check would validate dependencies
{"Interval", "10s"},
{"Timeout", "5s"}
}}
};
cpr::Response r = cpr::Put(
cpr::Url{CONSUL_AGENT_ADDR + "/v1/agent/service/register"},
cpr::Body{service_def.dump()},
cpr::Header{{"Content-Type", "application/json"}}
);
if (r.status_code != 200) {
std::cerr << "Failed to register service with Consul. Status: " << r.status_code << ", Body: " << r.text << std::endl;
exit(1);
}
std::cout << "Service '" << SERVICE_ID << "' registered successfully with Consul." << std::endl;
}
// Updates a key in Consul's KV store. Used for status and progress.
void update_kv(const std::string& key, const std::string& value) {
cpr::Response r = cpr::Put(
cpr::Url{CONSUL_AGENT_ADDR + "/v1/kv/" + key},
cpr::Body{value}
);
if (r.status_code != 200) {
// In a production system, this would have retry logic.
std::cerr << "Failed to update KV for key '" << key << "'. Status: " << r.status_code << std::endl;
}
}
// Simulates the long-running, computationally intensive task.
void run_computation(const std::string& job_id) {
std::cout << "Starting computation for job: " << job_id << std::endl;
for (int i = 0; i <= 100; i += 10) {
std::cout << "Job " << job_id << " progress: " << i << "%" << std::endl;
// Update progress in Consul KV store
json progress_payload = { {"progress", i}, {"status", "running"} };
update_kv("jobs/" + job_id, progress_payload.dump());
std::this_thread::sleep_for(std::chrono::seconds(3)); // Simulate work
}
// Mark job as complete
json final_payload = {
{"progress", 100},
{"status", "complete"},
{"output_path", "/path/to/raw/data/" + job_id}
};
update_kv("jobs/" + job_id, final_payload.dump());
std::cout << "Computation for job " << job_id << " finished." << std::endl;
}
int main() {
    register_service();

    std::string last_consul_index = "0";

    // Main worker loop: wait for jobs using a blocking query.
    while (true) {
        std::cout << "Waiting for a new job... (Consul index: " << last_consul_index << ")" << std::endl;

        // Use a blocking query to wait for changes to the 'jobs/requests/' prefix.
        // This is far more efficient than constant polling.
        cpr::Response r = cpr::Get(
            cpr::Url{CONSUL_AGENT_ADDR + "/v1/kv/jobs/requests/?recurse&index=" + last_consul_index},
            cpr::Timeout{330000} // Slightly longer than Consul's default 5-minute wait, which can stretch with jitter
        );

        if (r.status_code != 200) {
            std::cerr << "Error polling Consul KV. Status: " << r.status_code << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(5)); // Back off before retrying
            continue;
        }

        // Extract the new Consul index from the header.
        auto it = r.header.find("X-Consul-Index");
        if (it != r.header.end()) {
            std::string new_index = it->second;
            if (new_index != last_consul_index) {
                last_consul_index = new_index;
                if (!r.text.empty() && r.text != "null") {
                    try {
                        auto jobs = json::parse(r.text);
                        for (const auto& job : jobs) {
                            std::string key = job["Key"]; // e.g., "jobs/requests/job-123"
                            std::string job_id = key.substr(key.find_last_of('/') + 1);
                            std::cout << "Found new job request: " << job_id << std::endl;

                            // A real system needs a locking mechanism to prevent multiple workers
                            // from picking up the same job. Consul's sessions can be used for this.
                            // For simplicity here, we just delete the request key.
                            cpr::Delete(cpr::Url{CONSUL_AGENT_ADDR + "/v1/kv/" + key});

                            // Process the job
                            run_computation(job_id);
                            break; // Process one job at a time
                        }
                    } catch (const json::parse_error& e) {
                        std::cerr << "JSON parse error: " << e.what() << std::endl;
                    }
                }
            }
        }
    }

    return 0;
}
A critical pitfall to avoid is treating the health check as a simple process check. A worker might be running but stuck, or its disk might be full. Our check script (/bin/sh -c "echo 'health check OK'") is a placeholder. A production-grade check would verify disk space, network connectivity to data sources, and perhaps even run a quick self-diagnostic, as sketched below.
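To make that concrete, here is the shape such a script check could take. This is a sketch, not our production script: the 90% disk threshold and the data-source endpoint are placeholder assumptions, and the exit codes follow Consul's script-check convention (exit 0 = passing, exit 1 = warning, anything else = critical).

#!/usr/bin/env python3
# Worker health check sketch. Exit codes follow Consul's script-check
# convention: 0 = passing, 1 = warning, anything else = critical.
import shutil
import socket
import sys

def disk_ok(path="/", max_used_fraction=0.9):
    # Placeholder threshold: report critical once the output volume is >90% full.
    usage = shutil.disk_usage(path)
    return (usage.used / usage.total) < max_used_fraction

def data_source_ok(host="data-source.internal", port=443, timeout=3):
    # Hypothetical upstream endpoint: verify the data source is still reachable.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    if disk_ok() and data_source_ok():
        print("health check OK")
        sys.exit(0)
    print("health check FAILED")
    sys.exit(2)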
The Glue: Azure Functions for Job Dispatch and Build
Azure Functions were the perfect fit for the stateless “glue” logic in our system. Their event-driven nature and consumption-based pricing model meant we weren’t paying for idle controllers. We created two main functions.
1. The Start-Job Function:
This function is triggered by a message on an Azure Service Bus queue, placed there by the Phoenix application. Its job is to find a healthy C++ worker via Consul and create a job request in the Consul KV store.
function.json for Start-Job (Python):
{
  "scriptFile": "main.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "job-requests",
      "connection": "AzureServiceBusConnectionString"
    }
  ]
}
main.py for Start-Job:
import datetime
import json
import logging
import os
import uuid

import requests
# Configuration from environment variables
CONSUL_HTTP_ADDR = os.environ.get("CONSUL_HTTP_ADDR", "http://consul.service.consul:8500")
SERVICE_NAME = "cpp-computation-engine"
def main(msg: str) -> None:
    try:
        logging.info(f"Received job request: {msg}")
        job_params = json.loads(msg)
        job_id = f"job-{uuid.uuid4()}"

        # 1. Discover a healthy worker using Consul's health API.
        # The 'passing' flag is crucial to filter out unhealthy instances.
        url = f"{CONSUL_HTTP_ADDR}/v1/health/service/{SERVICE_NAME}?passing"
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        healthy_services = response.json()

        if not healthy_services:
            # Raising lets the Azure Functions host retry the message and,
            # after enough failed attempts, Service Bus dead-letters it.
            logging.error("No healthy C++ workers available.")
            raise Exception("No healthy workers found")

        logging.info(f"Found {len(healthy_services)} healthy workers. Assigning job {job_id}.")

        # 2. Create the job request in Consul's KV store.
        # The C++ worker is long-polling on the 'jobs/requests/' prefix.
        kv_url = f"{CONSUL_HTTP_ADDR}/v1/kv/jobs/requests/{job_id}"
        kv_payload = {
            "id": job_id,
            "params": job_params,
            "submitted_at": datetime.datetime.utcnow().isoformat()
        }
        kv_response = requests.put(kv_url, data=json.dumps(kv_payload), timeout=5)
        kv_response.raise_for_status()
        logging.info(f"Successfully created job request for {job_id} in Consul KV.")

    except requests.exceptions.RequestException as e:
        logging.error(f"HTTP request error: {e}")
        raise
    except json.JSONDecodeError as e:
        logging.error(f"Invalid JSON in message: {e}")
        # Don't retry invalid messages
        return
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        raise
2. The Build-SSG Function:

This function is triggered when a job’s status changes to “complete”. While Consul has features like watches that can trigger webhooks, a simpler and more decoupled approach is to have the C++ worker, upon completion, also drop a message into an ssg-build-requests queue. This Azure Function listens to that queue.
Its tasks are as follows; a condensed sketch of the function body appears after the list.
- Read the message containing the job ID and the path to the raw output data.
- Download the data from its storage location.
- Run a static site generator (we used Hugo for its speed). The function’s deployment package includes the Hugo executable.
- Upload the generated public/ directory to an Azure Blob Storage container configured for static website hosting.
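We won’t reproduce the whole function, but a sketch of its body could look like the following, reusing the queue-triggered Python model from Start-Job. The prepare_hugo_site helper, the $web container name, and the STORAGE_CONNECTION_STRING setting are illustrative assumptions, not fixed names from our deployment.

import json
import logging
import os
import subprocess
import tempfile

from azure.storage.blob import ContainerClient

def prepare_hugo_site(raw_path: str, site_dir: str) -> None:
    # Placeholder: in reality, the post-processing scripts turn the raw
    # binary output into Hugo content/ and data/ files here.
    os.makedirs(os.path.join(site_dir, "content"), exist_ok=True)

def main(msg: str) -> None:
    build_request = json.loads(msg)
    job_id = build_request["id"]

    with tempfile.TemporaryDirectory() as workdir:
        site_dir = os.path.join(workdir, "site")
        prepare_hugo_site(build_request["output_path"], site_dir)

        # Run the Hugo binary bundled in the function's deployment package.
        hugo_bin = os.path.join(os.path.dirname(__file__), "hugo")
        subprocess.run([hugo_bin, "--source", site_dir], check=True)

        # Upload the generated public/ directory to the static-website container,
        # namespaced by job ID so each report gets its own URL prefix.
        container = ContainerClient.from_connection_string(
            os.environ["STORAGE_CONNECTION_STRING"], container_name="$web"
        )
        public_dir = os.path.join(site_dir, "public")
        for root, _dirs, files in os.walk(public_dir):
            for name in files:
                local_path = os.path.join(root, name)
                blob_name = f"{job_id}/{os.path.relpath(local_path, public_dir)}"
                with open(local_path, "rb") as fh:
                    container.upload_blob(blob_name, fh, overwrite=True)

    logging.info(f"Published static report for {job_id}.")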
The Control Plane: Phoenix for Real-Time UI
The reason for choosing Phoenix and Elixir was singular: unparalleled support for real-time, stateful connections via Phoenix Channels, built on top of the battle-tested BEAM virtual machine. We needed to show users the real-time progress of their long-running jobs. Polling from the frontend would have been inefficient and laggy.
The main challenge was bridging the gap between state changes in Consul and the Phoenix application. A common mistake is to have Phoenix poll Consul. This is inefficient. The correct way is to leverage Consul’s blocking queries within an Elixir GenServer.
lib/my_app/consul_watcher.ex - The GenServer for watching Consul:
defmodule MyApp.ConsulWatcher do
  use GenServer
  require Logger

  @consul_url Application.compile_env(:my_app, :consul_url)
  @kv_prefix "jobs/"
  @http_client Tesla # Using Tesla for HTTP requests

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{index: "0"}, name: __MODULE__)
  end

  # Public API
  def watch do
    # This could be used to dynamically add/remove watches, but for now it's static
  end

  # GenServer Callbacks
  @impl true
  def init(state) do
    # Start the first watch immediately on startup
    send(self(), :watch_for_changes)
    {:ok, state}
  end

  @impl true
  def handle_info(:watch_for_changes, state) do
    Logger.info("Watching Consul KV prefix '#{@kv_prefix}' with index #{state.index}")
    # Run the blocking query in a fire-and-forget task so the GenServer stays
    # responsive. Task.start/1 avoids the reply and :DOWN messages that
    # Task.async/1 would send back here; the query result instead arrives
    # via send/2 and is matched by the handle_info clauses below.
    Task.start(fn -> perform_blocking_query(state.index) end)
    {:noreply, state}
  end

  @impl true
  def handle_info({:consul_update, new_index, data}, state) do
    # A blocking query that times out returns the same index with unchanged
    # data, so only process and broadcast when the index actually advances.
    if new_index != state.index do
      Logger.info("Consul KV change detected. New index: #{new_index}")
      process_and_broadcast(data)
    end
    # Immediately start the next watch with the new index
    send(self(), :watch_for_changes)
    {:noreply, %{state | index: new_index}}
  end

  @impl true
  def handle_info({:consul_error, reason}, state) do
    Logger.error("Error watching Consul: #{inspect(reason)}. Retrying in 5 seconds.")
    # Schedule the retry rather than sleeping, so the GenServer keeps processing messages.
    Process.send_after(self(), :watch_for_changes, 5_000)
    {:noreply, state} # Keep the same index and retry
  end

  defp perform_blocking_query(index) do
    url = "#{@consul_url}/v1/kv/#{@kv_prefix}?recurse&index=#{index}"
    # Pass a long receive timeout through to the adapter (hackney here),
    # since the blocking query can hold the connection open for minutes.
    opts = [opts: [adapter: [recv_timeout: 300_000]]]

    case @http_client.get(url, opts) do
      {:ok, %Tesla.Env{status: 200, headers: headers, body: body}} ->
        new_index = List.keyfind(headers, "x-consul-index", 0, {"", "0"}) |> elem(1)
        # Send the result back to the GenServer for processing in its context
        send(__MODULE__, {:consul_update, new_index, body})

      {:ok, env} ->
        send(__MODULE__, {:consul_error, "Unexpected status: #{env.status}"})

      {:error, reason} ->
        send(__MODULE__, {:consul_error, reason})
    end
  end

  defp process_and_broadcast(data) do
    # The data is a list of KV pairs. Parse them, skipping entries without a
    # value (e.g., bare directory keys).
    parsed_data = Jason.decode!(data)

    for item <- parsed_data, item["Value"] != nil do
      # Key format: "jobs/job-123"
      job_id = String.split(item["Key"], "/") |> List.last()
      # The value is base64 encoded by Consul's API
      decoded_value = Base.decode64!(item["Value"]) |> Jason.decode!()

      # Broadcast on a topic specific to the job_id. The Phoenix Channel for
      # this user is subscribed to "job_updates:<job_id>".
      MyAppWeb.Endpoint.broadcast(
        "job_updates:#{job_id}",
        "progress_update",
        %{
          job_id: job_id,
          status: decoded_value["status"],
          progress: decoded_value["progress"]
        }
      )
    end
  end
end
This ConsulWatcher is started as part of our application’s supervision tree. When a user joins the Phoenix Channel for their specific job, the channel process is automatically subscribed to the matching PubSub topic, so the watcher’s broadcasts reach the browser with no extra wiring.

lib/my_app_web/channels/job_channel.ex:
defmodule MyAppWeb.JobChannel do
  use MyAppWeb, :channel

  def join("job_updates:" <> job_id, _payload, socket) do
    # Authenticate user and check if they have permission to view this job_id
    # ...
    # Joining "job_updates:<job_id>" automatically subscribes this channel to
    # the PubSub topic of the same name, so the ConsulWatcher's broadcasts are
    # relayed to the client. An explicit Endpoint.subscribe here would add a
    # second subscription and deliver every update twice.
    {:ok, %{message: "Joined job room for #{job_id}"}, assign(socket, :job_id, job_id)}
  end
end
The frontend JavaScript is then straightforward, using the standard phoenix.js library to connect to the channel and update the UI whenever a progress_update event is pushed from the server.
Lingering Issues and Future Iterations
This architecture, while effective, is not without its trade-offs and areas for improvement. Using Consul’s KV store as a messaging system for job requests is a deliberate simplification. A dedicated message queue like RabbitMQ or Azure Service Bus would offer more robust features like guaranteed delivery, dead-lettering, and complex routing, which we would certainly introduce in a V2. The current model relies on the C++ worker deleting the job request key, which lacks transactional guarantees; two workers could theoretically grab the same job in a race condition. This could be solved using Consul’s session locking mechanism (sketched below), but again, a proper queue is the right tool for the job. Furthermore, security was deferred; all components currently trust each other on the network. The immediate next step is to enable Consul Connect to enforce service-to-service mTLS, ensuring that only authorized services can communicate. Finally, the C++ health check is naive; a more intelligent check would report the worker’s current load, allowing the Azure Function to perform more sophisticated, load-aware scheduling instead of just picking a random healthy worker.
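To make the session-locking option concrete, here is a minimal sketch against Consul’s session and KV HTTP APIs, using the requests library. The lock key name is illustrative; session renewal (PUT /v1/session/renew/<id>) and release (?release=) are noted in comments but omitted for brevity.

import requests

CONSUL = "http://127.0.0.1:8500"

def try_claim_job(lock_key: str) -> bool:
    # 1. Create a session with a TTL. If the worker crashes, the session
    #    expires and the lock is released automatically. A live worker must
    #    renew it periodically (PUT /v1/session/renew/<id>) while working.
    resp = requests.put(f"{CONSUL}/v1/session/create", json={"TTL": "60s"}, timeout=5)
    resp.raise_for_status()
    session_id = resp.json()["ID"]

    # 2. Atomically acquire the lock. Consul returns true for exactly one
    #    caller; everyone else gets false and should skip the job.
    resp = requests.put(
        f"{CONSUL}/v1/kv/{lock_key}?acquire={session_id}",
        data=b"claimed",
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json() is True

if __name__ == "__main__":
    if try_claim_job("jobs/locks/job-123"):
        print("lock acquired; safe to start the computation")
    else:
        print("another worker already claimed this job")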