Integrating a Legacy C++ Engine with a Modern SSG Workflow via Azure Functions and Consul


Our core problem was a familiar one in organizations with deep technical history: a high-performance, computationally intensive C++ engine, critical to the business, that operated as a command-line black box. It would take a complex configuration file, churn for anywhere from 30 minutes to 8 hours, and output a terabyte of raw binary data. The “reporting” process involved a data scientist manually running post-processing scripts and pasting charts into a PowerPoint deck. This was the primary bottleneck for an entire department. The mandate was to make this process self-service, on-demand, and present the results as rich, interactive, web-based reports.

The initial impulse to simply wrap the C++ executable in a Flask API on a massive VM was dismissed. This approach solves the immediate problem but creates a monolithic, fragile system with no scalability. A single long-running job would hog all resources, and a VM failure would kill all in-progress work. We needed a distributed, resilient, and observable system. The final architecture, born of necessity and a fair bit of debate, ended up being a heterogeneous mix of technologies, each chosen for a specific strength. The C++ engine was immutable. The reports had to be delivered as a static site, produced by a static site generator (SSG), for performance, security, and hosting simplicity. The control plane needed to handle potentially thousands of concurrent job states and provide real-time feedback to users. This led us down a path combining C++, a Phoenix web dashboard, Azure Functions as stateless glue, and HashiCorp Consul as the system's central nervous system.

Here is the high-level data flow we settled on:

sequenceDiagram
    participant User
    participant PhoenixUI as Phoenix UI (Elixir)
    participant AFuncStart as Azure Function (Start Job)
    participant Consul
    participant CppWorker as C++ Worker Node
    participant AFuncBuild as Azure Function (Build SSG)
    participant BlobStorage as Azure Blob Storage

    User->>PhoenixUI: Submit job request (with config)
    PhoenixUI->>AFuncStart: Enqueue job message (e.g., Service Bus)
    activate AFuncStart
    AFuncStart->>Consul: Find healthy CppWorker service
    Consul-->>AFuncStart: List of healthy workers
    AFuncStart->>Consul: Write job details to KV (jobs/job-123)
    deactivate AFuncStart
    
    CppWorker->>Consul: Long poll on KV for new jobs
    Consul-->>CppWorker: Job-123 details available
    activate CppWorker
    CppWorker->>Consul: Update job status: "running"
    CppWorker-->>CppWorker: Execute long-running C++ process...
    
    loop Real-time Progress
        CppWorker->>Consul: Update KV with progress %
        Consul-->>PhoenixUI: (via Blocking Query) Notify of change
        PhoenixUI-->>User: Push progress via Phoenix Channels
    end
    
    CppWorker->>Consul: Update job status: "complete"
    deactivate CppWorker
    
    Consul->>AFuncBuild: (via event/webhook) Trigger SSG build
    activate AFuncBuild
    AFuncBuild->>CppWorker: Fetch raw output data
    AFuncBuild-->>AFuncBuild: Run SSG (e.g., Hugo)
    AFuncBuild->>BlobStorage: Upload generated static site
    AFuncBuild->>Consul: Update job status: "published"
    deactivate AFuncBuild

    Consul-->>PhoenixUI: Notify of "published" state
    PhoenixUI-->>User: Provide link to the static report

The C++ Worker: Becoming a Distributed Citizen

The first challenge was to take our isolated C++ application and turn it into a well-behaved node in a distributed system. It couldn’t just be a dumb executable anymore; it needed to announce its presence, report its health, and communicate its state. We decided against embedding a full-fledged web server into the C++ code, as that would add significant complexity and dependencies. Instead, we opted to have the worker communicate directly with a local Consul agent, which is a standard pattern in Consul-based ecosystems.

The worker’s lifecycle became:

  1. Start up and run as a daemon.
  2. Register itself as a service in Consul, providing a health check.
  3. Enter a loop, performing a blocking query on a specific key in Consul’s Key-Value (KV) store to wait for new jobs.
  4. Once a job is received, update its status in the KV store to “running”.
  5. Execute the core C++ engine logic.
  6. Periodically update its progress in the KV store.
  7. Upon completion, update its status to “complete” and go back to waiting for another job.

Here is a simplified but functional representation of the C++ worker's main logic. For interacting with the Consul agent's HTTP API we use the cpr library, a modern C++ wrapper for libcurl, plus nlohmann/json for JSON handling.

CMakeLists.txt for the C++ Worker:

cmake_minimum_required(VERSION 3.15)
project(CppWorker)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Find CPR (HTTP requests) and nlohmann/json (JSON parsing), both used by main.cpp.
# Assumes they are installed system-wide or via a package manager like vcpkg.
find_package(cpr REQUIRED)
find_package(nlohmann_json REQUIRED)

add_executable(worker main.cpp)

target_link_libraries(worker PRIVATE cpr::cpr nlohmann_json::nlohmann_json)

main.cpp - The Worker Application:

#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>
#include <cpr/cpr.h>
#include <nlohmann/json.hpp>

// Use nlohmann/json for easy JSON parsing
using json = nlohmann::json;

const std::string CONSUL_AGENT_ADDR = "http://127.0.0.1:8500";
const std::string SERVICE_ID = "cpp-worker-01";
const std::string SERVICE_NAME = "cpp-computation-engine";
const int SERVICE_PORT = 9090; // Dummy port for service definition

// Registers the worker service with the local Consul agent.
void register_service() {
    json service_def = {
        {"ID", SERVICE_ID},
        {"Name", SERVICE_NAME},
        {"Port", SERVICE_PORT},
        {"Check", {
            {"DeregisterCriticalServiceAfter", "90s"},
            {"Args", {"/bin/sh", "-c", "echo 'health check OK'"}}, // A real check would validate dependencies
            {"Interval", "10s"},
            {"Timeout", "5s"}
        }}
    };

    cpr::Response r = cpr::Put(
        cpr::Url{CONSUL_AGENT_ADDR + "/v1/agent/service/register"},
        cpr::Body{service_def.dump()},
        cpr::Header{{"Content-Type", "application/json"}}
    );

    if (r.status_code != 200) {
        std::cerr << "Failed to register service with Consul. Status: " << r.status_code << ", Body: " << r.text << std::endl;
        exit(1);
    }
    std::cout << "Service '" << SERVICE_ID << "' registered successfully with Consul." << std::endl;
}

// Updates a key in Consul's KV store. Used for status and progress.
void update_kv(const std::string& key, const std::string& value) {
    cpr::Response r = cpr::Put(
        cpr::Url{CONSUL_AGENT_ADDR + "/v1/kv/" + key},
        cpr::Body{value}
    );
    if (r.status_code != 200) {
        // In a production system, this would have retry logic.
        std::cerr << "Failed to update KV for key '" << key << "'. Status: " << r.status_code << std::endl;
    }
}

// Simulates the long-running, computationally intensive task.
void run_computation(const std::string& job_id) {
    std::cout << "Starting computation for job: " << job_id << std::endl;
    for (int i = 0; i <= 100; i += 10) {
        std::cout << "Job " << job_id << " progress: " << i << "%" << std::endl;
        
        // Update progress in Consul KV store
        json progress_payload = { {"progress", i}, {"status", "running"} };
        update_kv("jobs/" + job_id, progress_payload.dump());
        
        std::this_thread::sleep_for(std::chrono::seconds(3)); // Simulate work
    }

    // Mark job as complete
    json final_payload = {
        {"progress", 100},
        {"status", "complete"},
        {"output_path", "/path/to/raw/data/" + job_id}
    };
    update_kv("jobs/" + job_id, final_payload.dump());
    std::cout << "Computation for job " << job_id << " finished." << std::endl;
}


int main() {
    register_service();
    
    std::string last_consul_index = "0";

    // Main worker loop: wait for jobs using a blocking query.
    while (true) {
        std::cout << "Waiting for a new job... (Consul index: " << last_consul_index << ")" << std::endl;

        // Use a blocking query to wait for changes to the 'jobs/requests/' prefix.
        // This is far more efficient than constant polling.
        cpr::Response r = cpr::Get(
            cpr::Url{CONSUL_AGENT_ADDR + "/v1/kv/jobs/requests/?recurse&index=" + last_consul_index},
            cpr::Timeout{330000} // A bit longer than Consul's default 5-minute wait, which adds jitter
        );

        // Consul returns 404 (with a valid X-Consul-Index header) when no keys
        // exist under the prefix yet, so only treat other statuses as errors.
        if (r.status_code != 200 && r.status_code != 404) {
            std::cerr << "Error polling Consul KV. Status: " << r.status_code << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(5)); // Back off before retrying
            continue;
        }

        // Extract the new Consul index from the header.
        auto it = r.header.find("X-Consul-Index");
        if (it != r.header.end()) {
            std::string new_index = it->second;
            if (new_index != last_consul_index) {
                last_consul_index = new_index;

                if (!r.text.empty() && r.text != "null") {
                    try {
                        auto jobs = json::parse(r.text);
                        for (const auto& job : jobs) {
                            std::string key = job["Key"]; // e.g., "jobs/requests/job-123"
                            std::string job_id = key.substr(key.find_last_of('/') + 1);

                            std::cout << "Found new job request: " << job_id << std::endl;

                            // A real system needs a locking mechanism to prevent multiple workers
                            // from picking up the same job. Consul's sessions can be used for this.
                            // For simplicity here, we just delete the request key.
                            cpr::Delete(cpr::Url{CONSUL_AGENT_ADDR + "/v1/kv/" + key});
                            
                            // Process the job
                            run_computation(job_id);
                            break; // Process one job at a time
                        }
                    } catch (const json::parse_error& e) {
                        std::cerr << "JSON parse error: " << e.what() << std::endl;
                    }
                }
            }
        }
    }

    return 0;
}

A critical pitfall to avoid is treating the health check as a simple process check. A worker might be running but stuck, or its disk might be full. Our check script (/bin/sh -c "echo 'health check OK'") is a placeholder. A production-grade check would verify disk space, network connectivity to data sources, and perhaps even run a quick self-diagnostic.
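To make that concrete, here is a minimal sketch of a more meaningful check, written as a small Python script the agent could invoke through the check's Args (Consul maps exit code 0 to passing, 1 to warning, and anything else to critical). The paths, host, and thresholds are illustrative assumptions, not what we run in production.

#!/usr/bin/env python3
"""Illustrative worker health check. Consul maps exit codes to health states:
0 = passing, 1 = warning, anything else = critical."""
import shutil
import socket
import sys

DATA_DIR = "/var/lib/cpp-worker"              # where raw job output lands (assumed)
DATA_SOURCE = ("datasource.internal", 5432)   # an upstream dependency (assumed)


def main() -> int:
    # Fail if the output disk is nearly full; warn when it is getting tight.
    usage = shutil.disk_usage(DATA_DIR)
    free_ratio = usage.free / usage.total
    if free_ratio < 0.05:
        print(f"critical: only {free_ratio:.1%} disk free in {DATA_DIR}")
        return 2
    if free_ratio < 0.15:
        print(f"warning: {free_ratio:.1%} disk free in {DATA_DIR}")
        return 1

    # Fail if the worker cannot reach the data source it needs for jobs.
    try:
        with socket.create_connection(DATA_SOURCE, timeout=3):
            pass
    except OSError as exc:
        print(f"critical: cannot reach data source {DATA_SOURCE}: {exc}")
        return 2

    print("health check OK")
    return 0


if __name__ == "__main__":
    sys.exit(main())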

The Glue: Azure Functions for Job Dispatch and Build

Azure Functions were the perfect fit for the stateless “glue” logic in our system. Their event-driven nature and consumption-based pricing model meant we weren’t paying for idle controllers. We created two main functions.

1. Start-Job Function:
This function is triggered by a message on an Azure Service Bus queue, placed there by the Phoenix application. Its job is to confirm a healthy C++ worker is available via Consul and then create a job request in the Consul KV store.

function.json for Start-Job (Python):

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "job-requests",
      "connection": "AzureServiceBusConnectionString"
    }
  ]
}

main.py for Start-Job:

import datetime
import json
import logging
import os
import uuid

import azure.functions as func
import requests

# Configuration from environment variables
CONSUL_HTTP_ADDR = os.environ.get("CONSUL_HTTP_ADDR", "http://consul.service.consul:8500")
SERVICE_NAME = "cpp-computation-engine"

def main(msg: func.ServiceBusMessage) -> None:
    try:
        body = msg.get_body().decode("utf-8")
        logging.info(f"Received job request: {body}")
        job_params = json.loads(body)
        job_id = f"job-{uuid.uuid4()}"

        # 1. Confirm at least one healthy worker exists via Consul's health API.
        # The 'passing' flag is crucial to filter out unhealthy instances.
        url = f"{CONSUL_HTTP_ADDR}/v1/health/service/{SERVICE_NAME}?passing"
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        
        healthy_services = response.json()
        if not healthy_services:
            # Raising here lets the Functions host abandon the message so Service Bus
            # redelivers it, and dead-letters it after the maximum delivery count.
            logging.error("No healthy C++ workers available.")
            raise Exception("No healthy workers found")

        logging.info(f"Found {len(healthy_services)} healthy workers. Assigning job {job_id}.")

        # 2. Create the job request in Consul's KV store.
        # The C++ worker is long-polling on the 'jobs/requests/' prefix.
        kv_url = f"{CONSUL_HTTP_ADDR}/v1/kv/jobs/requests/{job_id}"
        kv_payload = {
            "id": job_id,
            "params": job_params,
            "submitted_at": datetime.datetime.utcnow().isoformat()
        }
        
        kv_response = requests.put(kv_url, data=json.dumps(kv_payload), timeout=5)
        kv_response.raise_for_status()

        logging.info(f"Successfully created job request for {job_id} in Consul KV.")

    except requests.exceptions.RequestException as e:
        logging.error(f"HTTP request error: {e}")
        raise
    except json.JSONDecodeError as e:
        logging.error(f"Invalid JSON in message: {e}")
        # Don't retry invalid messages
        return
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        raise

2. Build-SSG Function:
This function runs once a job's status reaches "complete". While Consul has features like watches that can trigger webhooks, a simpler and more decoupled approach is to have the C++ worker, upon completion, also drop a message onto an ssg-build-requests queue; this Azure Function is triggered by that queue.

Its tasks, sketched in the function after this list, are:

  1. Read the message containing the job ID and the path to the raw output data.
  2. Download the data from its storage location.
  3. Run a static site generator (we used Hugo for its speed). The function’s deployment package includes the Hugo executable.
  4. Upload the generated public/ directory to an Azure Blob Storage container configured for static website hosting.
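We won't reproduce the whole function here, but a condensed sketch of its shape follows, wired to the ssg-build-requests queue with a function.json analogous to the Start-Job one. The connection-string setting name, the per-job blob prefix, and the prepare_hugo_site helper are illustrative assumptions; it also assumes the azure-storage-blob SDK is available and that the Hugo binary ships inside the deployment package, as noted above.

import json
import logging
import mimetypes
import os
import pathlib
import subprocess
import tempfile

import azure.functions as func
from azure.storage.blob import BlobServiceClient, ContentSettings

# Assumed app settings / names; adjust to your environment.
BLOB_CONNECTION_STRING = os.environ["ReportStorageConnectionString"]
SITE_CONTAINER = "$web"  # the container Azure uses for static website hosting
HUGO_BINARY = os.path.join(os.path.dirname(__file__), "hugo")  # bundled with the package


def prepare_hugo_site(site_dir: str, output_path: str) -> None:
    """Placeholder: run the post-processing that turns raw engine output into
    Hugo content (config, markdown pages, chart data). Domain-specific, omitted."""
    os.makedirs(os.path.join(site_dir, "content"), exist_ok=True)


def main(msg: func.ServiceBusMessage) -> None:
    request = json.loads(msg.get_body().decode("utf-8"))
    job_id, output_path = request["id"], request["output_path"]
    logging.info(f"Building report site for {job_id} from {output_path}")

    with tempfile.TemporaryDirectory() as workdir:
        site_dir = os.path.join(workdir, "site")
        public_dir = os.path.join(site_dir, "public")

        # 1. Turn the raw output into a Hugo site skeleton.
        prepare_hugo_site(site_dir, output_path)

        # 2. Run the bundled Hugo binary to generate the static site.
        subprocess.run(
            [HUGO_BINARY, "--source", site_dir, "--destination", public_dir],
            check=True,
        )

        # 3. Upload every generated file under a per-job prefix.
        blob_service = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
        for path in pathlib.Path(public_dir).rglob("*"):
            if not path.is_file():
                continue
            blob_name = f"{job_id}/{path.relative_to(public_dir).as_posix()}"
            content_type, _ = mimetypes.guess_type(path.name)
            with open(path, "rb") as fh:
                blob_service.get_blob_client(SITE_CONTAINER, blob_name).upload_blob(
                    fh,
                    overwrite=True,
                    content_settings=ContentSettings(
                        content_type=content_type or "application/octet-stream"
                    ),
                )

    logging.info(f"Report for {job_id} published under {SITE_CONTAINER}/{job_id}/")

The final step from the sequence diagram, flipping the job's Consul KV entry to "published", mirrors the KV write in the Start-Job function and is omitted from the sketch.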

The Control Plane: Phoenix for Real-Time UI

The reason for choosing Phoenix and Elixir was singular: unparalleled support for real-time, stateful connections via Phoenix Channels, built on top of the battle-tested BEAM virtual machine. We needed to show users the real-time progress of their long-running jobs. Polling from the frontend would have been inefficient and laggy.

The main challenge was bridging the gap between state changes in Consul and the Phoenix application. A common mistake is to have Phoenix poll Consul. This is inefficient. The correct way is to leverage Consul’s blocking queries within an Elixir GenServer.

lib/my_app/consul_watcher.ex - The GenServer for watching Consul:

defmodule MyApp.ConsulWatcher do
  use GenServer
  require Logger

  @consul_url Application.compile_env(:my_app, :consul_url)
  @kv_prefix "jobs/"
  @http_client Tesla # Using Tesla for HTTP requests

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{index: "0"}, name: __MODULE__)
  end

  # Public API
  def watch do
    # This could be used to dynamically add/remove watches, but for now it's static
  end

  # GenServer Callbacks
  @impl true
  def init(state) do
    # Start the first watch immediately on startup
    send(self(), :watch_for_changes)
    {:ok, state}
  end

  @impl true
  def handle_info(:watch_for_changes, state) do
    Logger.info("Watching Consul KV prefix '#{@kv_prefix}' with index #{state.index}")
    
    # Run the blocking query in a separate task so the GenServer itself never
    # blocks; the task reports back with a plain message (see perform_blocking_query).
    Task.start(fn -> perform_blocking_query(state.index) end)

    # The result will come back as a message to handle_info
    {:noreply, state}
  end

  @impl true
  def handle_info({:consul_update, new_index, data}, _state) do
    Logger.info("Consul KV change detected. New index: #{new_index}")
    
    # Process the data and broadcast changes via Phoenix PubSub
    process_and_broadcast(data)

    # Immediately start the next watch with the new index
    send(self(), :watch_for_changes)
    
    # Update our state with the latest index
    {:noreply, %{index: new_index}}
  end

  @impl true
  def handle_info({:consul_error, reason}, state) do
    Logger.error("Error watching Consul: #{inspect(reason)}. Retrying in 5 seconds.")
    # Schedule the retry rather than sleeping so the GenServer stays responsive.
    Process.send_after(self(), :watch_for_changes, 5_000)
    {:noreply, state} # Keep the same index and retry
  end

  defp perform_blocking_query(index) do
    url = "#{@consul_url}/v1/kv/#{@kv_prefix}?recurse&index=#{index}"
    
    # Give the blocking query a receive timeout longer than Consul's 5-minute wait.
    # (Per-request adapter options, assuming the default Hackney adapter.)
    opts = [opts: [adapter: [recv_timeout: 330_000]]]

    case @http_client.get(url, opts) do
      {:ok, %Tesla.Env{status: 200, headers: headers, body: body}} ->
        new_index = List.keyfind(headers, "x-consul-index", 0, {"", "0"}) |> elem(1)
        # Send the result back to the GenServer as a plain message so the
        # handle_info/2 clauses above pick it up in the GenServer's own context.
        send(__MODULE__, {:consul_update, new_index, body})

      {:ok, env} ->
        send(__MODULE__, {:consul_error, "Unexpected status: #{env.status}"})

      {:error, reason} ->
        send(__MODULE__, {:consul_error, reason})
    end
  end

  defp process_and_broadcast(data) do
    # The body is a JSON list of every KV pair under the watched prefix. We
    # broadcast the latest state of each job; a refinement would diff against
    # the previous snapshot and broadcast only the entries that changed.
    parsed_data = Jason.decode!(data)

    for item <- parsed_data,
        # Skip pending request keys ("jobs/requests/..."); only the status keys
        # written by the workers ("jobs/job-123") carry progress information.
        not String.starts_with?(item["Key"], "jobs/requests/") do
      job_id = String.split(item["Key"], "/") |> List.last()

      # The value is base64 encoded by Consul's KV API
      decoded_value = Base.decode64!(item["Value"]) |> Jason.decode!()

      # Broadcast on a topic specific to the job_id
      # The Phoenix Channel for this user will be subscribed to "job_updates:<job_id>"
      MyAppWeb.Endpoint.broadcast(
        "job_updates:#{job_id}",
        "progress_update",
        %{
          job_id: job_id,
          status: decoded_value["status"],
          progress: decoded_value["progress"]
        }
      )
    end
  end
end

This ConsulWatcher is started as part of our application's supervision tree. Because it broadcasts on the exact topic a user's channel joins ("job_updates:<job_id>"), Phoenix delivers those broadcasts straight to the connected client once the join succeeds; the channel itself only needs to authorize the join.

lib/my_app_web/channels/job_channel.ex:

defmodule MyAppWeb.JobChannel do
  use MyAppWeb, :channel

  def join("job_updates:" <> job_id, _payload, socket) do
    # Authenticate user and check if they have permission to view this job_id
    # ...

    # Joining already subscribes this process to the topic, so the "progress_update"
    # broadcasts from ConsulWatcher are pushed straight to the client; no extra
    # subscription or handle_info callback is required.
    {:ok, %{message: "Joined job room for #{job_id}"}, assign(socket, :job_id, job_id)}
  end
end

The frontend JavaScript is then straightforward, using the standard phoenix.js library to connect to the channel and update the UI whenever a progress_update event is pushed from the server.

Lingering Issues and Future Iterations

This architecture, while effective, is not without its trade-offs and areas for improvement. Using Consul’s KV store as a messaging system for job requests is a deliberate simplification. A dedicated message queue like RabbitMQ or Azure Service Bus would offer more robust features like guaranteed delivery, dead-lettering, and complex routing, which we would certainly introduce in a V2. The current model relies on the C++ worker deleting the job request key, which lacks transactional guarantees; two workers could theoretically grab the same job in a race condition. This could be solved using Consul’s session locking mechanism, but again, a proper queue is the right tool for the job. Furthermore, security was deferred; all components currently trust each other on the network. The immediate next step is to enable Consul Connect to enforce service-to-service mTLS, ensuring that only authorized services can communicate. Finally, the C++ health check is naive; a more intelligent check would report the worker’s current load, allowing the Azure Function to perform more sophisticated, load-aware scheduling instead of just picking a random healthy worker.
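To make the session-locking idea concrete, here is a rough sketch of how a worker could claim a job atomically instead of blindly deleting the request key. It is shown in Python for brevity (our worker is C++, which would issue the same HTTP calls via cpr), and the lock key naming is an assumption.

import requests

CONSUL = "http://127.0.0.1:8500"  # local agent, as in the worker


def create_session(ttl: str = "30s") -> str:
    """Create a session whose locks are released automatically (Behavior=delete
    removes the lock key) if the holder dies or stops renewing the TTL."""
    resp = requests.put(
        f"{CONSUL}/v1/session/create",
        json={"Name": "cpp-worker", "TTL": ttl, "Behavior": "delete"},
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()["ID"]


def try_claim_job(job_id: str, session_id: str) -> bool:
    """Atomically attempt to take ownership of a job via ?acquire=.

    Only one session can hold the lock key, so two workers racing for the
    same request cannot both get True back.
    """
    resp = requests.put(
        f"{CONSUL}/v1/kv/jobs/locks/{job_id}",
        params={"acquire": session_id},
        data=session_id,
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json() is True


# Usage: only the worker whose acquire succeeds deletes the request key and
# runs the computation; everyone else moves on. For jobs that outlive the TTL,
# the worker must keep renewing the session (PUT /v1/session/renew/<id>).
session = create_session()
if try_claim_job("job-123", session):
    print("claimed job-123")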

