The mandate was to decompose a monolithic order management system. The user-facing API layer, responsible for creating orders, was to remain in Node.js, leveraging the Express.js ecosystem our team knew well. The inventory management component, however, required a more robust, computationally efficient backend and was slated for a rewrite in Kotlin using Ktor, capitalizing on the JVM’s performance and coroutine concurrency model. The immediate problem this created was transactional integrity. Order creation is no longer an atomic operation; it’s a distributed transaction spanning two services, two languages, and two technology stacks. A simple synchronous REST or gRPC call from Express to Ktor is a non-starter in a production environment: it introduces temporal coupling, and a failure in the Ktor service would cascade directly back to the user, creating a brittle system.
A two-phase commit (2PC) protocol was briefly considered and quickly dismissed. The coordination overhead and the tendency for locks to be held across service boundaries make it an operational nightmare, killing the very agility we sought with microservices. The clear path forward was an event-driven approach using the Saga pattern. A saga is a sequence of local transactions where each transaction updates a single service and publishes an event or message that triggers the next local transaction in the saga. If a local transaction fails, the saga executes a series of compensating transactions that undo the changes made by the preceding local transactions.
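To make the mechanics concrete before any infrastructure enters the picture, here is a minimal, purely illustrative sketch of the pattern in Kotlin (the real services below coordinate through events rather than direct function calls):
// Purely illustrative: a saga as an ordered list of local transactions, each
// paired with a compensating action that undoes its effects.
data class SagaStep(
    val name: String,
    val action: () -> Boolean,   // local transaction; returns true on success
    val compensate: () -> Unit   // undo logic, run if a later step fails
)

fun runSaga(steps: List<SagaStep>): Boolean {
    val completed = mutableListOf<SagaStep>()
    for (step in steps) {
        if (step.action()) {
            completed += step
        } else {
            // Unwind the already-committed steps in reverse order.
            completed.asReversed().forEach { it.compensate() }
            return false
        }
    }
    return true
}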
This decision immediately shifted the focus to the messaging backbone. We needed more than a simple message bus. For a Saga implementation to be truly resilient, the messaging system must provide specific guarantees. Our choice landed on Apache Pulsar. While other systems like RabbitMQ or Kafka are viable, Pulsar offered a compelling trifecta of features crucial for this pattern:
- Delayed Message Delivery: This is a killer feature for implementing timeouts. The orchestrator can schedule a timeout check by sending itself a delayed message when a saga step begins; if the participant has not responded by the time the message arrives, the orchestrator triggers a compensating action (see the sketch after this list).
- Dead Letter Queues (DLQ) and Retry Topics: Pulsar provides fine-grained control over message redelivery. If a consumer service repeatedly fails to process a message, Pulsar can automatically route it to a designated Dead Letter Queue after a configurable number of attempts. This prevents poison pills from blocking the main topic and provides an operational hook for inspection and manual intervention.
- Schema Registry: Integrating a Node.js service with a JVM service invites data contract mismatches. Pulsar’s built-in Schema Registry enforces a predefined schema (like Avro or JSON Schema) on topics, ensuring that producers and consumers agree on the data structure, preventing runtime serialization errors.
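To ground the first point, a hedged sketch using the Java client’s deliverAfter (the topic name and payload type are assumptions, not part of the services built later; note that Pulsar only honors delayed delivery on Shared and Key_Shared subscriptions):
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import java.util.concurrent.TimeUnit

// Illustrative payload for a self-addressed timeout check.
data class SagaTimeoutCheck(val orderId: String = "")

fun scheduleTimeoutCheck(client: PulsarClient, orderId: String) {
    client.newProducer(Schema.JSON(SagaTimeoutCheck::class.java))
        .topic("persistent://public/default/saga-timeout-topic") // hypothetical topic
        .create()
        .use { producer ->
            producer.newMessage()
                .value(SagaTimeoutCheck(orderId))
                // Invisible to consumers until the delay elapses; if the saga
                // completed in the meantime, the handler simply discards it.
                .deliverAfter(30, TimeUnit.SECONDS)
                .send()
        }
}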
The architecture crystallized around this concept: the Express.js service would act as the Saga Orchestrator. Upon receiving an order request, it would initiate the saga by persisting a PENDING order and emitting an OrderCreationRequested event. The Ktor inventory service, a Saga Participant, would consume this event, attempt to reserve inventory, and emit either an InventoryReserved or InventoryReservationFailed event. The Express.js service would listen for these outcomes to finalize the order state to CONFIRMED or CANCELLED.
sequenceDiagram
    participant Client
    participant Express as Express.js (Orchestrator)
    participant Pulsar
    participant Ktor as Ktor (Participant)

    Client->>+Express: POST /api/orders { productId, quantity }
    Express->>Express: 1. Create Order in DB (status: PENDING)
    Express->>Pulsar: 2. Produce(topic: order-saga-topic, msg: OrderCreationRequested)
    Note over Express: Order creation initiated, returns 202 Accepted
    Express-->>-Client: HTTP 202 Accepted { orderId }
    Pulsar->>+Ktor: 3. Consume(topic: order-saga-topic, msg: OrderCreationRequested)
    Ktor->>Ktor: 4. Attempt to reserve inventory
    alt Inventory available
        Ktor->>Ktor: 4a. Decrement stock in DB
        Ktor->>Pulsar: 5a. Produce(topic: inventory-saga-topic, msg: InventoryReserved)
    else Inventory unavailable
        Ktor->>Pulsar: 5b. Produce(topic: inventory-saga-topic, msg: InventoryReservationFailed)
    end
    Pulsar-->>-Ktor: Acknowledge message
    Pulsar->>+Express: 6. Consume(topic: inventory-saga-topic)
    alt Message is InventoryReserved
        Express->>Express: 7a. Update Order in DB (status: CONFIRMED)
    else Message is InventoryReservationFailed
        Express->>Express: 7b. Update Order in DB (status: CANCELLED) (Compensating Action)
    end
    Pulsar-->>-Express: Acknowledge message
Setting the Foundation: Pulsar Configuration and Schemas
Before any code is written, the data contracts and messaging infrastructure must be defined. We’ll use Avro for our schemas due to its strong typing and schema evolution capabilities.
Saga Event Schema (saga-events.avsc):
[
{
"type": "record",
"name": "OrderCreationRequested",
"namespace": "com.example.saga.events",
"fields": [
{ "name": "orderId", "type": "string" },
{ "name": "productId", "type": "string" },
{ "name": "quantity", "type": "int" }
]
},
{
"type": "record",
"name": "InventoryReserved",
"namespace": "com.example.saga.events",
"fields": [
{ "name": "orderId", "type": "string" }
]
},
{
"type": "record",
"name": "InventoryReservationFailed",
"namespace": "com.example.saga.events",
"fields": [
{ "name": "orderId", "type": "string" },
{ "name": "reason", "type": "string" }
]
}
]
We need two primary topics. Let’s assume the tenant public and the namespace default.
- order-saga-topic: carries OrderCreationRequested events from Express to Ktor.
- inventory-saga-topic: carries InventoryReserved and InventoryReservationFailed events from Ktor back to Express.
To handle failures in the Ktor service, we’ll configure a Dead Letter Queue on its subscription.
Pulsar Setup (using pulsar-admin):
# It's assumed Pulsar is running, e.g., via Docker:
# docker run -d -it -p 6650:6650 -p 8080:8080 --name pulsar apachepulsar/pulsar:latest bin/pulsar standalone
# Create the topics
pulsar-admin topics create persistent://public/default/order-saga-topic
pulsar-admin topics create persistent://public/default/inventory-saga-topic
pulsar-admin topics create persistent://public/default/inventory-consumer-dlq
# Upload the Avro schema (not strictly required by clients but good practice for governance)
# This step is often done programmatically on first client connection.
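# To upload explicitly, wrap the Avro definition in Pulsar's schema envelope,
# e.g. {"type": "AVRO", "schema": "<stringified record from saga-events.avsc>"},
# and push it with (file name below is illustrative):
# pulsar-admin schemas upload persistent://public/default/order-saga-topic -f order-creation-requested.json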
The Orchestrator: Express.js Order Service
The Express service is responsible for initiating the saga and reacting to its outcome. It maintains the state of the order. For simplicity, we’ll use an in-memory database, but in a real-world project, this would be a persistent store like PostgreSQL.
Project Setup (package.json):
{
"name": "order-service",
"version": "1.0.0",
"main": "server.js",
"scripts": {
"start": "node server.js"
},
"dependencies": {
"pulsar-client": "^1.8.0",
"express": "^4.18.2",
"uuid": "^9.0.0",
"winston": "^3.8.2"
}
}
Configuration (config.js):
module.exports = {
pulsar: {
serviceUrl: 'pulsar://localhost:6650',
orderTopic: 'persistent://public/default/order-saga-topic',
inventoryTopic: 'persistent://public/default/inventory-saga-topic',
inventorySubscription: 'order-service-inventory-subscription',
},
server: {
port: 3000,
},
log: {
level: 'info',
},
};
Core Logic (server.js):
const express = require('express');
const Pulsar = require('pulsar-client');
const { v4: uuidv4 } = require('uuid');
const winston = require('winston');
const config = require('./config');
// --- Logger Setup ---
const logger = winston.createLogger({
level: config.log.level,
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [new winston.transports.Console()],
});
// --- In-memory Database Simulation ---
const ordersDB = new Map();
// --- Avro Schema Definition (must match the file) ---
const OrderCreationRequestedSchema = {
type: 'record',
name: 'OrderCreationRequested',
fields: [
{ name: 'orderId', type: 'string' },
{ name: 'productId', type: 'string' },
{ name: 'quantity', type: 'int' },
],
};
const main = async () => {
const app = express();
app.use(express.json());
// --- Pulsar Client Initialization ---
const pulsarClient = new Pulsar.Client({
serviceUrl: config.pulsar.serviceUrl,
log: (level, file, line, message) => {
// A common mistake is to ignore the client's internal logs.
// Pipe them to your application logger, mapping Pulsar's numeric
// log levels (DEBUG=0 .. ERROR=3) onto winston's level names.
const levels = ['debug', 'info', 'warn', 'error'];
logger.log(levels[level] ?? 'info', `PulsarClient: ${message}`);
},
});
// --- Pulsar Producer for initiating the saga ---
const orderProducer = await pulsarClient.createProducer({
topic: config.pulsar.orderTopic,
// The schema here is critical for cross-language compatibility: Pulsar
// registers it on the topic and rejects producers/consumers whose schema
// is incompatible. Note that the Node.js client only attaches SchemaInfo;
// it does not serialize for you, so messages are still sent as Buffers.
schema: {
name: 'OrderCreationRequested',
schemaType: 'Json',
schema: JSON.stringify(OrderCreationRequestedSchema),
},
sendTimeoutMs: 30000,
});
// --- API Endpoint to create an order ---
app.post('/api/orders', async (req, res) => {
const { productId, quantity } = req.body;
if (!productId || !quantity || quantity <= 0) {
return res.status(400).json({ error: 'Invalid productId or quantity' });
}
const orderId = uuidv4();
const order = {
orderId,
productId,
quantity,
status: 'PENDING',
createdAt: new Date().toISOString(),
};
try {
// 1. Persist the initial state locally.
// This is the first local transaction of the saga.
ordersDB.set(orderId, order);
logger.info(`Order ${orderId} created with status PENDING.`);
// 2. Publish the event to trigger the next step in the saga.
await orderProducer.send({
// The Node client sends raw Buffers; serialize explicitly to match the schema.
data: Buffer.from(JSON.stringify({ orderId, productId, quantity })),
});
logger.info(`Published OrderCreationRequested for order ${orderId}.`);
// It is crucial to return 202 Accepted here, not 201 Created.
// The resource creation is not complete; it's in progress.
return res.status(202).json({ orderId, status: 'PENDING' });
} catch (error) {
logger.error(`Failed to initiate order saga for ${orderId}: ${error.message}`);
// If publishing fails, we should ideally roll back the local DB insert.
// For this example, we'll just log it. A real system needs a robust outbox pattern.
ordersDB.delete(orderId);
return res.status(500).json({ error: 'Failed to process order request.' });
}
});
app.get('/api/orders/:orderId', (req, res) => {
const order = ordersDB.get(req.params.orderId);
if (order) {
return res.status(200).json(order);
}
return res.status(404).json({ error: 'Order not found' });
});
// --- Pulsar Consumer to handle saga outcomes ---
const inventoryConsumer = await pulsarClient.subscribe({
topic: config.pulsar.inventoryTopic,
subscription: config.pulsar.inventorySubscription,
subscriptionType: 'Shared', // Use 'Shared' if multiple instances of the service run
ackTimeoutMs: 10000,
});
// Asynchronous loop to process messages
(async () => {
logger.info(`Listening for inventory saga outcomes on topic ${config.pulsar.inventoryTopic}`);
while (true) {
try {
const msg = await inventoryConsumer.receive();
const payload = JSON.parse(msg.getData().toString());
const eventType = msg.getProperties().eventType; // Custom property to identify message type
const { orderId } = payload;
logger.info(`Received message for order ${orderId} with type: ${eventType}`);
const order = ordersDB.get(orderId);
if (!order) {
logger.warn(`Received event for unknown order ${orderId}. Acknowledging to avoid redelivery.`);
await inventoryConsumer.acknowledge(msg);
continue;
}
// A key consideration: ensure we don't process an outcome for an already finalized order.
if (order.status !== 'PENDING') {
logger.warn(`Order ${orderId} is already in state ${order.status}. Ignoring event ${eventType}.`);
await inventoryConsumer.acknowledge(msg);
continue;
}
if (eventType === 'InventoryReserved') {
// Saga step succeeded. Finalize the order state.
order.status = 'CONFIRMED';
order.updatedAt = new Date().toISOString();
ordersDB.set(orderId, order);
logger.info(`Order ${orderId} confirmed.`);
} else if (eventType === 'InventoryReservationFailed') {
// Saga step failed. Execute the compensating transaction.
order.status = 'CANCELLED';
order.reason = payload.reason;
order.updatedAt = new Date().toISOString();
ordersDB.set(orderId, order);
logger.warn(`Order ${orderId} cancelled due to: ${payload.reason}`);
} else {
logger.error(`Unknown eventType: ${eventType} for order ${orderId}`);
}
await inventoryConsumer.acknowledge(msg);
} catch (error) {
logger.error(`Error processing inventory message: ${error.message}`);
// In a real system, the unacknowledged message would be redelivered.
// After enough failures, Pulsar would send it to the DLQ if configured.
}
}
})();
const server = app.listen(config.server.port, () => {
logger.info(`Order service listening on port ${config.server.port}`);
});
// Graceful shutdown
process.on('SIGINT', async () => {
logger.info('Shutting down gracefully...');
await orderProducer.close();
await inventoryConsumer.close();
await pulsarClient.close();
server.close();
});
};
main().catch(error => logger.error(`Unhandled exception in main: ${error.message}`));
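The catch block above papers over a real gap: if the process crashes between the local write and the Pulsar publish, the order exists but the saga never starts. The transactional outbox pattern mentioned in the comment closes it by committing the event in the same local transaction as the order and letting a separate relay publish it afterwards. A minimal sketch of the idea, in Kotlin for consistency with the service that follows (table names, columns, and the relay cadence are all assumptions):
import org.apache.pulsar.client.api.Producer
import java.sql.Connection

// Sketch of a transactional outbox: the order row and its event row commit
// atomically, so a crash can never produce an order without an event.
fun createOrderWithOutbox(conn: Connection, orderId: String, payload: String) {
    conn.autoCommit = false
    try {
        conn.prepareStatement("INSERT INTO orders (id, status) VALUES (?, 'PENDING')").use {
            it.setString(1, orderId); it.executeUpdate()
        }
        conn.prepareStatement("INSERT INTO outbox (order_id, payload, sent) VALUES (?, ?, FALSE)").use {
            it.setString(1, orderId); it.setString(2, payload); it.executeUpdate()
        }
        conn.commit() // both rows or neither
    } catch (e: Exception) {
        conn.rollback()
        throw e
    }
}

// A relay polls unsent rows, publishes them, then marks them sent. This gives
// at-least-once delivery, so consumers must remain idempotent.
fun relayOnce(conn: Connection, producer: Producer<ByteArray>) {
    val pending = mutableListOf<Pair<String, String>>()
    conn.prepareStatement("SELECT order_id, payload FROM outbox WHERE sent = FALSE").use { ps ->
        ps.executeQuery().use { rs ->
            while (rs.next()) pending += rs.getString("order_id") to rs.getString("payload")
        }
    }
    for ((id, json) in pending) {
        producer.send(json.toByteArray()) // publish first...
        conn.prepareStatement("UPDATE outbox SET sent = TRUE WHERE order_id = ?").use {
            it.setString(1, id); it.executeUpdate()
        }
    }
}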
The Participant: Ktor Inventory Service
The Ktor service is a background worker. It doesn’t expose any HTTP endpoints for this use case; its sole responsibility is to consume messages from Pulsar, execute its local transaction, and publish the result.
Project Setup (build.gradle.kts):
plugins {
kotlin("jvm") version "1.8.21"
application
id("com.github.johnrengelman.shadow") version "7.1.2" // For creating a fat JAR
}
repositories {
mavenCentral()
}
dependencies {
implementation("io.ktor:ktor-server-core-jvm:2.3.2")
implementation("io.ktor:ktor-server-netty-jvm:2.3.2")
implementation("ch.qos.logback:logback-classic:1.4.7")
// Pulsar client and Avro dependencies
implementation("org.apache.pulsar:pulsar-client:2.11.1")
implementation("org.apache.pulsar:pulsar-client-admin:2.11.1")
implementation("org.apache.avro:avro:1.11.1")
// Kotlin coroutines for structured concurrency
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
}
application {
mainClass.set("com.example.inventory.ApplicationKt")
}
Core Logic (Application.kt):
package com.example.inventory
import kotlinx.coroutines.*
import org.apache.pulsar.client.api.*
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
object Config {
const val PULSAR_SERVICE_URL = "pulsar://localhost:6650"
const val ORDER_TOPIC = "persistent://public/default/order-saga-topic"
const val INVENTORY_TOPIC = "persistent://public/default/inventory-saga-topic"
const val SUBSCRIPTION_NAME = "inventory-service-subscription"
const val DLQ_TOPIC = "persistent://public/default/inventory-consumer-dlq"
}
// --- In-memory Inventory Database Simulation ---
val inventoryDB = ConcurrentHashMap<String, Int>().apply {
put("product-123", 10) // Start with 10 items in stock
}
fun main(): Unit = runBlocking {
val logger = LoggerFactory.getLogger("InventoryService")
logger.info("Starting Inventory Service...")
val pulsarClient = PulsarClient.builder()
.serviceUrl(Config.PULSAR_SERVICE_URL)
.build()
val inventoryProducer = pulsarClient.newProducer(Schema.JSON(InventoryOutcome::class.java))
.topic(Config.INVENTORY_TOPIC)
.create()
// The DeadLetterPolicy is a critical piece of the resilience strategy.
// If we fail to process a message 5 times, it gets sent to the DLQ.
val deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(5)
.deadLetterTopic(Config.DLQ_TOPIC)
.build()
val orderConsumer = pulsarClient.newConsumer(Schema.JSON(OrderCreationRequested::class.java))
.topic(Config.ORDER_TOPIC)
.subscriptionName(Config.SUBSCRIPTION_NAME)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(10, TimeUnit.SECONDS)
.deadLetterPolicy(deadLetterPolicy)
.subscribe()
logger.info("Pulsar consumer and producer initialized. Listening for orders...")
// Launch a long-running coroutine for message consumption.
// This is the idiomatic way to run background tasks in a Ktor/Kotlin application.
val job = CoroutineScope(Dispatchers.IO).launch {
while (isActive) {
try {
val msg = orderConsumer.receive()
val orderRequest = msg.value
logger.info("Received OrderCreationRequested for orderId: ${orderRequest.orderId}")
try {
// This is the core local transaction for the inventory service.
val currentStock = inventoryDB.compute(orderRequest.productId) { _, stock ->
if (stock == null || stock < orderRequest.quantity) {
// The pitfall here is not handling the 'null' case, assuming the product exists.
throw InsufficientStockException("Not enough stock for ${orderRequest.productId}. Required: ${orderRequest.quantity}, Available: ${stock ?: 0}")
}
stock - orderRequest.quantity
}
logger.info("Inventory reserved for ${orderRequest.productId}. New stock: $currentStock")
// Publish success outcome
val successOutcome = InventoryOutcome(orderRequest.orderId)
inventoryProducer.newMessage()
.value(successOutcome)
.property("eventType", "InventoryReserved")
.send()
orderConsumer.acknowledge(msg)
logger.info("Acknowledged message and sent InventoryReserved for orderId: ${orderRequest.orderId}")
} catch (e: InsufficientStockException) {
// Publish failure outcome (the compensating trigger)
logger.warn("Inventory reservation failed for orderId ${orderRequest.orderId}: ${e.message}")
val failureOutcome = InventoryOutcome(orderRequest.orderId, e.message)
inventoryProducer.newMessage()
.value(failureOutcome)
.property("eventType", "InventoryReservationFailed")
.send()
// Important: We still acknowledge the message. The business process failed,
// but the message processing itself was successful. Not acknowledging
// would cause redelivery, which is incorrect.
orderConsumer.acknowledge(msg)
} catch (e: Exception) {
// This catches unexpected system errors, not business logic failures.
// Here we negatively acknowledge instead, which requests prompt
// redelivery rather than waiting out the ack timeout.
// After enough redeliveries, the DLQ policy will kick in.
logger.error("Unexpected error processing order ${orderRequest.orderId}, will trigger redelivery.", e)
orderConsumer.negativeAcknowledge(msg)
}
} catch (e: PulsarClientException) {
if (!isActive) break // Exit loop on shutdown
logger.error("Pulsar client exception: ${e.message}", e)
// A backoff delay is crucial to prevent tight-looping on persistent connection errors.
delay(5000)
}
}
}
// Graceful shutdown hook
Runtime.getRuntime().addShutdownHook(Thread {
runBlocking {
logger.info("Shutting down inventory service...")
job.cancelAndJoin()
orderConsumer.close()
inventoryProducer.close()
pulsarClient.close()
logger.info("Shutdown complete.")
}
})
}
// Data classes for JSON serialization by Pulsar client
data class OrderCreationRequested(val orderId: String = "", val productId: String = "", val quantity: Int = 0)
data class InventoryOutcome(val orderId: String = "", val reason: String? = null)
class InsufficientStockException(message: String) : RuntimeException(message)
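The DeadLetterPolicy above routes poison messages out of the processing path, but someone still has to look at them. A hedged sketch of a minimal DLQ monitor (the subscription name and the alerting action are illustrative):
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.SubscriptionType
import org.slf4j.LoggerFactory

// Sketch: drain the DLQ for inspection. In production this might page an
// operator or write to an incident tracker instead of just logging.
fun monitorDlq(client: PulsarClient) {
    val log = LoggerFactory.getLogger("DlqMonitor")
    val consumer = client.newConsumer() // Schema.BYTES by default
        .topic("persistent://public/default/inventory-consumer-dlq")
        .subscriptionName("dlq-monitor") // hypothetical subscription name
        .subscriptionType(SubscriptionType.Shared)
        .subscribe()
    while (true) {
        val msg = consumer.receive()
        log.error("Poison message ${msg.messageId}: ${String(msg.data)}")
        consumer.acknowledge(msg) // keep it parked for human follow-up
    }
}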
Limitations and Future Trajectory
This implementation provides a resilient foundation for cross-stack transactions, but it’s not a silver bullet. The primary lingering issue is observability. While structured logs help, tracing a single order’s journey across Express.js, Pulsar, and Ktor is difficult. The next logical step would be to introduce distributed tracing using OpenTelemetry, propagating context from the initial API call through Pulsar message headers to the final database update. This would provide a unified view of the entire transaction flow, dramatically reducing mean-time-to-resolution for failures.
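As a hedged illustration of that propagation on the JVM side, OpenTelemetry’s W3C trace context can be injected into Pulsar message properties before sending (the helper below is an assumption, not part of the service above):
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.TypedMessageBuilder

// Sketch: inject W3C trace context ("traceparent"/"tracestate") into Pulsar
// message properties so the consuming service can continue the same trace.
private val setter = TextMapSetter<TypedMessageBuilder<*>> { builder, key, value ->
    builder?.property(key, value)
}

fun <T> Producer<T>.sendTraced(value: T) {
    val msg = newMessage().value(value)
    GlobalOpenTelemetry.getPropagators().textMapPropagator
        .inject(Context.current(), msg, setter)
    msg.send()
}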
Furthermore, the saga’s atomicity relies on the successful execution of compensating transactions. If the Express.js service fails to process the InventoryReservationFailed message (perhaps due to a bug or its own database being down), the order would remain PENDING even though the inventory service has already completed its part of the saga. This state inconsistency requires a reconciliation mechanism, such as a periodic background job that queries for stale PENDING orders and re-evaluates their status, or more sophisticated alerting based on messages piling up in the Dead Letter Queue. The saga pattern trades the strict consistency of 2PC for higher availability and performance, but that trade-off manifests as increased operational complexity and the need for robust monitoring and recovery tooling.
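A minimal sketch of such a reconciliation sweep, in Kotlin to match the inventory service (the store interface, the staleness threshold, and the resolution hook are all assumptions):
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.time.Duration
import java.time.Instant

// Hypothetical view of an order row; a real system would query PostgreSQL.
data class StaleOrder(val orderId: String, val status: String, val createdAt: Instant)

fun startReconciliationJob(
    findAll: () -> List<StaleOrder>,   // e.g. SELECT ... WHERE status = 'PENDING'
    resolve: (StaleOrder) -> Unit      // re-check inventory, cancel, or page a human
) = CoroutineScope(Dispatchers.IO).launch {
    while (true) {
        val cutoff = Instant.now().minus(Duration.ofMinutes(15))
        findAll()
            .filter { it.status == "PENDING" && it.createdAt.isBefore(cutoff) }
            .forEach(resolve)
        delay(Duration.ofMinutes(5).toMillis())
    }
}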