Building an Observable Saga Execution Engine with Go-Fiber and ClickHouse for Polyglot Microservices


The transition from a monolith to microservices solved our scaling problem but created a new, more insidious one: managing data consistency across service boundaries. The order processing workflow, once a single database transaction, was now scattered across three services. Initial attempts with two-phase commit protocols introduced unacceptable latency and tight coupling, defeating the purpose of the migration. We were left staring at the eventual consistency model, with the Saga pattern as the most pragmatic path forward.

Our initial concept was an orchestration-based Saga. A central service would coordinate a sequence of local transactions across participants. If any step failed, the orchestrator would execute a series of compensating transactions to semantically undo the operation. The real-world problem, however, isn’t just implementing the happy path and the compensation path. The true challenge is diagnosing a partially failed Saga in a high-throughput production environment. When a transaction involving services written in Go and Python fails midway, how do you reconstruct its history without spending hours cross-referencing logs from different systems? This led to our core design principle: observability is not an afterthought; it is the foundation of a resilient Saga implementation.

Our technology selection was driven by this principle.

  • Go-Fiber for the Orchestrator: The orchestrator is a critical, high-throughput component. We needed performance and low-latency concurrency. Go was the obvious choice. We picked Fiber for its minimalist, Express.js-like API, which allowed for rapid development without sacrificing performance. It’s a pragmatic tool for building lean, fast HTTP services.
  • FastAPI for a Participant Service: Our fraud detection service was already built and maintained by the data science team using Python. FastAPI was their framework of choice for its performance and automatic documentation. This polyglot reality meant our Saga orchestrator had to be language-agnostic, communicating over standard protocols like HTTP.
  • ClickHouse for the Observability Data Store: A busy Saga orchestrator generates a torrential volume of state change events. We needed to log every step initiation, success, failure, and compensation attempt. We expected that keeping this volume in PostgreSQL or Elasticsearch would become costly to operate. We anticipated needing to run fast, analytical queries on this data, like “show me the latency distribution of step C” or “find all Sagas that failed at the payment step in the last hour.” ClickHouse's columnar storage and fast aggregation are designed for exactly this kind of append-heavy, analytical workload.
  • UnoCSS for a Diagnostic UI: To make observability tangible, we needed a simple internal tool. A developer or support engineer should be able to paste in a transaction ID and see the entire lifecycle of the corresponding Saga. Building a complex frontend was out of the question. We decided to have the Go-Fiber orchestrator serve a single static HTML file styled with UnoCSS. Its on-demand, utility-first approach meant we could build a clean, functional interface in a few hours directly in the HTML without a build step.

The architecture is straightforward. The Go-Fiber service acts as the Saga orchestrator. It receives an API call to start a process, and it manages the state machine, calling participant services (like the FastAPI fraud service) in sequence. Critically, every state transition is logged as a structured event to a central ClickHouse database.

graph TD
    subgraph "Observability Plane"
        CH[(ClickHouse)]
    end

    subgraph "Saga Execution Plane"
        Client -->|1. POST /orders| Orchestrator[Go-Fiber Saga Orchestrator]
        Orchestrator -->|2. Invoke Step A| InventoryService[Participant A: Go]
        Orchestrator -->|3. Invoke Step B| FraudService[Participant B: FastAPI]
        Orchestrator -->|4. Invoke Step C| PaymentService[Participant C: Go]
    end

    subgraph "Diagnostic UI"
        Developer -->|Query Saga ID| DiagnosticWebApp[UnoCSS UI served by Orchestrator]
        DiagnosticWebApp -->|API call| Orchestrator
        Orchestrator -->|SQL Query| CH
    end

    Orchestrator -- "Log Every State Change" --> CH
    InventoryService -- "Log Status" --> CH
    FraudService -- "Log Status" --> CH
    PaymentService -- "Log Status" --> CH

1. Defining the Saga Structure and Executor in Go

First, we defined the core data structures for our Saga in the Go orchestrator. A Saga holds the definition, an ExecutionState holds the runtime progress of one instance, and each Step defines a local transaction and its corresponding compensation.

// internal/saga/saga.go

package saga

import (
	"context"
	"fmt"

	"github.com/google/uuid"
)

// Step defines a single step in the Saga, including its action and compensation.
type Step struct {
	Name         string
	Action       func(payload []byte) ([]byte, error)
	Compensation func(payload []byte) error
	DependsOn    string // Name of the step whose result is this step's input ("" = initial payload)
}

// Saga defines the entire distributed transaction.
type Saga struct {
	ID    string
	Name  string
	Steps map[string]Step // Keyed by step name
	Order []string        // Execution order of step names
}

// ExecutionState tracks the runtime progress of a Saga instance.
type ExecutionState struct {
	SagaID          string
	TransactionID   string
	CurrentStep     int
	CurrentStepName string            // Step being executed or compensated; "" for Saga-level events
	Payloads        map[string][]byte // Stores output of each successful step
	IsCompensating  bool
	CompletedSteps  []string
}

// Logger interface allows us to inject any structured logger that sends data to ClickHouse.
type Logger interface {
	Log(ctx context.Context, state ExecutionState, status, message string)
}

// Executor runs the Saga.
type Executor struct {
	logger Logger // Our structured logger interface
}

// NewExecutor creates a new Saga executor.
func NewExecutor(logger Logger) *Executor {
	return &Executor{logger: logger}
}

// inputFor returns the payload a step consumes: the output of the step it
// depends on, or the initial payload when it has no dependency.
func inputFor(step Step, state ExecutionState) []byte {
	if step.DependsOn != "" {
		return state.Payloads[step.DependsOn]
	}
	return state.Payloads["initial"]
}

// Execute starts a new Saga transaction.
func (e *Executor) Execute(ctx context.Context, s *Saga, initialPayload []byte) error {
	state := ExecutionState{
		SagaID:         s.ID,
		TransactionID:  uuid.NewString(),
		Payloads:       map[string][]byte{"initial": initialPayload},
		CompletedSteps: []string{},
	}

	e.logger.Log(ctx, state, "STARTED", fmt.Sprintf("Saga '%s' started.", s.Name))

	for i, stepName := range s.Order {
		state.CurrentStep = i
		state.CurrentStepName = stepName

		step, ok := s.Steps[stepName]
		if !ok || step.Action == nil {
			// A misconfigured Saga must fail cleanly instead of panicking on a nil Action.
			err := fmt.Errorf("step '%s' has no action defined", stepName)
			e.logger.Log(ctx, state, "STEP_FAILED", err.Error())
			e.compensate(ctx, s, state)
			return err
		}

		e.logger.Log(ctx, state, "STEP_STARTED", fmt.Sprintf("Executing step '%s'.", stepName))

		outputPayload, err := step.Action(inputFor(step, state))
		if err != nil {
			e.logger.Log(ctx, state, "STEP_FAILED", fmt.Sprintf("Step '%s' failed: %v", stepName, err))
			e.compensate(ctx, s, state)
			return fmt.Errorf("saga failed at step %s: %w", stepName, err)
		}

		// Key payloads and completed steps by the map key so compensate can look them up.
		state.Payloads[stepName] = outputPayload
		state.CompletedSteps = append(state.CompletedSteps, stepName)
		e.logger.Log(ctx, state, "STEP_COMPLETED", fmt.Sprintf("Step '%s' completed.", stepName))
	}

	state.CurrentStepName = ""
	e.logger.Log(ctx, state, "COMPLETED", fmt.Sprintf("Saga '%s' completed successfully.", s.Name))
	return nil
}

// compensate undoes the completed steps in reverse order of completion.
func (e *Executor) compensate(ctx context.Context, s *Saga, state ExecutionState) {
	state.IsCompensating = true
	e.logger.Log(ctx, state, "COMPENSATING", "Starting compensation process.")

	for i := len(state.CompletedSteps) - 1; i >= 0; i-- {
		stepName := state.CompletedSteps[i]
		step := s.Steps[stepName]
		if step.Compensation == nil {
			continue
		}
		state.CurrentStepName = stepName
		e.logger.Log(ctx, state, "COMPENSATION_STARTED", fmt.Sprintf("Compensating for step '%s'.", stepName))

		// A real-world project would need more robust logic for compensation payloads.
		// For simplicity, the compensation receives the same input the step's action did.
		if err := step.Compensation(inputFor(step, state)); err != nil {
			// This is a critical failure: manual intervention is required,
			// and the system should emit a high-priority alert.
			e.logger.Log(ctx, state, "COMPENSATION_FAILED", fmt.Sprintf("CRITICAL: Compensation for step '%s' failed: %v", stepName, err))
			return // Stop the compensation chain.
		}
		e.logger.Log(ctx, state, "COMPENSATION_COMPLETED", fmt.Sprintf("Compensation for step '%s' completed.", stepName))
	}

	state.CurrentStepName = ""
	e.logger.Log(ctx, state, "COMPENSATED", "Saga compensation finished.")
}

This executor is simple but captures the core logic: execute steps in order, and if one fails, execute compensations in reverse order. The crucial part is the logger.Log call at every state change.
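Because every transition is logged, the executor's behavior can be written down as an expected event sequence. The hypothetical function below computes the orchestrator statuses that the executor should emit for a given step order and failing step index, assuming every step has a non-nil Compensation. Each entry is "STATUS:step" for step-level events and just "STATUS" for Saga-level ones. Comparing it against the orchestrator's rows in saga_logs is a quick way to spot missing or out-of-order events.

```go
package main

// ExpectedTrace returns the statuses the executor logs when the step at index
// failAt fails. A failAt outside [0, len(order)) means every step succeeds.
// It assumes every step defines a Compensation.
func ExpectedTrace(order []string, failAt int) []string {
	trace := []string{"STARTED"}
	for i, step := range order {
		trace = append(trace, "STEP_STARTED:"+step)
		if i == failAt {
			trace = append(trace, "STEP_FAILED:"+step, "COMPENSATING")
			// Only steps that completed before the failure are compensated,
			// most recent first; the failed step itself is not.
			for j := i - 1; j >= 0; j-- {
				trace = append(trace,
					"COMPENSATION_STARTED:"+order[j],
					"COMPENSATION_COMPLETED:"+order[j])
			}
			return append(trace, "COMPENSATED")
		}
		trace = append(trace, "STEP_COMPLETED:"+step)
	}
	return append(trace, "COMPLETED")
}
```

For the order-processing Saga defined later, a FraudCheck failure (index 1) yields exactly one compensation, for ReserveInventory.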

2. The ClickHouse Schema and Logger Implementation

The power of this approach hinges on the data we capture. We designed a simple but effective table in ClickHouse to store the lifecycle of every transaction.

-- DDL for the ClickHouse table
CREATE TABLE IF NOT EXISTS default.saga_logs (
    `timestamp` DateTime64(3) CODEC(Delta, ZSTD),
    `saga_id` LowCardinality(String),
    `transaction_id` UUID,
    `step_name` LowCardinality(String),
    `service_name` LowCardinality(String),
    `status` LowCardinality(String),
    `message` String,
    `payload` String,
    `duration_ms` UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (saga_id, transaction_id, timestamp);

Key decisions in this schema:

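  • LowCardinality(String) is used for fields with a limited set of repeating values (saga_id, step_name, service_name, status). ClickHouse dictionary-encodes these columns, which reduces storage size and speeds up filtering and grouping on them.
  • PARTITION BY toYYYYMM(timestamp) creates one partition per month, which makes data retention cheap: an expired month can be removed with a single ALTER TABLE ... DROP PARTITION instead of a row-by-row delete.
  • ORDER BY (saga_id, transaction_id, timestamp) is the sorting key and, since no separate PRIMARY KEY is declared, also the primary key. It stores all events for a single transaction next to each other on disk, in chronological order. Queries that filter on both saga_id and transaction_id use the primary index directly; because saga_id has few distinct values, a filter on transaction_id alone still lets ClickHouse skip most of the data.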

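Next comes the Go implementation of the Logger interface that writes to this table, using the official clickhouse-go v2 client. A common mistake is to insert rows one by one: each INSERT creates a new data part on disk, and a high rate of tiny inserts overwhelms ClickHouse's background merges. Inserts should be batched. To keep the example short, the logger below still sends a single-row batch per event; its comments mark where a production version would buffer.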

// internal/logging/ch_logger.go

package logging

import (
	"context"
	"log"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/yourapp/internal/saga" // Import your saga package
)

type ClickHouseLogger struct {
	conn driver.Conn
}

func NewClickHouseLogger(addr string) (*ClickHouseLogger, error) {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{addr},
		Auth: clickhouse.Auth{
			Database: "default",
			// Add username and password if needed
		},
		ClientInfo: clickhouse.ClientInfo{
			Products: []struct {
				Name    string
				Version string
			}{
				{Name: "saga-orchestrator", Version: "1.0"},
			},
		},
	})
	if err != nil {
		return nil, err
	}
	// Fail fast at startup if the server is unreachable, rather than on the first log write.
	if err := conn.Ping(context.Background()); err != nil {
		return nil, err
	}
	return &ClickHouseLogger{conn: conn}, nil
}

// Log writes one event. Each call sends a single-row batch from its own goroutine,
// which keeps the executor non-blocking but is exactly the row-by-row pattern to
// avoid at high volume; a production logger should buffer events and flush them
// in larger batches.
func (l *ClickHouseLogger) Log(ctx context.Context, state saga.ExecutionState, status, message string) {
	// Capture the timestamp and step name now, before the goroutine is scheduled,
	// so events keep their real order.
	ts := time.Now()
	stepName := state.CurrentStepName
	if stepName == "" {
		stepName = "orchestrator" // Saga-level events such as STARTED and COMPLETED
	}

	go func() {
		// Use a fresh context: the caller's context may be cancelled before the write finishes.
		batch, err := l.conn.PrepareBatch(context.Background(), "INSERT INTO saga_logs")
		if err != nil {
			log.Printf("Failed to prepare batch: %v", err)
			return
		}

		// Values must follow the column order of the saga_logs table.
		err = batch.Append(
			ts,
			state.SagaID,
			state.TransactionID,
			stepName,
			"saga-orchestrator", // service_name
			status,
			message,
			"",        // Payload can be added if needed, be careful with PII
			uint32(0), // Duration can be calculated and added
		)
		if err != nil {
			log.Printf("Failed to append to batch: %v", err)
			return
		}

		if err := batch.Send(); err != nil {
			log.Printf("Failed to send batch to ClickHouse: %v", err)
		}
	}()
}

// Ensure the logger implements the interface
var _ saga.Logger = (*ClickHouseLogger)(nil)

3. Implementing the Polyglot Participant with FastAPI

The fraud detection service is a simple FastAPI application. It exposes two endpoints: one for the action (/check_fraud) and one for the compensation (/cancel_fraud_check). It reads the X-Transaction-ID and X-Saga-ID headers to correlate its logs with the orchestrator's, which means the orchestrator must send those headers on every call.

# fraud_service/main.py

import asyncio
import logging
import random
from datetime import datetime, timezone
from typing import Optional

from clickhouse_driver import Client
from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()

# In a real app, connection details would come from config
ch_client = Client(host='localhost')

logging.basicConfig(level=logging.INFO)

def log_to_clickhouse(saga_id, transaction_id, step_name, status, message):
    # clickhouse_driver is synchronous, so this call blocks the event loop while it runs;
    # a production service would buffer events or move the write off the request path.
    try:
        ch_client.execute(
            'INSERT INTO saga_logs (timestamp, saga_id, transaction_id, step_name, service_name, status, message) VALUES',
            [{
                'timestamp': datetime.now(timezone.utc),  # keeps the millisecond precision of DateTime64(3)
                'saga_id': saga_id,
                'transaction_id': transaction_id,
                'step_name': step_name,
                'service_name': 'fraud-service-py',
                'status': status,
                'message': message,
            }]
        )
    except Exception as e:
        # Also reached when a correlation header is missing, since these columns are not Nullable.
        logging.error(f"Failed to log to ClickHouse: {e}")


@app.post("/check_fraud")
async def check_fraud(
    request: Request,
    x_transaction_id: Optional[str] = Header(None),
    x_saga_id: Optional[str] = Header(None),
):
    step_name = "FraudCheck"
    log_to_clickhouse(x_saga_id, x_transaction_id, step_name, "STEP_STARTED", "Fraud check initiated.")

    # Simulate work and a potential failure. asyncio.sleep yields to the event loop;
    # time.sleep inside an async handler would stall every concurrent request.
    await asyncio.sleep(0.1)
    if random.random() < 0.1: # 10% chance of failure
        log_to_clickhouse(x_saga_id, x_transaction_id, step_name, "STEP_FAILED", "Fraud score exceeded threshold.")
        raise HTTPException(status_code=400, detail="High fraud risk detected")

    log_to_clickhouse(x_saga_id, x_transaction_id, step_name, "STEP_COMPLETED", "Fraud check passed.")
    return {"status": "ok", "fraud_score": 0.05}


@app.post("/cancel_fraud_check")
async def cancel_fraud_check(
    request: Request,
    x_transaction_id: Optional[str] = Header(None),
    x_saga_id: Optional[str] = Header(None),
):
    step_name = "FraudCheck" # Should match the original step
    log_to_clickhouse(x_saga_id, x_transaction_id, step_name, "COMPENSATION_STARTED", "Cancelling fraud check.")

    # Logic to mark the check as void in a local DB would go here.
    await asyncio.sleep(0.05)

    log_to_clickhouse(x_saga_id, x_transaction_id, step_name, "COMPENSATION_COMPLETED", "Fraud check cancelled.")
    return {"status": "cancellation_complete"}

This demonstrates the pattern: every participant service is responsible for logging its own start, completion, or failure to the central ClickHouse instance, using the transaction ID for correlation.

4. Tying It Together with the Go-Fiber Orchestrator

Now, we configure the Saga and expose an endpoint in our main.go file.

// cmd/orchestrator/main.go

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"

	"github.com/gofiber/fiber/v2"

	"github.com/yourapp/internal/logging"
	"github.com/yourapp/internal/saga"
)

// httpClient is shared by all steps so connections to participants are reused.
var httpClient = &http.Client{Timeout: 5 * time.Second}

// makeRequest is a helper to create Saga step actions that call HTTP endpoints.
func makeRequest(method, url string) func(payload []byte) ([]byte, error) {
	return func(payload []byte) ([]byte, error) {
		req, err := http.NewRequest(method, url, bytes.NewReader(payload))
		if err != nil {
			return nil, err
		}
		// In a real system, the Saga and transaction IDs should be sent as the
		// X-Saga-ID and X-Transaction-ID headers that the participants read.
		req.Header.Set("Content-Type", "application/json")

		resp, err := httpClient.Do(req)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()

		if resp.StatusCode >= 400 {
			body, _ := io.ReadAll(resp.Body)
			return nil, fmt.Errorf("service returned status %d: %s", resp.StatusCode, string(body))
		}
		return io.ReadAll(resp.Body)
	}
}

// makeCompensation adapts makeRequest to the Step.Compensation signature,
// which only reports success or failure.
func makeCompensation(method, url string) func(payload []byte) error {
	call := makeRequest(method, url)
	return func(payload []byte) error {
		_, err := call(payload)
		return err
	}
}

func main() {
	app := fiber.New()

	chLogger, err := logging.NewClickHouseLogger("localhost:9000")
	if err != nil {
		log.Fatalf("Could not connect to ClickHouse: %v", err)
	}
	sagaExecutor := saga.NewExecutor(chLogger)

	// Define our order processing Saga
	orderSaga := &saga.Saga{
		ID:   "order-processing-v1",
		Name: "Process New Order",
		Steps: map[string]saga.Step{
			"ReserveInventory": {
				Name:         "ReserveInventory",
				Action:       makeRequest("POST", "http://localhost:3001/reserve"),
				Compensation: makeCompensation("POST", "http://localhost:3001/release"),
			},
			"FraudCheck": {
				Name:         "FraudCheck",
				Action:       makeRequest("POST", "http://localhost:8000/check_fraud"),
				Compensation: makeCompensation("POST", "http://localhost:8000/cancel_fraud_check"),
				DependsOn:    "ReserveInventory", // Uses output from previous step
			},
			"ProcessPayment": {
				Name:         "ProcessPayment",
				Action:       makeRequest("POST", "http://localhost:3002/charge"),
				Compensation: makeCompensation("POST", "http://localhost:3002/refund"),
				DependsOn:    "FraudCheck",
			},
		},
		Order: []string{"ReserveInventory", "FraudCheck", "ProcessPayment"},
	}

	app.Post("/orders", func(c *fiber.Ctx) error {
		// In a real app, you'd parse a meaningful body.
		initialPayload := []byte(`{"user_id": "user-123", "amount": 99.99}`)

		// Execute the saga asynchronously to not block the request. fiber.Ctx is
		// pooled and must not be used after the handler returns, so the goroutine
		// gets its own context.
		go func() {
			if err := sagaExecutor.Execute(context.Background(), orderSaga, initialPayload); err != nil {
				// The failure details are already logged inside the executor.
				log.Printf("Saga execution failed: %v", err)
			}
		}()

		// The immediate response only acknowledges receipt. Execute generates the
		// transaction ID internally, so it cannot be returned here as written.
		return c.Status(fiber.StatusAccepted).JSON(fiber.Map{
			"message": "Order processing started.",
		})
	})

	// Endpoint for the diagnostic UI; the path matches the fetch() call in static/index.html.
	app.Get("/api/saga/trace/:txid", func(c *fiber.Ctx) error {
		// ... Query ClickHouse for c.Params("txid") and return the events as JSON ...
		return c.SendStatus(fiber.StatusNotImplemented)
	})

	// Serve the diagnostic UI (static/index.html) at "/", after the API routes.
	app.Static("/", "./static")

	log.Fatal(app.Listen(":3000"))
}

5. The UnoCSS Diagnostic Dashboard

This is the final piece that makes the whole system usable. A simple HTML file served by the Go-Fiber app provides a window into our ClickHouse data store.

<!-- static/index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Saga Transaction Inspector</title>
    <!-- UnoCSS runtime configuration. It must be defined before the runtime script loads,
         and it adds the custom theme colors used by the status classes below. -->
    <script>
        window.__unocss = {
            theme: {
                colors: {
                    success: { DEFAULT: '#22c55e', 800: '#166534' },
                    warning: { DEFAULT: '#f59e0b', 800: '#92400e' },
                    danger: { DEFAULT: '#ef4444', 800: '#991b1b' },
                    info: { DEFAULT: '#3b82f6', 800: '#1e40af' },
                },
            },
        };
    </script>
    <!-- UnoCSS Runtime Script -->
    <script src="https://cdn.jsdelivr.net/npm/@unocss/runtime"></script>
</head>
<body class="bg-gray-100 font-sans p-8">

    <div class="max-w-6xl mx-auto bg-white rounded-lg shadow-md p-6">
        <h1 class="text-2xl font-bold mb-4 text-gray-800">Saga Transaction Inspector</h1>
        <div class="flex items-center space-x-2">
            <input id="txid-input" type="text" placeholder="Enter Transaction ID" class="flex-grow p-2 border border-gray-300 rounded-md focus:ring-2 focus:ring-blue-500 focus:outline-none">
            <button id="fetch-btn" class="bg-blue-600 text-white font-semibold px-4 py-2 rounded-md hover:bg-blue-700">Fetch</button>
        </div>

        <div id="results-container" class="mt-6 hidden">
            <h2 class="text-xl font-semibold mb-2 text-gray-700">Transaction Details</h2>
            <table class="w-full text-left border-collapse">
                <thead>
                    <tr class="bg-gray-100">
                        <th class="p-2 border-b">Timestamp</th>
                        <th class="p-2 border-b">Step</th>
                        <th class="p-2 border-b">Service</th>
                        <th class="p-2 border-b">Status</th>
                        <th class="p-2 border-b">Message</th>
                    </tr>
                </thead>
                <tbody id="log-table-body">
                    <!-- Rows will be injected here by JavaScript -->
                </tbody>
            </table>
        </div>
        <div id="error-msg" class="mt-4 text-red-600"></div>
    </div>

<script>
    const statusColors = {
        'STARTED': 'bg-info/10 text-info-800',
        'COMPLETED': 'bg-success/10 text-success-800',
        'STEP_STARTED': 'bg-gray/10 text-gray-800',
        'STEP_COMPLETED': 'bg-success/10 text-success-800',
        'STEP_FAILED': 'bg-danger/10 text-danger-800',
        'COMPENSATING': 'bg-warning/10 text-warning-800',
        'COMPENSATION_STARTED': 'bg-warning/10 text-warning-800',
        'COMPENSATION_COMPLETED': 'bg-info/10 text-info-800',
        'COMPENSATION_FAILED': 'bg-danger/10 font-bold text-danger-800',
        'COMPENSATED': 'bg-info/10 text-info-800',
    };

    document.getElementById('fetch-btn').addEventListener('click', async () => {
        const txid = document.getElementById('txid-input').value.trim();
        if (!txid) return;

        const resultsContainer = document.getElementById('results-container');
        const errorMsg = document.getElementById('error-msg');
        const tbody = document.getElementById('log-table-body');
        
        tbody.innerHTML = '';
        errorMsg.textContent = '';
        resultsContainer.classList.add('hidden');

        try {
            // Must match the trace route registered by the orchestrator.
            const response = await fetch(`/api/saga/trace/${encodeURIComponent(txid)}`);
            if (!response.ok) {
                throw new Error(`Failed to fetch data: ${response.statusText}`);
            }
            const data = await response.json();

            if (data && data.length > 0) {
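                data.forEach(log => {
                    const row = document.createElement('tr');
                    const colorClass = statusColors[log.status] || 'bg-gray-100';
                    row.className = `border-b border-gray-200 ${colorClass}`;
                    // Build cells with textContent so that messages (which can contain
                    // error text from participant services) are never parsed as HTML.
                    const addCell = (className, text) => {
                        const td = document.createElement('td');
                        td.className = className;
                        td.textContent = text;
                        row.appendChild(td);
                        return td;
                    };
                    addCell('p-2 text-sm', new Date(log.timestamp).toISOString());
                    addCell('p-2 font-mono', log.step_name);
                    addCell('p-2', log.service_name);
                    const statusCell = addCell('p-2 font-semibold', '');
                    const badge = document.createElement('span');
                    badge.className = 'px-2 py-1 rounded-full text-xs';
                    badge.textContent = log.status;
                    statusCell.appendChild(badge);
                    addCell('p-2 text-sm', log.message);
                    tbody.appendChild(row);
                });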
                resultsContainer.classList.remove('hidden');
            } else {
                errorMsg.textContent = 'No logs found for this Transaction ID.';
            }
        } catch (err) {
            errorMsg.textContent = `Error: ${err.message}`;
        }
    });
</script>
</body>
</html>

The result is a surprisingly powerful tool built with minimal effort. It visualizes the entire flow, making it immediately obvious where a failure occurred and how the compensation chain reacted.

sequenceDiagram
    participant C as Client
    participant O as Orchestrator (Go-Fiber)
    participant I as Inventory Service (Go)
    participant F as Fraud Service (FastAPI)
    participant DB as ClickHouse

    C->>+O: POST /orders
    O-->>-C: 202 Accepted
    O->>DB: Log(STARTED)

    O->>DB: Log(STEP_STARTED, ReserveInventory)
    O->>I: POST /reserve
    I->>DB: Log(STEP_COMPLETED)
    I-->>O: 200 OK
    O->>DB: Log(STEP_COMPLETED, ReserveInventory)

    O->>DB: Log(STEP_STARTED, FraudCheck)
    O->>F: POST /check_fraud
    F->>DB: Log(STEP_STARTED)
    Note over F: Fraud score exceeds threshold
    F->>DB: Log(STEP_FAILED)
    F-->>O: 400 Bad Request
    O->>DB: Log(STEP_FAILED, FraudCheck)

    O->>DB: Log(COMPENSATING)
    Note over O: Compensate completed steps in reverse order (only ReserveInventory)
    O->>DB: Log(COMPENSATION_STARTED, ReserveInventory)
    O->>I: POST /release
    I->>DB: Log(COMPENSATION_COMPLETED)
    I-->>O: 200 OK
    O->>DB: Log(COMPENSATION_COMPLETED, ReserveInventory)
    O->>DB: Log(COMPENSATED)
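The inspector makes a failure visible to a person; the same reading of a trace can be automated, for example to feed an alert. A minimal sketch follows, assuming the trace has been fetched in timestamp order and reduced to (step, status) pairs. Entry, Outcome, Summarize, and NeedsAttention are hypothetical names.

```go
package main

// Entry is one (step, status) pair from a transaction's trace, in timestamp order.
type Entry struct {
	Step   string
	Status string
}

// Outcome summarizes where a transaction went wrong, if anywhere.
type Outcome struct {
	FailedStep         string // first step with a STEP_FAILED event
	FailedCompensation string // step whose compensation failed
	Final              string // COMPLETED, COMPENSATED, or "" if no terminal event was logged
}

// Summarize scans the trace once and records failures and the terminal status.
func Summarize(trace []Entry) Outcome {
	var o Outcome
	for _, e := range trace {
		switch e.Status {
		case "STEP_FAILED":
			if o.FailedStep == "" { // orchestrator and participant may both log it
				o.FailedStep = e.Step
			}
		case "COMPENSATION_FAILED":
			o.FailedCompensation = e.Step
		case "COMPLETED", "COMPENSATED":
			o.Final = e.Status
		}
	}
	return o
}

// NeedsAttention reports whether a person should look at the transaction: a
// compensation failed, or a step failed and no COMPENSATED event was logged
// (the Saga is either still running or stuck).
func (o Outcome) NeedsAttention() bool {
	return o.FailedCompensation != "" || (o.FailedStep != "" && o.Final != "COMPENSATED")
}
```

A failed compensation is the case worth paging on: the executor stops its compensation chain there, so the trace ends without a COMPENSATED event.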

This system isn’t without its limitations. The orchestrator is currently a single point of failure; a production implementation would require clustering and a leader election mechanism. The Saga state also lives only in the orchestrator's memory: the ClickHouse logs let a person reconstruct what happened, but if the process crashes mid-Saga, nothing resumes the transaction or runs its compensations. Persisting the state machine to a durable store like PostgreSQL would close that gap, and it becomes essential for long-running Sagas. Furthermore, the communication relies on synchronous HTTP requests. A more robust design might use a message queue like NATS or Kafka, which decouples the services more effectively but introduces its own challenges, chief among them making participants idempotent so that redelivered messages are harmless.
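Idempotency here means a participant can receive the same step request twice and perform its side effect only once. A minimal in-memory sketch follows, keyed on transaction ID, step, and operation; the type and function names are hypothetical, and a real participant would persist the results in the same local transaction as its business data so the record survives restarts.

```go
package main

import "sync"

// IdempotencyStore remembers the result of each successful operation so that a
// redelivered message returns the first result instead of repeating the side effect.
type IdempotencyStore struct {
	mu      sync.Mutex
	results map[string][]byte
}

func NewIdempotencyStore() *IdempotencyStore {
	return &IdempotencyStore{results: make(map[string][]byte)}
}

// Key builds the deduplication key for one operation of one step in one transaction.
func Key(txID, step, op string) string {
	return txID + "/" + step + "/" + op
}

// Do runs fn at most once per key among successful calls. Failures are not
// cached, so a retried message can run fn again. Holding the lock during fn
// keeps the sketch simple but serializes all operations.
func (s *IdempotencyStore) Do(key string, fn func() ([]byte, error)) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if r, ok := s.results[key]; ok {
		return r, nil
	}
	r, err := fn()
	if err != nil {
		return nil, err
	}
	s.results[key] = r
	return r, nil
}
```

Compensations need the same protection: with separate keys for "reserve" and "release", a duplicated release message does not undo a reservation twice.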

