Implementing a Resilient Saga Orchestrator Using gRPC and an ORM to Enforce Service Decoupling


Our sprint velocity was grinding to a halt. Every new feature, no matter how small, became a cross-team epic involving coordinated deployments and late-night manual data reconciliation. The root cause was a classic architectural mistake: our so-called “microservices” were just distributed monoliths, all tethered to a single, monstrous PostgreSQL database. A Scrum-driven mandate to deliver independent, vertical feature slices was failing because our architecture couldn’t support it. The decision was made to enforce true service decoupling, giving each service its own private database. This solved one problem but immediately created another, much harder one: maintaining data consistency across atomic business operations that now spanned multiple services.

Our initial attempts with synchronous, chained API calls were a disaster. A failure in the third service of a four-service chain left data in an inconsistent state, requiring complex, bespoke cleanup scripts. This was untenable. We needed a robust pattern for managing distributed transactions. After a heated debate, we discarded Two-Phase Commit (2PC) due to its reliance on locks and its negative impact on availability—a non-starter for our requirements. We settled on the Saga pattern, specifically an orchestration-based approach. The plan was to build a central orchestrator responsible for sequencing the steps of a business transaction and handling any necessary rollbacks.

The Core Architectural Tenets

Before writing a single line of code, we laid out the non-negotiable principles for this new Saga orchestrator.

  1. State Must Be Durably Persisted: The orchestrator itself cannot be stateless. If it crashes mid-saga, it must be able to resume from where it left off upon restart. This immediately pointed towards a relational database for storing the Saga’s state machine.
  2. Communication Must Be Efficient and Strongly-Typed: The communication between the orchestrator and participating services is the system’s backbone. REST/JSON felt too loose and slow. We chose gRPC for its performance and the strict contracts enforced by Protocol Buffers.
  3. Participants Must Be Idempotent: Network failures are a given. The orchestrator will retry failed steps. Therefore, each service participating in a Saga must be able to handle the same request multiple times without causing duplicate data or side effects.
  4. Failure is a First-Class Citizen: The system must be resilient to partial failures. If a participating service is down, it shouldn’t cascade and take down the entire system. This meant building resilience patterns like circuit breakers directly into our gRPC clients.

For implementation, we chose Go for its concurrency primitives and performance. We selected GORM as our ORM to abstract away the boilerplate SQL for state management, letting us focus on the core orchestration logic.

Modeling the Saga State Machine

The first step was modeling the Saga and its steps in the database. A Saga instance represents a single end-to-end business transaction, while a Saga Step represents a single local transaction within a participating service.

// models/saga.go
package models

import (
	"time"
	"gorm.io/gorm"
)

type SagaState string

const (
	StatePending      SagaState = "PENDING"
	StateExecuting    SagaState = "EXECUTING"
	StateCompleted    SagaState = "COMPLETED"
	StateCompensating SagaState = "COMPENSATING"
	StateFailed       SagaState = "FAILED"
)

// Saga represents the entire distributed transaction.
type Saga struct {
	gorm.Model
	Name              string      `gorm:"type:varchar(255);not null"`
	TransactionID     string      `gorm:"type:varchar(255);uniqueIndex;not null"`
	State             SagaState   `gorm:"type:varchar(50);not null;index"`
	CurrentStep       int         `gorm:"not null;default:0"`
	Payload           []byte      `gorm:"type:bytea"` // Store initial payload for the saga
	Steps             []SagaStep  `gorm:"foreignKey:SagaID"`
	CompensationError string      `gorm:"type:text"`
	FinishedAt        *time.Time
}

// SagaStep represents a single step (local transaction) in the saga.
type SagaStep struct {
	gorm.Model
	SagaID         uint      `gorm:"not null;index"`
	StepNumber     int       `gorm:"not null"`
	Name           string    `gorm:"type:varchar(255);not null"`
	State          SagaState `gorm:"type:varchar(50);not null"`
	ServiceAddress string    `gorm:"type:varchar(255);not null"` // gRPC address of the participant
	Payload        []byte    `gorm:"type:bytea"` // Input for this specific step
	Response       []byte    `gorm:"type:bytea"` // Output from this step
	ErrorMessage   string    `gorm:"type:text"`
}

This GORM model is the heart of the orchestrator. The Saga table tracks the overall progress, while SagaStep defines the sequence of operations. The State field is critical for determining whether to move forward (EXECUTING) or backward (COMPENSATING). The TransactionID is a business-level identifier we can use for ensuring idempotency.
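To make the model concrete, here is a minimal sketch of how a new Saga instance and its ordered steps might be created and durably persisted before any processing happens. The StartSaga helper and the StepDefinition type are illustrative, not part of the orchestrator code shown later, and they assume the schema has already been migrated.

// orchestrator/start.go -- an illustrative sketch, assuming AutoMigrate has
// already been run for models.Saga and models.SagaStep.
package orchestrator

import (
	"github.com/google/uuid"
	"github.com/your-org/saga-orchestrator/models"
	"gorm.io/gorm"
)

// StepDefinition describes one forward/compensate pair handled by a participant.
type StepDefinition struct {
	Name           string
	ServiceAddress string
	Payload        []byte
}

// StartSaga persists a new saga in PENDING state so the processing loop can pick it up.
func StartSaga(db *gorm.DB, name string, payload []byte, steps []StepDefinition) (*models.Saga, error) {
	saga := &models.Saga{
		Name:          name,
		TransactionID: uuid.NewString(),
		State:         models.StatePending,
		Payload:       payload,
	}
	for i, def := range steps {
		saga.Steps = append(saga.Steps, models.SagaStep{
			StepNumber:     i,
			Name:           def.Name,
			State:          models.StatePending,
			ServiceAddress: def.ServiceAddress,
			Payload:        def.Payload,
		})
	}
	// GORM creates the saga and its steps together; the explicit transaction
	// just makes the atomicity obvious to the reader.
	if err := db.Transaction(func(tx *gorm.DB) error {
		return tx.Create(saga).Error
	}); err != nil {
		return nil, err
	}
	return saga, nil
}

A typical order flow would pass three StepDefinitions (order creation, payment, inventory reservation), each pointing at the gRPC address of its participant.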

Defining the Universal gRPC Contract

A key design decision was to avoid creating a unique gRPC service for every type of Saga. Instead, we defined a generic interface that all participating services must implement. This makes the orchestrator agnostic to the business logic of the steps it’s executing.

// protos/saga_participant.proto
syntax = "proto3";

package participant;

option go_package = "github.com/your-org/saga-orchestrator/protos/participant";

// A generic service that all Saga participants must implement.
service SagaParticipant {
  // Executes the forward transaction step.
  rpc Execute(SagaStepRequest) returns (SagaStepResponse);
  // Executes the compensating transaction step.
  rpc Compensate(SagaStepRequest) returns (SagaStepResponse);
}

message SagaStepRequest {
  // A unique ID for this entire transaction, used for idempotency.
  string transaction_id = 1;
  // A unique ID for this specific step execution attempt.
  string step_id = 2;
  // The payload required for this step, encoded as bytes (e.g., JSON).
  bytes payload = 3;
}

message SagaStepResponse {
  bool success = 1;
  // The result of the step, encoded as bytes.
  bytes payload = 2;
  // A descriptive error message if success is false.
  string error_message = 3;
}

Every service involved in a Saga—Order Service, Payment Service, Inventory Service—will implement this SagaParticipant service. The orchestrator simply needs to know the address of the service and the payload for each step. The business logic is encapsulated within the participant.

The Orchestrator’s Engine Room

The orchestrator’s core logic is a processing loop. It continuously polls the database for Sagas in a runnable state (PENDING, EXECUTING, or COMPENSATING with no recorded compensation error) and processes them one step at a time.

Here is a simplified but functional representation of the orchestrator’s main processing function.

// orchestrator/processor.go
package orchestrator

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/your-org/saga-orchestrator/models"
	"github.com/your-org/saga-orchestrator/protos/participant"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"gorm.io/gorm"
)

type Processor struct {
	db *gorm.DB
	// A map to cache gRPC connections. In a real system, this would be more sophisticated.
	grpcClients map[string]participant.SagaParticipantClient
}

func NewProcessor(db *gorm.DB) *Processor {
	return &Processor{
		db:          db,
		grpcClients: make(map[string]participant.SagaParticipantClient),
	}
}

// getClient establishes or retrieves a cached gRPC client connection.
func (p *Processor) getClient(address string) (participant.SagaParticipantClient, error) {
	if client, ok := p.grpcClients[address]; ok {
		return client, nil
	}

	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("failed to connect to gRPC service at %s: %w", address, err)
	}

	client := participant.NewSagaParticipantClient(conn)
	p.grpcClients[address] = client
	return client, nil
}

func (p *Processor) ProcessSaga(saga *models.Saga) {
	log.Printf("Processing Saga TransactionID: %s, State: %s, CurrentStep: %d", saga.TransactionID, saga.State, saga.CurrentStep)

	switch saga.State {
	case models.StatePending:
		// A freshly created saga: promote it to EXECUTING and start on its first step.
		saga.State = models.StateExecuting
		if err := p.db.Save(saga).Error; err != nil {
			log.Printf("Error promoting saga %s to EXECUTING: %v", saga.TransactionID, err)
			return
		}
		p.executeForward(saga)
	case models.StateExecuting:
		p.executeForward(saga)
	case models.StateCompensating:
		p.executeCompensation(saga)
	}
}

func (p *Processor) executeForward(saga *models.Saga) {
	// A pitfall here is relying on lazy loading: the steps must be present and
	// ordered by step number, because we index them by saga.CurrentStep below.
	// Preloading them with the saga query is cheaper; this is the fallback.
	if len(saga.Steps) == 0 {
		p.db.Where("saga_id = ?", saga.ID).Order("step_number ASC").Find(&saga.Steps)
	}

	if saga.CurrentStep >= len(saga.Steps) {
		// All steps completed successfully
		saga.State = models.StateCompleted
		now := time.Now()
		saga.FinishedAt = &now
		if err := p.db.Save(saga).Error; err != nil {
			log.Printf("Error marking saga %s as completed: %v", saga.TransactionID, err)
		}
		log.Printf("Saga %s completed successfully.", saga.TransactionID)
		return
	}

	step := saga.Steps[saga.CurrentStep]
	step.State = models.StateExecuting
	p.db.Save(&step)

	client, err := p.getClient(step.ServiceAddress)
	if err != nil {
		log.Printf("FATAL: Cannot get gRPC client for step %d of saga %s: %v", step.StepNumber, saga.TransactionID, err)
		// This is a configuration error; we can neither proceed nor compensate automatically.
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	req := &participant.SagaStepRequest{
		TransactionId: saga.TransactionID,
		StepId:        fmt.Sprintf("%s-%d", saga.TransactionID, step.ID),
		Payload:       step.Payload,
	}

	resp, err := client.Execute(ctx, req)

	if err != nil || !resp.Success {
		var errMsg string
		if err != nil {
			errMsg = err.Error()
		} else {
			errMsg = resp.ErrorMessage
		}
		
		log.Printf("Step %d for saga %s failed: %s", step.StepNumber, saga.TransactionID, errMsg)
		step.State = models.StateFailed
		step.ErrorMessage = errMsg
		p.db.Save(&step)

		// The critical transition: flip the saga to compensating state.
		saga.State = models.StateCompensating
		// The current step has failed, so we start compensating from the one *before* it.
		// Note: saga.CurrentStep has not been incremented yet.
		p.db.Save(saga) 
		p.executeCompensation(saga) // Immediately start compensation
		return
	}

	// Step succeeded
	log.Printf("Step %d for saga %s succeeded.", step.StepNumber, saga.TransactionID)
	step.State = models.StateCompleted
	step.Response = resp.Payload
	p.db.Save(&step)

	// Increment step and save saga state to proceed to the next step in the next processing cycle.
	saga.CurrentStep++
	p.db.Save(saga)
}

The error handling is paramount. If client.Execute fails, the Saga’s state is immediately flipped to COMPENSATING, and the compensation logic is triggered. The state of every step is meticulously updated in the database. This ensures that if the orchestrator crashes and restarts, it knows exactly where to resume.
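The surrounding polling loop is not shown above; a minimal sketch, assuming the Processor from processor.go, might look like the following. The two-second interval and the batch size of 50 are arbitrary choices for illustration.

// orchestrator/run.go -- an illustrative sketch of the polling loop.
package orchestrator

import (
	"log"
	"time"

	"github.com/your-org/saga-orchestrator/models"
)

// Run polls for runnable sagas and processes them until stop is closed.
func (p *Processor) Run(stop <-chan struct{}) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			var sagas []models.Saga
			// PENDING sagas are new; EXECUTING and COMPENSATING sagas are ones we may
			// have crashed in the middle of. Sagas with a recorded compensation error
			// are excluded: they wait for manual intervention.
			err := p.db.
				Where("state IN ? AND (compensation_error IS NULL OR compensation_error = '')",
					[]models.SagaState{models.StatePending, models.StateExecuting, models.StateCompensating}).
				Limit(50).
				Find(&sagas).Error
			if err != nil {
				log.Printf("polling for runnable sagas failed: %v", err)
				continue
			}
			for i := range sagas {
				p.ProcessSaga(&sagas[i])
			}
		}
	}
}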

The Compensation Flow: Unwinding the Work

Compensation is the most difficult part of a Saga to get right. It’s not a true rollback; it’s a series of new transactions that semantically undo the work of the previous steps. The logic must iterate backward from the last successfully completed step.

// orchestrator/processor.go (continued)
func (p *Processor) executeCompensation(saga *models.Saga) {
	if len(saga.Steps) == 0 {
		// Same requirement as the forward path: steps must be loaded and ordered by step number.
		p.db.Where("saga_id = ?", saga.ID).Order("step_number ASC").Find(&saga.Steps)
	}

	// We compensate from the last completed step, which is `saga.CurrentStep - 1`.
	// The step at `saga.CurrentStep` is the one that failed.
	stepToCompensateIndex := saga.CurrentStep - 1

	if stepToCompensateIndex < 0 {
		// Nothing to compensate, the first step failed. Mark saga as failed.
		saga.State = models.StateFailed
		now := time.Now()
		saga.FinishedAt = &now
		if err := p.db.Save(saga).Error; err != nil {
			log.Printf("Error marking saga %s as failed: %v", saga.TransactionID, err)
		}
		log.Printf("Saga %s failed with no steps to compensate.", saga.TransactionID)
		return
	}

	step := saga.Steps[stepToCompensateIndex]

	// Skip if already compensated or in a terminal state
	if step.State != models.StateCompleted {
		log.Printf("Skipping compensation for step %d in saga %s, state is %s", step.StepNumber, saga.TransactionID, step.State)
		saga.CurrentStep-- // Move to the next step to compensate
		p.db.Save(saga)
		return
	}

	step.State = models.StateCompensating
	p.db.Save(&step)

	client, err := p.getClient(step.ServiceAddress)
	if err != nil {
		log.Printf("FATAL: Cannot get gRPC client for compensating step %d of saga %s: %v", step.StepNumber, saga.TransactionID, err)
		saga.CompensationError = fmt.Sprintf("gRPC client failure for step %d: %v", step.StepNumber, err)
		p.db.Save(saga)
		// We are stuck. This requires manual intervention.
		return
	}
	
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	req := &participant.SagaStepRequest{
		TransactionId: saga.TransactionID,
		// Using the original step response as the payload for compensation is a common pattern.
		Payload:       step.Response, 
		StepId:        fmt.Sprintf("%s-%d-comp", saga.TransactionID, step.ID),
	}

	resp, err := client.Compensate(ctx, req)
	
	// A critical challenge: What happens if compensation fails?
	// This is a "pivot" transaction. In our model, we halt and require manual intervention.
	if err != nil || !resp.Success {
		var errMsg string
		if err != nil {
			errMsg = err.Error()
		} else {
			errMsg = resp.ErrorMessage
		}

		log.Printf("CRITICAL: Compensation for step %d of saga %s failed: %s. Halting.", step.StepNumber, saga.TransactionID, errMsg)
		saga.CompensationError = fmt.Sprintf("Compensation for step %d failed: %s", step.StepNumber, errMsg)
		p.db.Save(saga)
		// We stop processing this saga automatically.
		return
	}

	log.Printf("Compensation for step %d of saga %s succeeded.", step.StepNumber, saga.TransactionID)
	step.State = models.StateCompensating // A more granular model would add a distinct COMPENSATED state; here COMPENSATING marks the step as handled.
	p.db.Save(&step)
	
	saga.CurrentStep--
	p.db.Save(saga)
}

The most challenging scenario is a failure during compensation. There is no easy answer. Our pragmatic choice was to halt automatic processing for that Saga instance, log a critical error, and flag it for manual review. More complex systems might implement retry logic for compensation, but this risks creating poison messages if the failure is non-transient.
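To support that manual review, a small helper that surfaces halted Sagas is enough to start with. This is a minimal sketch against the models above; the helper name is illustrative, and in practice it would feed a dashboard or an alert.

// orchestrator/review.go -- a hypothetical helper, not part of the core processor.
package orchestrator

import "github.com/your-org/saga-orchestrator/models"

// StuckSagas lists sagas whose compensation has halted and now needs a human.
func (p *Processor) StuckSagas(limit int) ([]models.Saga, error) {
	var sagas []models.Saga
	err := p.db.
		Where("state = ? AND compensation_error <> ''", models.StateCompensating).
		Order("updated_at ASC"). // Oldest stuck sagas first.
		Limit(limit).
		Preload("Steps").
		Find(&sagas).Error
	return sagas, err
}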

A Concrete Participant: The Order Service

Here’s how a participant service, like an OrderService, would implement the generic gRPC interface. It uses its own GORM instance to talk to its private database. Idempotency is handled by checking for the transaction_id before creating a new order record.

// services/order/main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"net"

	"github.com/your-org/saga-orchestrator/protos/participant"
	"google.golang.org/grpc"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// Local DB model for the Order Service
type Order struct {
	gorm.Model
	SagaTransactionID string `gorm:"uniqueIndex"`
	ProductID         string
	UserID            string
	Status            string // e.g., PENDING_PAYMENT, CREATED, CANCELLED
}

type OrderPayload struct {
	ProductID string `json:"product_id"`
	UserID    string `json:"user_id"`
}

type server struct {
	participant.UnimplementedSagaParticipantServer
	db *gorm.DB
}

// Execute creates the order in a PENDING state.
func (s *server) Execute(ctx context.Context, req *participant.SagaStepRequest) (*participant.SagaStepResponse, error) {
	var payload OrderPayload
	if err := json.Unmarshal(req.Payload, &payload); err != nil {
		return &participant.SagaStepResponse{Success: false, ErrorMessage: "invalid payload"}, nil
	}
	
	// Idempotency check: if an order for this transaction already exists, report success.
	// The unique index on SagaTransactionID is the real guard against concurrent
	// duplicates; this lookup is just the fast path.
	var existingOrder Order
	res := s.db.First(&existingOrder, "saga_transaction_id = ?", req.TransactionId)
	if res.Error == nil {
		log.Printf("Idempotency key hit: Order for transaction %s already exists.", req.TransactionId)
		// Return success because the desired state is already achieved.
		return &participant.SagaStepResponse{Success: true}, nil
	}
	
	newOrder := Order{
		SagaTransactionID: req.TransactionId,
		ProductID:         payload.ProductID,
		UserID:            payload.UserID,
		Status:            "PENDING_PAYMENT",
	}

	if err := s.db.Create(&newOrder).Error; err != nil {
		return &participant.SagaStepResponse{Success: false, ErrorMessage: err.Error()}, nil
	}

	log.Printf("Created order for transaction %s", req.TransactionId)
	return &participant.SagaStepResponse{Success: true}, nil
}

// Compensate cancels the order.
func (s *server) Compensate(ctx context.Context, req *participant.SagaStepRequest) (*participant.SagaStepResponse, error) {
	var order Order
	res := s.db.First(&order, "saga_transaction_id = ?", req.TransactionId)
	if res.Error != nil {
		// If the order doesn't exist, compensation is considered successful.
		log.Printf("Compensation: Order for transaction %s not found, assuming success.", req.TransactionId)
		return &participant.SagaStepResponse{Success: true}, nil
	}

	order.Status = "CANCELLED"
	if err := s.db.Save(&order).Error; err != nil {
		return &participant.SagaStepResponse{Success: false, ErrorMessage: err.Error()}, nil
	}

	log.Printf("Cancelled order for transaction %s", req.TransactionId)
	return &participant.SagaStepResponse{Success: true}, nil
}

// The main function that sets up the DB and starts the gRPC server is sketched below.
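For completeness, a minimal sketch of that main function might look like this. It relies on the imports already shown above; the DSN and the listen port are placeholders, not values from our deployment.

// A sketch of services/order/main.go's entry point.
func main() {
	dsn := "host=localhost user=orders password=orders dbname=orders port=5432 sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("failed to connect to the order database: %v", err)
	}
	if err := db.AutoMigrate(&Order{}); err != nil {
		log.Fatalf("failed to migrate schema: %v", err)
	}

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	grpcServer := grpc.NewServer()
	participant.RegisterSagaParticipantServer(grpcServer, &server{db: db})
	log.Println("order service listening on :50051")
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("gRPC server stopped: %v", err)
	}
}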

Adding a Layer of Resilience: The Circuit Breaker

Our system now handles logical failures, but it’s still naive about network health. If the Inventory Service is down and the orchestrator continuously retries, it wastes resources and can cause cascading failures. We needed a circuit breaker.

We decided to wrap our gRPC client logic in the orchestrator with a circuit breaker library. The gobreaker library is a simple and effective choice.

graph TD
    A[Orchestrator] -- gRPC Call --> B{Circuit Breaker};
    B -- Closed --> C[Execute RPC on Service];
    C -- Success --> A;
    C -- Failure --> B;
    B -- Open --> D[Fail Fast];
    D -- Immediate Error --> A;
    B -- Half-Open --> E[Allow One Trial RPC];
    E -- Success --> B;
    E -- Failure --> B;

Here’s how we can modify the orchestrator’s client getter to incorporate it.

// orchestrator/resilient_client.go
package orchestrator

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/sony/gobreaker"
	"github.com/your-org/saga-orchestrator/protos/participant"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

type ResilientClient struct {
	client participant.SagaParticipantClient
	cb     *gobreaker.CircuitBreaker
}

type ClientFactory struct {
	clients map[string]*ResilientClient
	mu      sync.Mutex
}

func NewClientFactory() *ClientFactory {
	return &ClientFactory{
		clients: make(map[string]*ResilientClient),
	}
}

func (f *ClientFactory) GetClient(address string) (*ResilientClient, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if client, ok := f.clients[address]; ok {
		return client, nil
	}
	
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("failed to connect to gRPC service at %s: %w", address, err)
	}

	st := gobreaker.Settings{
		Name:        address,
		MaxRequests: 1,                // Allow a single trial request while half-open.
		Interval:    0,                // Never clear counts while closed; they reset only on state change.
		Timeout:     30 * time.Second, // How long the breaker stays open before going half-open.
		ReadyToTrip: func(counts gobreaker.Counts) bool {
			// Trip after 5 consecutive failures.
			return counts.ConsecutiveFailures >= 5
		},
	}
	
	cb := gobreaker.NewCircuitBreaker(st)
	client := NewResilientClient(participant.NewSagaParticipantClient(conn), cb)
	f.clients[address] = client
	
	return client, nil
}

func NewResilientClient(client participant.SagaParticipantClient, cb *gobreaker.CircuitBreaker) *ResilientClient {
	return &ResilientClient{client: client, cb: cb}
}

// Execute wraps the gRPC call with the circuit breaker logic.
func (rc *ResilientClient) Execute(ctx context.Context, req *participant.SagaStepRequest) (*participant.SagaStepResponse, error) {
	body := func() (interface{}, error) {
		return rc.client.Execute(ctx, req)
	}

	res, err := rc.cb.Execute(body)
	if err != nil {
		return nil, err
	}
	return res.(*participant.SagaStepResponse), nil
}

// Compensate would have a similar wrapper.
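// A minimal sketch of that wrapper, mirroring Execute above:
func (rc *ResilientClient) Compensate(ctx context.Context, req *participant.SagaStepRequest) (*participant.SagaStepResponse, error) {
	res, err := rc.cb.Execute(func() (interface{}, error) {
		return rc.client.Compensate(ctx, req)
	})
	if err != nil {
		return nil, err
	}
	return res.(*participant.SagaStepResponse), nil
}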

Now, in the Processor, instead of using a simple map of clients, we use the ClientFactory. If the Inventory Service starts failing, after 5 consecutive errors, the circuit breaker will “open.” Subsequent calls from the orchestrator to that service will fail instantly for 30 seconds without ever hitting the network, protecting the system and giving the downstream service time to recover.
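Concretely, the change to the Processor is small. A sketch of the new wiring (field names are illustrative) looks like this:

// orchestrator/processor.go -- sketch of the revised wiring.
type Processor struct {
	db      *gorm.DB
	clients *ClientFactory
}

func NewProcessor(db *gorm.DB) *Processor {
	return &Processor{db: db, clients: NewClientFactory()}
}

// ...and inside executeForward / executeCompensation:
//     client, err := p.clients.GetClient(step.ServiceAddress)
//     ...
//     resp, err := client.Execute(ctx, req) // now fails fast when the breaker is open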

This entire effort, from concept to a resilient prototype, was a direct result of our Scrum team hitting a wall. The need for independent delivery forced an architectural reckoning. The solution is complex, but it provides a clear, reusable, and observable pattern for managing distributed transactions, finally unblocking our team’s velocity.

The current orchestrator design, however, introduces a potential single point of failure and a performance bottleneck. The orchestrator process itself is a singleton; if it’s down, no new Sagas can be processed. A future iteration would involve making the orchestrator horizontally scalable and highly available, likely using a leader election mechanism (via etcd or Zookeeper) to ensure only one instance is actively processing Sagas at any given time. Furthermore, writing to the database after every single step execution introduces latency. For Sagas where extreme performance is required over absolute durability, we could explore batching state updates or using an in-memory store with a write-ahead log.
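As a rough illustration of that future direction only, and not something we have built, a leader-election wrapper using etcd's concurrency package might look like the sketch below. The endpoints, election key, and instance ID are placeholders, and Run is the polling loop sketched earlier.

// orchestrator/leader.go -- a hypothetical sketch of the HA direction.
package orchestrator

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// RunAsLeader blocks until this instance wins the election, then runs the
// processing loop; losing the etcd session stops processing.
func (p *Processor) RunAsLeader(ctx context.Context, instanceID string) error {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		return err
	}
	defer cli.Close()

	session, err := concurrency.NewSession(cli)
	if err != nil {
		return err
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/saga-orchestrator/leader")
	if err := election.Campaign(ctx, instanceID); err != nil {
		return err
	}
	log.Printf("instance %s is now the active orchestrator", instanceID)

	stop := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
		case <-session.Done(): // Lost the etcd lease; stand down immediately.
		}
		close(stop)
	}()
	p.Run(stop)
	return election.Resign(context.Background())
}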

