Implementing a Resilient Saga Pattern for Distributed Transactions Between a Laravel API and a Ray Compute Cluster


The initial requirement seemed straightforward: a mobile application needed to upload a multi-page document, and the backend would process it through a series of computationally intensive steps—OCR, entity extraction, data validation against existing SQL records, and finally, generating a PDF report. The entire workflow had to be atomic. If any step failed, the system must roll back to its original state, leaving no partial data or orphaned files. A classic ACID transaction scenario.

Our primary mobile API backend is built on Laravel. The obvious first approach was to handle this within a single, synchronous API request. This failed immediately under load testing. A 10-page document could take over 45 seconds to process, leading to mobile client timeouts and holding a PHP-FPM worker hostage for an unacceptable duration.

The second attempt involved a standard Laravel background job using a Redis queue. This solved the timeout issue, as the API could now return a 202 Accepted response instantly. However, it introduced a new, more insidious problem: transaction management. A typical DB::transaction() closure in Laravel cannot span multiple, independent job executions. If the third step (e.g., data validation) failed after the second step (entity extraction) had already committed its results to a separate table, we were left with inconsistent data. Manually implementing cleanup logic in catch blocks became a tangled mess of conditional checks, which was brittle and unmaintainable. The core issue remained: we lacked a mechanism to enforce atomicity across a long-running, multi-stage process.

Furthermore, some of the planned steps, particularly the entity extraction and future ML-based classification models, were poor fits for the PHP ecosystem. Our data science team works exclusively in Python. This led us to a critical architectural decision point: we needed to offload the heavy computation to a dedicated Python environment but orchestrate it from our existing Laravel application, all while preserving the original requirement for transactional integrity. This is where a simple RPC call or fire-and-forget job dispatch was insufficient. We had to build a system that could manage a distributed transaction across language and process boundaries. The Saga pattern was the logical choice.

Our final architecture uses Laravel as the API endpoint and Saga Orchestrator, a PostgreSQL database as the single source of truth for both application data and saga state, RabbitMQ as the decoupled communication layer, and a Ray cluster for distributed, parallel execution of the Python-based processing steps.

The Foundation: Saga State Management in SQL

Before writing a single line of orchestration code, the source of truth must be defined. Relying on in-memory state or logs scattered across services is a recipe for disaster in a distributed system. The SQL database, with its robust ACID guarantees for single-node operations, is the perfect place to store the state of the entire distributed transaction.

We defined two core tables: sagas and saga_steps.

-- The main saga instance, representing the end-to-end business transaction.
CREATE TABLE sagas (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING', -- PENDING, EXECUTING, COMPLETED, COMPENSATING, FAILED
    payload JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_sagas_status ON sagas(status);

-- Individual steps within a saga. Each step is a local transaction.
CREATE TABLE saga_steps (
    id BIGSERIAL PRIMARY KEY,
    saga_id BIGINT NOT NULL REFERENCES sagas(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'PENDING', -- PENDING, EXECUTING, COMPLETED, FAILED, COMPENSATED
    sequence INT NOT NULL,
    job_payload JSONB NOT NULL,
    compensation_payload JSONB, -- Data needed to run the compensation
    error_message TEXT,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    UNIQUE(saga_id, sequence)
);

CREATE INDEX idx_saga_steps_saga_id_status ON saga_steps(saga_id, status);

The sagas table tracks the overall process. The saga_steps table is the critical piece, defining each local transaction in order. The sequence column is vital for ensuring ordered execution and, more importantly, reverse-ordered compensation. The compensation_payload stores the necessary information to undo a step’s action, such as the ID of a created record or the path to a generated file.
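
The saga-level statuses listed in the schema comment imply a small state machine. The following is a minimal sketch of it in Python, not part of the original implementation: the transition map is an assumption inferred from the orchestrator flow described in this article, and the names ALLOWED_SAGA_TRANSITIONS, can_transition, and next_status are hypothetical.

```python
# Saga-level statuses taken from the comment on sagas.status.
# The transition map itself is an assumption inferred from the article's flow.
ALLOWED_SAGA_TRANSITIONS = {
    "PENDING": {"EXECUTING"},
    "EXECUTING": {"COMPLETED", "COMPENSATING"},
    "COMPENSATING": {"FAILED"},
    "COMPLETED": set(),  # terminal
    "FAILED": set(),     # terminal
}


def can_transition(current: str, target: str) -> bool:
    """Return True if a saga may move from `current` to `target`."""
    return target in ALLOWED_SAGA_TRANSITIONS.get(current, set())


def next_status(current: str, target: str) -> str:
    """Validate a transition and return the new status, or raise ValueError."""
    if not can_transition(current, target):
        raise ValueError(f"Illegal saga transition: {current} -> {target}")
    return target
```

Guarding status updates with a check like this would make it harder for a late or duplicated message to move a finished saga back into an active state.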

Laravel as the Saga Orchestrator

The orchestration logic resides entirely within Laravel. It’s responsible for defining the saga, starting it, and handling the compensation flow if anything goes wrong. We created a SagaOrchestrator service to encapsulate this logic.

Here is the core service implementation. A real-world project would abstract the models and repository, but this illustrates the direct interaction with the database.

app/Services/Saga/SagaOrchestrator.php

<?php

namespace App\Services\Saga;

use App\Jobs\Saga\ExecuteSagaStepJob;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Throwable;

class SagaOrchestrator
{
    protected int $sagaId;
    protected array $steps = [];

    public function __construct(
        protected string $sagaName,
        protected array $initialPayload = []
    ) {}

    public function addStep(string $stepName, string $queue, array $jobPayload, ?array $compensationPayload = null): self
    {
        $this->steps[] = [
            'name' => $stepName,
            'queue' => $queue, // RabbitMQ routing key
            'job_payload' => $jobPayload,
            'compensation_payload' => $compensationPayload,
        ];
        return $this;
    }

    /**
     * @throws Throwable
     */
    public function execute(): int
    {
        // This entire block MUST be in a local database transaction to ensure atomicity.
        // If we can't even create the saga definition correctly, nothing should happen.
        return DB::transaction(function () {
            $this->sagaId = DB::table('sagas')->insertGetId([
                'name' => $this->sagaName,
                'status' => 'PENDING',
                'payload' => json_encode($this->initialPayload),
                'created_at' => now(),
                'updated_at' => now(),
            ]);

            foreach ($this->steps as $index => $step) {
                DB::table('saga_steps')->insert([
                    'saga_id' => $this->sagaId,
                    'name' => $step['name'],
                    'status' => 'PENDING',
                    'sequence' => $index + 1,
                    'job_payload' => json_encode($step['job_payload']),
                    'compensation_payload' => isset($step['compensation_payload']) ? json_encode($step['compensation_payload']) : null,
                    'created_at' => now(),
                    'updated_at' => now(),
                ]);
            }

            // Mark saga as EXECUTING and dispatch the first job.
            DB::table('sagas')->where('id', $this->sagaId)->update(['status' => 'EXECUTING']);
            $this->dispatchNextStep(1);

            Log::info("Saga [{$this->sagaId}] started for [{$this->sagaName}].");

            return $this->sagaId;
        });
    }

    public function dispatchNextStep(int $sequence): void
    {
        $nextStep = DB::table('saga_steps')
            ->where('saga_id', $this->sagaId)
            ->where('sequence', $sequence)
            ->first();

        if (!$nextStep) {
            // No more steps, saga is complete.
            DB::table('sagas')->where('id', $this->sagaId)->update(['status' => 'COMPLETED']);
            Log::info("Saga [{$this->sagaId}] completed successfully.");
            return;
        }

        // The queue name is not persisted in saga_steps, so it is looked up from
        // the in-memory step definitions using the name stored in the database.
        $stepConfig = collect($this->steps)->firstWhere('name', $nextStep->name);
        if (!$stepConfig) {
             // This is a critical internal error.
             Log::error("Configuration for step '{$nextStep->name}' not found for saga [{$this->sagaId}]. Triggering compensation.");
             $this->startCompensation("Configuration for step '{$nextStep->name}' not found.");
             return;
        }

        $queue = $stepConfig['queue'];
        $jobPayload = json_decode($nextStep->job_payload, true);

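        // This job dispatches the message to RabbitMQ. afterCommit() (Laravel 8+)
        // defers the dispatch until any open database transaction, such as the one
        // in execute(), has committed, so the job never runs against rows that are
        // not yet visible.
        ExecuteSagaStepJob::dispatch($this->sagaId, $nextStep->id, $queue, $jobPayload)
            ->onQueue('saga_dispatcher') // Use a dedicated queue for dispatching.
            ->afterCommit();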
    }

    public function startCompensation(string $reason): void
    {
        // ... Compensation logic will be detailed later
    }
}

The controller initiating the process is now clean and simple. It defines the workflow and fires it off.

app/Http/Controllers/DocumentProcessingController.php

<?php

namespace App\Http\Controllers;

use App\Services\Saga\SagaOrchestrator;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Throwable;

class DocumentProcessingController extends Controller
{
    public function process(Request $request)
    {
        $request->validate(['document_path' => 'required|string']);
        $documentPath = $request->input('document_path');

        try {
            // Define the saga and its steps
            $orchestrator = new SagaOrchestrator('document_processing', ['document_path' => $documentPath]);

            $sagaId = $orchestrator
                ->addStep(
                    'validate_document',
                    'q_doc_validation', // Queue for the validation worker
                    ['path' => $documentPath],
                    // No compensation payload: validation is assumed to create nothing that needs undoing.
                )
                ->addStep(
                    'extract_entities',
                    'q_doc_extraction', // Queue for the Python entity extraction worker
                    ['path' => $documentPath],
                    ['compensation_event' => 'delete_extracted_data', 'document_path' => $documentPath]
                )
                ->addStep(
                    'generate_report',
                    'q_doc_reporting', // Queue for the reporting worker
                    ['path' => $documentPath],
                    ['compensation_event' => 'delete_report_file', 'document_path' => $documentPath]
                )
                ->execute();
            
            return response()->json(['message' => 'Processing started.', 'saga_id' => $sagaId], 202);

        } catch (Throwable $e) {
            Log::critical('Failed to initiate saga for document processing.', ['error' => $e->getMessage()]);
            return response()->json(['error' => 'Could not start processing. Please try again later.'], 500);
        }
    }
}

The Communication Fabric: RabbitMQ and a Unified Message Format

Decoupling is non-negotiable. A direct HTTP call from a Python worker back to the Laravel API to report status creates a brittle, synchronous dependency. We use RabbitMQ for all communication between the orchestrator and the workers. A consistent message format is essential for interoperability.

All messages published to worker queues follow this JSON structure:

{
  "saga_id": 123,
  "step_id": 456,
  "task": "extract_entities",
  "payload": {
    "path": "/path/to/document.pdf"
  }
}

All workers, regardless of language, must publish a response to a standardized q_saga_responses queue in this format. The status field is either "COMPLETED" or "FAILED", and error is null on success or the error message on failure:

{
  "saga_id": 123,
  "step_id": 456,
  "status": "COMPLETED",
  "output": {
    "entities_found": 57
  },
  "error": null
}

This bidirectional communication allows the Laravel application to listen for outcomes and drive the saga forward or initiate compensation.
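
Because every consumer depends on this contract, it can help to validate responses before acting on them. The sketch below shows one way to do that in Python. SagaResponse and parse_saga_response are hypothetical names, and the rule that a FAILED response must carry an error message is an assumption rather than something the format above strictly requires.

```python
import json
from dataclasses import dataclass, field
from typing import Optional

VALID_STATUSES = {"COMPLETED", "FAILED"}


@dataclass(frozen=True)
class SagaResponse:
    saga_id: int
    step_id: int
    status: str
    output: dict = field(default_factory=dict)
    error: Optional[str] = None


def parse_saga_response(body: str) -> SagaResponse:
    """Decode and validate a response message; raise ValueError if malformed."""
    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Response is not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("Response must be a JSON object")
    for key in ("saga_id", "step_id", "status"):
        if key not in data:
            raise ValueError(f"Response is missing required field '{key}'")
    if data["status"] not in VALID_STATUSES:
        raise ValueError(f"Unknown status '{data['status']}'")
    # Assumption: a failure without an explanation is treated as malformed.
    if data["status"] == "FAILED" and not data.get("error"):
        raise ValueError("FAILED responses must carry an error message")
    return SagaResponse(
        saga_id=int(data["saga_id"]),
        step_id=int(data["step_id"]),
        status=data["status"],
        output=data.get("output") or {},
        error=data.get("error"),
    )
```

Rejecting malformed messages at the boundary keeps the orchestrator from advancing or compensating a saga based on a response it cannot interpret.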

The Python/Ray Worker Implementation

This is where the heavy lifting happens. We use Ray’s actor model to manage stateful or resource-intensive tasks. A master consumer process listens to RabbitMQ, receives a job, and dispatches it to a specific Ray actor.

Here’s a simplified representation of the Python worker. The actual implementation would include robust connection handling for RabbitMQ and the database, plus more sophisticated error management.

workers/ray_document_processor.py

import pika
import json
import ray
import os
import psycopg2
import time
from contextlib import contextmanager

# --- Database Connection Management ---
# In production, use a proper connection pool like SQLAlchemy's.
DB_CONN_STR = os.environ.get("DATABASE_URL")

@contextmanager
def get_db_connection():
    conn = psycopg2.connect(DB_CONN_STR)
    try:
        yield conn
    finally:
        conn.close()

def update_step_status(step_id, status, error=None):
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE saga_steps 
                SET status = %s, error_message = %s, updated_at = NOW()
                WHERE id = %s
                """,
                (status, error, step_id)
            )
            conn.commit()

def publish_to_next_step_queue(saga_id, next_sequence):
    # Placeholder: a real implementation would publish a message to a queue
    # (e.g. `q_saga_orchestrator`) that a Laravel listener consumes in order
    # to call dispatchNextStep(). Here it only logs the intent.
    print(f"SAGA[{saga_id}]: Signaling orchestrator to proceed to sequence {next_sequence}")


# --- Ray Actors for each task ---
@ray.remote
class EntityExtractor:
    def __init__(self):
        # Load models or other expensive resources here.
        # This is a key advantage of Ray Actors.
        time.sleep(2) # Simulate model loading
        print("EntityExtractor actor initialized.")

    def process(self, document_path):
        print(f"Extracting entities from {document_path}...")
        # Simulate heavy processing
        time.sleep(5)
        
        # Deterministic failure injection: any path containing "fail" raises,
        # which makes it easy to exercise the compensation logic.
        if "fail" in document_path:
            raise ValueError("Failed to extract entities due to invalid format.")
        
        print("Entity extraction successful.")
        return {"entities_found": 150, "pages_processed": 10}

# --- Main Worker Logic ---
class SagaWorker:
    def __init__(self):
        ray.init(address='auto', ignore_reinit_error=True)
        # Actor handles are created once and reused.
        self.entity_extractor = EntityExtractor.remote()

        # RabbitMQ setup
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='q_doc_extraction', durable=True)
        # A queue for the orchestrator to listen to for results
        self.channel.queue_declare(queue='q_saga_responses', durable=True)


    def callback(self, ch, method, properties, body):
        # Initialized up front so the except block can reference them even
        # when the message cannot be decoded.
        saga_id = step_id = None
        try:
            msg = json.loads(body)
            saga_id = msg['saga_id']
            step_id = msg['step_id']
            payload = msg['payload']

            print(f"Received job for SAGA[{saga_id}] STEP[{step_id}]")
            update_step_status(step_id, 'EXECUTING')

            # --- Task routing based on queue name or task name ---
            # This is a simplistic router. A real system might use a registry.
            if method.routing_key == 'q_doc_extraction':
                result_ref = self.entity_extractor.process.remote(payload['path'])
                output = ray.get(result_ref)

                # Success path
                update_step_status(step_id, 'COMPLETED')
                self.publish_response(saga_id, step_id, "COMPLETED", output)

            else:
                raise NotImplementedError(f"No worker for task on queue {method.routing_key}")

        except Exception as e:
            # Failure path
            print(f"ERROR in SAGA[{saga_id}] STEP[{step_id}]: {e}")
            error_msg = str(e)
            if step_id is not None:
                # Without a step ID (malformed message) there is nothing to
                # update or report, so the message is only logged and acked.
                update_step_status(step_id, 'FAILED', error_msg)
                self.publish_response(saga_id, step_id, "FAILED", error=error_msg)
        finally:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    def publish_response(self, saga_id, step_id, status, output=None, error=None):
        response_body = json.dumps({
            "saga_id": saga_id,
            "step_id": step_id,
            "status": status,
            "output": output or {},
            "error": error
        })
        self.channel.basic_publish(
            exchange='',
            routing_key='q_saga_responses',
            body=response_body,
            properties=pika.BasicProperties(delivery_mode=2) # make message persistent
        )
        print(f"Published response for SAGA[{saga_id}] STEP[{step_id}] with status {status}")

    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='q_doc_extraction', on_message_callback=self.callback)
        print('Waiting for messages on q_doc_extraction. To exit press CTRL+C')
        self.channel.start_consuming()

if __name__ == '__main__':
    worker = SagaWorker()
    worker.start()
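
The worker's inline comment notes that a real system might replace the if/else router with a registry. A minimal sketch of that idea follows. The register decorator, the route function, and the stand-in handler are all hypothetical; a real handler would call the Ray actor instead of returning a fixed dictionary.

```python
from typing import Callable, Dict

# Maps a routing key (queue name) to a handler that takes the message payload.
TaskHandler = Callable[[dict], dict]
_HANDLERS: Dict[str, TaskHandler] = {}


def register(routing_key: str) -> Callable[[TaskHandler], TaskHandler]:
    """Decorator that registers a handler for one routing key."""
    def decorator(func: TaskHandler) -> TaskHandler:
        if routing_key in _HANDLERS:
            raise ValueError(f"Handler already registered for {routing_key}")
        _HANDLERS[routing_key] = func
        return func
    return decorator


def route(routing_key: str, payload: dict) -> dict:
    """Dispatch a payload to its handler; unknown keys raise NotImplementedError."""
    handler = _HANDLERS.get(routing_key)
    if handler is None:
        raise NotImplementedError(f"No worker for task on queue {routing_key}")
    return handler(payload)


@register("q_doc_extraction")
def extract_entities(payload: dict) -> dict:
    # Hypothetical stand-in: the real handler would delegate to the Ray actor,
    # e.g. ray.get(entity_extractor.process.remote(payload["path"])).
    return {"entities_found": 0, "path": payload["path"]}
```

With this in place, the callback would call route(method.routing_key, payload) and adding a new task type would only require registering another handler.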

The Critical Path: Compensation

A saga is only as good as its compensation logic. When a worker reports a failure, the orchestrator must take over. We implement this with another Laravel queue listener that consumes from the q_saga_responses queue.

app/Listeners/Saga/ProcessSagaResponseListener.php

<?php

namespace App\Listeners\Saga;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use App\Services\Saga\SagaOrchestrator; // To call dispatchNextStep

class ProcessSagaResponseListener implements ShouldQueue
{
    public $queue = 'q_saga_responses'; // Listen on the response queue

    public function handle($event) // Assume event payload is the decoded JSON message
    {
        $sagaId = $event['saga_id'];
        $stepId = $event['step_id'];
        $status = $event['status'];

        $step = DB::table('saga_steps')->where('id', $stepId)->first();
        if (!$step) {
            Log::error("Received response for non-existent step ID {$stepId}");
            return;
        }

        if ($status === 'COMPLETED') {
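            // Re-create the orchestrator from the persisted saga row. Note that
            // dispatchNextStep() as written also relies on the saga ID and the
            // in-memory step definitions (including queue names), so a real
            // implementation must rehydrate both before this call.
            $saga = DB::table('sagas')->find($sagaId);
            $orchestrator = new SagaOrchestrator($saga->name, json_decode($saga->payload, true));
            $orchestrator->dispatchNextStep($step->sequence + 1);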

        } elseif ($status === 'FAILED') {
            $this->startCompensation($sagaId, $step->sequence);
        }
    }

    private function startCompensation(int $sagaId, int $failedStepSequence): void
    {
        DB::table('sagas')->where('id', $sagaId)->update(['status' => 'COMPENSATING']);
        Log::warning("Saga [{$sagaId}] failed at step sequence {$failedStepSequence}. Starting compensation.");

        $completedSteps = DB::table('saga_steps')
            ->where('saga_id', $sagaId)
            ->where('status', 'COMPLETED')
            ->where('sequence', '<', $failedStepSequence)
            ->orderBy('sequence', 'desc') // CRITICAL: Compensate in reverse order
            ->get();

        foreach ($completedSteps as $step) {
            $compensationPayload = json_decode($step->compensation_payload, true);
            if (empty($compensationPayload) || empty($compensationPayload['compensation_event'])) {
                Log::warning("No compensation logic defined for step '{$step->name}' [{$step->id}]. Skipping.");
                continue;
            }

            // Dispatch a job to handle the specific compensation task.
            // This decouples the compensation logic.
            // e.g., Fire a `DeleteExtractedDataEvent` or dispatch `DeleteReportFileJob`.
            // The job would contain the logic to undo the action.
            
            // For example:
            // event($compensationPayload['compensation_event'], $compensationPayload);

            DB::table('saga_steps')->where('id', $step->id)->update(['status' => 'COMPENSATED']);
            Log::info("Step '{$step->name}' [{$step->id}] for saga [{$sagaId}] has been compensated.");
        }

        DB::table('sagas')->where('id', $sagaId)->update(['status' => 'FAILED']);
        Log::error("Saga [{$sagaId}] compensation finished. Saga marked as FAILED.");
    }
}
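
The selection rule used by startCompensation can be illustrated without a database. The sketch below is a Python rendering of the same logic over plain dictionaries shaped like saga_steps rows. The function names are hypothetical and it is meant as an illustration, not a replacement for the listener.

```python
from typing import Iterable, List


def steps_to_compensate(steps: Iterable[dict], failed_sequence: int) -> List[dict]:
    """Return COMPLETED steps that ran before the failed one, newest first."""
    eligible = [
        s for s in steps
        if s["status"] == "COMPLETED" and s["sequence"] < failed_sequence
    ]
    # Reverse order matters: later steps may depend on the output of earlier ones.
    return sorted(eligible, key=lambda s: s["sequence"], reverse=True)


def has_compensation(step: dict) -> bool:
    """Mirror the listener's check: steps without a compensation_event are skipped."""
    payload = step.get("compensation_payload") or {}
    return bool(payload.get("compensation_event"))
```

For a saga that fails at sequence 3, this yields step 2 followed by step 1, and a step stored without a compensation payload is reported as having nothing to undo.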

This architecture is visualized below.

sequenceDiagram
    participant Mobile Client
    participant Laravel API
    participant RabbitMQ
    participant Ray Worker
    participant SQL Database

    Mobile Client->>+Laravel API: POST /process-document
    Laravel API->>+SQL Database: CREATE saga, CREATE steps (status: PENDING)
    SQL Database-->>-Laravel API: Return saga_id
    Laravel API->>Laravel API: Update saga status to EXECUTING
    Laravel API->>RabbitMQ: Dispatch job for Step 1 (Validate)
    Laravel API-->>-Mobile Client: 202 Accepted { saga_id: 123 }
    
    Ray Worker->>RabbitMQ: Consume Step 1 job
    Ray Worker->>+SQL Database: UPDATE step 1 status to EXECUTING
    SQL Database-->>-Ray Worker: OK
    Note right of Ray Worker: Perform validation...
    Ray Worker->>+SQL Database: UPDATE step 1 status to COMPLETED
    SQL Database-->>-Ray Worker: OK
    Ray Worker->>RabbitMQ: Publish response { status: COMPLETED }
    
    Laravel API->>RabbitMQ: Consume Step 1 response
    Laravel API->>Laravel API: Call dispatchNextStep(2)
    Laravel API->>RabbitMQ: Dispatch job for Step 2 (Extract)

    Ray Worker->>RabbitMQ: Consume Step 2 job
    Ray Worker->>+SQL Database: UPDATE step 2 status to EXECUTING
    SQL Database-->>-Ray Worker: OK
    Note right of Ray Worker: Perform entity extraction...
    Note right of Ray Worker: CRITICAL FAILURE!
    Ray Worker->>+SQL Database: UPDATE step 2 status to FAILED
    SQL Database-->>-Ray Worker: OK
    Ray Worker->>RabbitMQ: Publish response { status: FAILED }
    
    Laravel API->>RabbitMQ: Consume Step 2 response (FAILED)
    Laravel API->>Laravel API: Initiate compensation flow
    Laravel API->>+SQL Database: UPDATE saga status to COMPENSATING
    SQL Database-->>-Laravel API: OK
    Laravel API->>+SQL Database: Find completed steps before failed one (Step 1)
    SQL Database-->>-Laravel API: Return Step 1 details
    Note left of Laravel API: Execute compensation for Step 1
    Laravel API->>+SQL Database: UPDATE step 1 status to COMPENSATED
    SQL Database-->>-Laravel API: OK
    Laravel API->>+SQL Database: UPDATE saga status to FAILED
    SQL Database-->>-Laravel API: OK

The primary trade-off of this pattern is complexity. Debugging a failed saga requires careful inspection of the saga tables and distributed logs. All compensating actions must be designed to be idempotent; running a “delete file” compensation twice should not cause an error on the second attempt. Furthermore, this approach achieves semantic atomicity, not the strict isolation level of a database transaction. A client polling for status might momentarily see the result of an intermediate step before it gets compensated. This level of eventual consistency must be acceptable for the business requirement. For our use case, where the final report is the only artifact that matters to the end-user, this was a perfectly acceptable compromise for achieving scalability and resilience.
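
As an illustration of the idempotency requirement, here is a minimal Python sketch of a "delete file" compensation that is safe to run more than once. The function name delete_report_file is borrowed from the compensation_event used earlier, but the implementation itself is an assumption, not the code used in this project.

```python
from pathlib import Path


def delete_report_file(path: str) -> bool:
    """Remove a generated file; return True if a file existed when called.

    Safe to call repeatedly: a missing file is treated as already compensated.
    """
    target = Path(path)
    existed = target.is_file()
    # missing_ok=True (Python 3.8+) suppresses FileNotFoundError, which also
    # covers the case where another process removes the file in between.
    target.unlink(missing_ok=True)
    return existed
```

Calling this twice with the same path removes the file on the first call and does nothing on the second, so a redelivered compensation message cannot turn a successful rollback into an error.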

