Achieving ACID-like Guarantees in Serverless Workflows with Apache Pulsar Transactions and AWS Lambda


The mandate was clear: build a new order processing backend using a serverless architecture. The business logic, however, was anything but simple. A single “Create Order” request required a cascade of operations across three distinct domains: reserving inventory, processing payment, and scheduling shipping. Each was handled by its own microservice, deployed as an AWS Lambda function. The core technical pain point emerged immediately: how do we guarantee atomicity across these ephemeral, stateless functions? If we successfully reserve inventory but the payment processing fails, we can’t leave that inventory in a locked state, slowly bleeding potential revenue. This is the classic distributed transaction problem, amplified by the constraints of a serverless environment.

A naive approach of chaining API calls from one Lambda to the next is a recipe for disaster in a production system. It’s brittle, creates tight coupling, and offers no clean way to handle partial failures. We needed a robust orchestration pattern. Our team was already leveraging Apache Pulsar for other event-streaming workloads, so we investigated its capabilities beyond simple pub-sub. The discovery of its built-in Transactional API was the turning point. Unlike client-side Saga libraries, Pulsar provides a server-side transaction coordinator (TC), allowing us to use the message broker itself as the durable, consistent state machine for our workflow. This promised a clean separation of concerns: Lambda functions would execute business logic, and Pulsar would guarantee the atomicity of the entire multi-step process.

The architectural decision was to implement the Saga pattern, where a long-lived transaction is composed of a series of local transactions. The Pulsar transaction would encapsulate the “forward” path of the Saga. If all steps succeed, we commit the Pulsar transaction, making all produced messages visible to downstream consumers at once. If any step fails, we abort the transaction, effectively discarding all intermediate messages. This provides the “A” (Atomicity) in ACID. Compensating transactions, to undo completed local transactions, would be triggered outside the initial Pulsar transaction upon failure.

Here is the high-level flow we designed for a successful order creation:

sequenceDiagram
    participant Client
    participant Orchestrator (Lambda)
    participant Pulsar TC
    participant Inventory Svc (Lambda)
    participant Payment Svc (Lambda)
    participant Shipping Svc (Lambda)

    Client->>Orchestrator: POST /orders
    Orchestrator->>Pulsar TC: Begin Transaction (txnId)
    Pulsar TC-->>Orchestrator: Transaction Started
    
    Orchestrator->>Pulsar TC: Produce(topic='reserve-inventory', msg, txnId)
    
    Pulsar TC-->>Inventory Svc: Consume(topic='reserve-inventory')
    Inventory Svc->>Inventory Svc: Local DB Transaction (Reserve)
    Inventory Svc->>Pulsar TC: Produce(topic='inventory-reserved', msg, txnId)
    Inventory Svc->>Pulsar TC: ACK(msg)
    
    Pulsar TC-->>Orchestrator: Consume(topic='inventory-reserved')
    Orchestrator->>Pulsar TC: ACK(msg)
    Orchestrator->>Pulsar TC: Produce(topic='process-payment', msg, txnId)
    
    Pulsar TC-->>Payment Svc: Consume(topic='process-payment')
    Payment Svc->>Payment Svc: Local DB Transaction (Charge)
    Payment Svc->>Pulsar TC: Produce(topic='payment-processed', msg, txnId)
    Payment S### The Orchestrator Lambda: Core of the Saga

The primary responsibility of the Saga lies with the orchestrator. This Lambda function initiates the Pulsar transaction, sends the initial command, and listens for outcomes from participant services to drive the workflow forward or initiate a rollback. In a real-world project, the code for this needs to be defensively programmed, accounting for timeouts, exceptions, and idempotency.

The following Python code demonstrates the orchestrator's handler. It uses the official Pulsar Python client. Key aspects to note are the explicit transaction lifecycle management (`begin()`, `commit()`, `abort()`) and the use of environment variables for configuration, which is standard practice for Lambda.

# orchestrator_lambda/handler.py
import pulsar
import os
import json
import logging
import uuid
from typing import Dict, Any

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# --- Configuration loaded from environment variables ---
PULSAR_SERVICE_URL = os.environ.get('PULSAR_SERVICE_URL')
PULSAR_TOKEN = os.environ.get('PULSAR_TOKEN')

# Topics used in the Saga
TOPIC_RESERVE_INVENTORY = os.environ.get('TOPIC_RESERVE_INVENTORY')
TOPIC_PROCESS_PAYMENT = os.environ.get('TOPIC_PROCESS_PAYMENT')
TOPIC_SCHEDULE_SHIPPING = os.environ.get('TOPIC_SCHEDULE_SHIPPING')

# Reply topics to listen for results from participants
TOPIC_INVENTORY_RESERVED = os.environ.get('TOPIC_INVENTORY_RESERVED')
TOPIC_PAYMENT_PROCESSED = os.environ.get('TOPIC_PAYMENT_PROCESSED')
TOPIC_SHIPPING_SCHEDULED = os.environ.get('TOPIC_SHIPPING_SCHEDULED')

# Compensating transaction topics
TOPIC_RELEASE_INVENTORY = os.environ.get('TOPIC_RELEASE_INVENTORY')
TOPIC_REFUND_PAYMENT = os.environ.get('TOPIC_REFUND_PAYMENT')

# --- Global Pulsar Client ---
# In a Lambda context, it's a best practice to initialize clients outside the handler
# to allow for connection reuse across invocations.
try:
    auth = pulsar.AuthenticationToken(PULSAR_TOKEN)
    client = pulsar.Client(PULSAR_SERVICE_URL, authentication=auth)
    logger.info("Pulsar client initialized successfully.")
except Exception as e:
    logger.error(f"Failed to initialize Pulsar client: {e}")
    client = None

def handler(event: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    if not client:
        return {'statusCode': 500, 'body': json.dumps({'error': 'Pulsar client not available'})}

    try:
        order_details = json.loads(event['body'])
        order_id = order_details.get('order_id', str(uuid.uuid4()))
        logger.info(f"Starting Saga for order_id: {order_id}")
    except (json.JSONDecodeError, KeyError) as e:
        logger.error(f"Invalid input payload: {e}")
        return {'statusCode': 400, 'body': json.dumps({'error': 'Invalid request body'})}

    transaction = None
    completed_steps = []

    try:
        # Step 0: Begin the Pulsar Transaction
        # The timeout should be longer than the expected total execution time of the Saga
        # but shorter than the maximum Pulsar broker transaction timeout limit.
        transaction = client.begin_transaction(300) # 5-minute timeout
        logger.info(f"Pulsar transaction started with ID: {transaction.txn_id_str()}")
        
        # --- Define Producers and Consumers for the Orchestrator ---
        # It's critical that producers used within the Saga are created with the transaction.
        producer_inventory = client.create_producer(TOPIC_RESERVE_INVENTORY)
        producer_payment = client.create_producer(TOPIC_PROCESS_PAYMENT)
        producer_shipping = client.create_producer(TOPIC_SCHEDULE_SHIPPING)

        # Consumers to wait for replies. The timeout here acts as a step-level timeout.
        consumer_inventory = client.subscribe(TOPIC_INVENTORY_RESERVED, subscription_name=f'orchestrator-inventory-{order_id}')
        consumer_payment = client.subscribe(TOPIC_PAYMENT_PROCESSED, subscription_name=f'orchestrator-payment-{order_id}')
        consumer_shipping = client.subscribe(TOPIC_SHIPPING_SCHEDULED, subscription_name=f'orchestrator-shipping-{order_id}')
        
        # === SAGA STEP 1: Reserve Inventory ===
        saga_message = {'order_id': order_id, 'item_id': order_details['item_id'], 'quantity': order_details['quantity']}
        producer_inventory.send(json.dumps(saga_message).encode('utf-8'), txn=transaction)
        logger.info(f"[{order_id}] Dispatched 'reserve-inventory' command within transaction.")
        
        # Wait for the confirmation message
        msg_inventory = consumer_inventory.receive(timeout_millis=30000) # 30s timeout
        consumer_inventory.acknowledge(msg_inventory)
        inventory_result = json.loads(msg_inventory.data().decode('utf-8'))
        
        if not inventory_result.get('success'):
            raise ValueError(f"Inventory reservation failed: {inventory_result.get('reason')}")
        
        logger.info(f"[{order_id}] Step 1 'inventory-reserved' confirmed.")
        completed_steps.append('INVENTORY')
        
        # === SAGA STEP 2: Process Payment ===
        saga_message['amount'] = order_details['amount']
        producer_payment.send(json.dumps(saga_message).encode('utf-8'), txn=transaction)
        logger.info(f"[{order_id}] Dispatched 'process-payment' command within transaction.")
        
        msg_payment = consumer_payment.receive(timeout_millis=30000) # 30s timeout
        consumer_payment.acknowledge(msg_payment)
        payment_result = json.loads(msg_payment.data().decode('utf-8'))
        
        if not payment_result.get('success'):
            raise ValueError(f"Payment processing failed: {payment_result.get('reason')}")
        
        logger.info(f"[{order_id}] Step 2 'payment-processed' confirmed.")
        completed_steps.append('PAYMENT')

        # === SAGA STEP 3: Schedule Shipping ===
        saga_message['address'] = order_details['address']
        producer_shipping.send(json.dumps(saga_message).encode('utf-8'), txn=transaction)
        logger.info(f"[{order_id}] Dispatched 'schedule-shipping' command within transaction.")

        msg_shipping = consumer_shipping.receive(timeout_millis=30000) # 30s timeout
        consumer_shipping.acknowledge(msg_shipping)
        shipping_result = json.loads(msg_shipping.data().decode('utf-8'))

        if not shipping_result.get('success'):
            raise ValueError(f"Shipping scheduling failed: {shipping_result.get('reason')}")

        logger.info(f"[{order_id}] Step 3 'shipping-scheduled' confirmed.")
        completed_steps.append('SHIPPING')

        # === COMMIT PHASE ===
        # If all steps succeeded, commit the transaction. All messages become visible.
        transaction.commit()
        logger.info(f"[{order_id}] Saga completed successfully. Transaction committed.")
        
        return {'statusCode': 200, 'body': json.dumps({'order_id': order_id, 'status': 'SUCCESS'})}

    except Exception as e:
        logger.error(f"[{order_id}] Saga failed at step '{completed_steps[-1] if completed_steps else 'START'}': {e}")
        
        if transaction:
            try:
                # === ABORT & COMPENSATE PHASE ===
                logger.warning(f"[{order_id}] Aborting Pulsar transaction.")
                transaction.abort()
                
                # Trigger compensating transactions for steps that completed successfully.
                # Note: These are sent in new, NON-TRANSACTIONAL messages.
                logger.warning(f"[{order_id}] Initiating compensation for steps: {completed_steps}")
                if 'PAYMENT' in completed_steps:
                    producer_refund = client.create_producer(TOPIC_REFUND_PAYMENT)
                    producer_refund.send(json.dumps({'order_id': order_id, 'reason': 'Saga failed'}).encode('utf-8'))
                    producer_refund.close()
                if 'INVENTORY' in completed_steps:
                    producer_release_inv = client.create_producer(TOPIC_RELEASE_INVENTORY)
                    producer_release_inv.send(json.dumps({'order_id': order_id, 'reason': 'Saga failed'}).encode('utf-8'))
                    producer_release_inv.close()

            except Exception as abort_exc:
                # A common pitfall is failing to handle errors during the abort/compensation phase.
                # This could leave the system in an unrecoverable state.
                logger.critical(f"[{order_id}] CRITICAL ERROR: Failed to abort transaction or compensate: {abort_exc}")
        
        return {'statusCode': 500, 'body': json.dumps({'order_id': order_id, 'status': 'FAILED', 'reason': str(e)})}
    finally:
        # Clean up producers and consumers
        if 'producer_inventory' in locals(): producer_inventory.close()
        if 'producer_payment' in locals(): producer_payment.close()
        if 'producer_shipping' in locals(): producer_shipping.close()
        if 'consumer_inventory' in locals(): consumer_inventory.close()
        if 'consumer_payment' in locals(): consumer_payment.close()
        if 'consumer_shipping' in locals(): consumer_shipping.close()</pre>

One major challenge we faced was the synchronous `receive` calls within the orchestrator. This transforms an asynchronous message flow into a blocking, synchronous Lambda execution, which can be expensive and is susceptible to Lambda's maximum 15-minute timeout. For longer-running Sagas, this pattern is not viable. A more advanced implementation would involve splitting the orchestrator itself into multiple Lambdas, where each function handles one step and then passes state via a dedicated state-tracking topic or a database like DynamoDB. However, for workflows that complete within a few minutes, this single-function orchestrator provides simplicity.

### The Participant Lambda: Local Transactions and Idempotency

A participant service is responsible for three things: consuming a command message, executing its local transaction, and producing a result message back to the orchestrator within the context of the ongoing Pulsar transaction. The pitfall here is ensuring the local database operation is truly atomic and that the entire process is idempotent. AWS Lambda can, under certain failure conditions, invoke the same function with the same event payload multiple times.

Below is the code for the Inventory Service participant. It interacts with a DynamoDB table and must ensure it doesn't reserve the same inventory twice for a single order.

```python
# inventory_service_lambda/handler.py
import pulsar
import os
import json
import logging
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# --- Configuration ---
PULSAR_SERVICE_URL = os.environ.get('PULSAR_SERVICE_URL')
PULSAR_TOKEN = os.environ.get('PULSAR_TOKEN')
TOPIC_RESERVE_INVENTORY = os.environ.get('TOPIC_RESERVE_INVENTORY')
TOPIC_INVENTORY_RESERVED = os.environ.get('TOPIC_INVENTORY_RESERVED')
DYNAMODB_TABLE_NAME = os.environ.get('DYNAMODB_TABLE_NAME')
SUBSCRIPTION_NAME = "inventory-service-subscription"

# --- Global Clients for Connection Reuse ---
try:
    auth = pulsar.AuthenticationToken(PULSAR_TOKEN)
    client = pulsar.Client(PULSAR_SERVICE_URL, authentication=auth)
    # The consumer is long-lived across invocations
    consumer = client.subscribe(TOPIC_RESERVE_INVENTORY, subscription_name=SUBSCRIPTION_NAME)
    # The producer will be created on-demand as it's transactional
    logger.info("Pulsar client and consumer initialized successfully.")
except Exception as e:
    logger.error(f"Failed to initialize Pulsar client: {e}")
    client = None

dynamodb = boto3.resource('dynamodb')
inventory_table = dynamodb.Table(DYNAMODB_TABLE_NAME)

def handler(event: Dict[str, Any], context: Dict[str, Any]) -> None:
    """
    This Lambda is triggered by an event source mapping (e.g., SQS)
    or runs on a schedule to poll the Pulsar topic. For simplicity,
    this example shows a manual poll loop. In production, use a proper trigger.
    """
    if not client:
        logger.error("Pulsar client not available. Exiting.")
        return

    producer = None
    try:
        # Poll for a message from the command topic.
        msg = consumer.receive(timeout_millis=5000) # Poll for 5 seconds
        if not msg:
            logger.info("No message received.")
            return
            
        payload = json.loads(msg.data().decode('utf-8'))
        order_id = payload['order_id']
        item_id = payload['item_id']
        quantity_to_reserve = payload['quantity']

        # A critical detail: The transaction ID is passed in the message properties.
        # Pulsar client libraries unfortunately don't have a standardized way to get this.
        # This often requires custom message property handling. Assuming it's in a property 'txnId'.
        # For this example, let's assume we need to reconstruct the TxnID object.
        # In a real Java/Go client, you can pass the transaction coordinator and TxnID bits directly.
        # The Python client's transaction handling is less mature, this is a conceptual placeholder.
        # txn = get_transaction_from_message_properties(msg.properties())
        # For demonstration, we'll create a new producer. The real key is that the Pulsar broker
        # manages associating the ACK and the new produce with the same transaction.
        producer = client.create_producer(TOPIC_INVENTORY_RESERVED)
        
        logger.info(f"Processing inventory reservation for order_id: {order_id}")

        # === LOCAL TRANSACTION: Idempotent write to DynamoDB ===
        # Use a conditional update to ensure we only decrement stock if it's sufficient
        # and this specific order hasn't already been processed.
        inventory_table.update_item(
            Key={'itemId': item_id},
            UpdateExpression="SET stock = stock - :q, reservations.#orderId = :q",
            ConditionExpression="attribute_exists(stock) AND stock >= :q AND attribute_not_exists(reservations.#orderId)",
            ExpressionAttributeNames={'#orderId': order_id},
            ExpressionAttributeValues={':q': quantity_to_reserve}
        )

        logger.info(f"Successfully reserved {quantity_to_reserve} of item {item_id} for order {order_id}")
        
        # If the local transaction succeeds, produce a success message to the orchestrator.
        # This message MUST be part of the same transaction.
        response_payload = {'order_id': order_id, 'success': True}
        # producer.send(json.dumps(response_payload).encode('utf-8'), txn=txn)
        producer.send(json.dumps(response_payload).encode('utf-8')) # Conceptual send

        # Acknowledging the message is also part of the transaction.
        consumer.acknowledge(msg)
        logger.info(f"Acknowledged 'reserve-inventory' and produced 'inventory-reserved' for order {order_id}")

    except ClientError as e:
        # This handles DynamoDB specific errors, e.g., ConditionalCheckFailedException
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            reason = "Insufficient stock or duplicate request."
            logger.warning(f"Inventory reservation failed for order {order_id}: {reason}")
        else:
            reason = f"DynamoDB error: {e}"
            logger.error(f"Inventory reservation failed for order {order_id}: {reason}")
        
        # Produce a failure message so the orchestrator can abort.
        response_payload = {'order_id': payload.get('order_id', 'unknown'), 'success': False, 'reason': reason}
        if producer:
            # producer.send(json.dumps(response_payload).encode('utf-8'), txn=txn)
            producer.send(json.dumps(response_payload).encode('utf-8'))
        
        # Negatively acknowledge to allow for redelivery if it was a transient error.
        consumer.negative_acknowledge(msg)

    except Exception as e:
        logger.error(f"Generic error processing inventory reservation: {e}")
        consumer.negative_acknowledge(msg) # Nack for reprocessing
    finally:
        if producer:
            producer.close()
### Testing a Distributed Transaction System A common mistake is to only unit test the individual functions. For a Saga, integration testing is non-negotiable. We found `testcontainers` invaluable. Our CI/CD pipeline spins up a Docker container for Pulsar and one for DynamoDB Local. We then run a test suite that triggers the orchestrator Lambda and asserts the final state in DynamoDB and the messages produced to output topics for both success and various failure scenarios (e.g., inventory shortage, payment gateway timeout). This provides high confidence that the complex interaction between services, the message broker, and databases behaves as expected.
# A conceptual pytest fixture using testcontainers
# from testcontainers.pulsar import PulsarContainer
# from testcontainers.dynamodb import DynamoDBContainer
# import pytest

# @pytest.fixture(scope="module")
# def pulsar_broker():
#     with PulsarContainer("apachepulsar/pulsar:2.10.2") as pulsar:
#         pulsar.get_container_host_ip()
#         pulsar.get_exposed_port(6650)
#         # Enable transactions in broker.conf
#         yield pulsar

# @pytest.fixture(scope="module")
# def dynamodb_table(pulsar_broker): # Depends on pulsar to show test flow
#     with DynamoDBContainer("amazon/dynamodb-local:latest") as dynamodb:
#         # ... create table using boto3 ...
#         yield dynamodb

# def test_full_saga_success(pulsar_broker, dynamodb_table):
#     # 1. Configure Lambda environment variables to point to test containers
#     # 2. Mock the initial HTTP trigger event
#     # 3. Invoke orchestrator_lambda.handler(event, {})
#     # 4. Invoke participant handlers in a loop to simulate consumption
#     # 5. Assert final state in DynamoDB (e.g., stock is decremented)
#     # 6. Assert no messages on compensating topics
#     pass

# def test_saga_failure_at_payment(pulsar_broker, dynamodb_table):
#     # 1. Setup mocks to make the payment service fail
#     # 2. Invoke the orchestrator and participant handlers
#     # 3. Assert stock in DynamoDB is back to original value
#     # 4. Assert a message exists on the 'release-inventory' topic
#     pass
The primary trade-off with this architecture is increased latency and complexity. Each step in the Saga involves network hops to Pulsar, adding milliseconds or more to the end-to-end processing time. This makes it unsuitable for real-time, synchronous request/response cycles. Its value shines in asynchronous, long-running business processes where reliability and data consistency are paramount. Debugging is also more involved, requiring distributed tracing and log correlation across multiple Lambda functions and the ability to inspect topic messages in Pulsar to understand where a workflow stalled or failed. Furthermore, this solution relies heavily on the specific transactional features of Apache Pulsar. Migrating to another message broker like RabbitMQ or SQS would necessitate a complete redesign of the transaction management logic, likely moving to a client-side orchestration library and a separate database to track Saga state. The tight coupling to the broker's transactional capability is a strategic choice; we gained simplicity and reliability in our implementation at the cost of infrastructure flexibility. Future work might involve abstracting the transaction logic behind an interface to mitigate this, but for our current scale and requirements, the direct implementation proved to be the most pragmatic path forward.

  TOC