The project mandate was rigid: integrate our new Koa-based microservices with a third-party financial ledger system. This external system offered no support for compensating transactions or idempotent message processing. Any multi-service operation that touched it had to be atomic—either all changes committed successfully across all services and the ledger, or none at all. This immediately ruled out eventually consistent patterns like Saga. We were backed into a corner, forced to implement a Two-Phase Commit (2PC) protocol, a pattern known for its fragility and performance bottlenecks.
Our initial, naive approach was to embed the 2PC participant logic directly within each microservice. This quickly devolved into a maintenance nightmare. The coordination logic was duplicated, configuration for transaction timeouts and participant endpoints was scattered, and every change required a coordinated, multi-repository deployment. After a particularly painful rollback, we decided to centralize the coordination logic into a dedicated service. This decision was the start of a complex but ultimately necessary journey to build, deploy, and manage a custom 2PC Transaction Coordinator in a way that was both robust and automated.
The architecture crystallized around a few key choices. The Coordinator itself would be a Koa application, leveraging our team’s existing Node.js expertise. All inter-service communication would be mediated by Envoy Proxy sidecars, giving us centralized control over routing, retries, and observability. Finally, the entire stack—from the coordinator service to the participant microservices and their Envoy configurations—would be declaratively managed by Puppet and deployed via a Jenkins pipeline. This was not about building the perfect 2PC protocol; it was about taming its operational complexity in a real-world environment.
The Anatomy of the Koa-Based Transaction Coordinator
The coordinator is the brain of the 2PC operation. It doesn’t perform any business logic itself; its sole responsibility is to orchestrate the two phases of the transaction across all registered participants. Its state machine must be meticulously managed.
A real-world project would require a persistent, write-ahead log (WAL) to survive coordinator crashes. For this implementation, we’ll simulate that with a file-based log and an in-memory state map to keep the focus on the protocol flow and its integration with the surrounding tooling.
Here is the core structure of our coordinator-service:
// coordinator-service/src/app.js
const Koa = require('koa');
const Router = require('@koa/router');
const bodyParser = require('koa-bodyparser');
const { v4: uuidv4 } = require('uuid');
const fs = require('fs').promises;
const axios = require('axios');
const app = new Koa();
const router = new Router();
// WARNING: In-memory store is for demonstration only.
// A production system requires a persistent, crash-consistent data store (e.g., a database with WAL, or RocksDB).
const transactions = new Map();
const TRANSACTION_LOG_PATH = './transaction.log';
const TRANSACTION_TIMEOUT_MS = 10000; // 10 seconds
async function logTransactionState(txId, state, details) {
const logEntry = `${new Date().toISOString()} | ${txId} | ${state} | ${JSON.stringify(details)}\n`;
try {
await fs.appendFile(TRANSACTION_LOG_PATH, logEntry);
} catch (err) {
console.error('FATAL: Could not write to transaction log!', err);
// In a real system, this might trigger a halt to prevent inconsistent state.
}
}
// Transaction State Machine
const TxState = {
INITIAL: 'INITIAL',
PREPARING: 'PREPARING',
PREPARED: 'PREPARED',
COMMITTING: 'COMMITTING',
ABORTING: 'ABORTING',
COMMITTED: 'COMMITTED',
ABORTED: 'ABORTED',
};
router.post('/transactions', (ctx) => {
const txId = uuidv4();
const newTx = {
id: txId,
state: TxState.INITIAL,
participants: [],
startTime: Date.now(),
votes: { commit: 0, abort: 0 },
};
transactions.set(txId, newTx);
logTransactionState(txId, TxState.INITIAL, { message: 'Transaction created' });
console.log(`[${txId}] Transaction created.`);
ctx.status = 201;
ctx.body = { transactionId: txId };
});
router.post('/transactions/:id/register', (ctx) => {
const { id } = ctx.params;
const { participantUrl } = ctx.request.body;
const tx = transactions.get(id);
if (!tx || tx.state !== TxState.INITIAL) {
ctx.throw(404, 'Transaction not found or not in initial state.');
return;
}
if (!participantUrl) {
ctx.throw(400, 'participantUrl is required.');
return;
}
tx.participants.push(participantUrl);
console.log(`[${id}] Participant registered: ${participantUrl}`);
ctx.status = 200;
ctx.body = { message: 'Participant registered.' };
});
async function beginPreparePhase(txId) {
const tx = transactions.get(txId);
if (!tx || tx.state !== TxState.INITIAL) {
console.error(`[${txId}] Attempted to prepare transaction in invalid state: ${tx ? tx.state : 'Not Found'}`);
return;
}
tx.state = TxState.PREPARING;
await logTransactionState(txId, TxState.PREPARING, { participants: tx.participants });
console.log(`[${txId}] Starting PREPARE phase for ${tx.participants.length} participants.`);
const preparePromises = tx.participants.map(url =>
axios.post(`${url}/prepare`, { transactionId: txId }, { timeout: TRANSACTION_TIMEOUT_MS / 2 })
.then(response => ({ url, status: 'VOTE_COMMIT' }))
.catch(error => {
console.error(`[${txId}] Participant ${url} failed to prepare:`, error.message);
return { url, status: 'VOTE_ABORT' };
})
);
const results = await Promise.all(preparePromises);
results.forEach(result => {
if (result.status === 'VOTE_COMMIT') {
tx.votes.commit++;
} else {
tx.votes.abort++;
}
});
if (tx.votes.abort > 0) {
console.log(`[${txId}] Received ABORT vote. Transitioning to ABORTING.`);
tx.state = TxState.ABORTING;
await logTransactionState(txId, TxState.ABORTING, { reason: 'Participant voted to abort or failed.' });
beginGlobalDecisionPhase(txId, 'abort');
} else if (tx.votes.commit === tx.participants.length) {
console.log(`[${txId}] All participants voted COMMIT. Transitioning to COMMITTING.`);
tx.state = TxState.PREPARED; // Log prepared state before starting commit
await logTransactionState(txId, TxState.PREPARED, { reason: 'All participants voted to commit.'});
tx.state = TxState.COMMITTING;
await logTransactionState(txId, TxState.COMMITTING, { reason: 'Proceeding with global commit.'});
beginGlobalDecisionPhase(txId, 'commit');
} else {
// This case should not happen with the current logic but is a safeguard.
console.error(`[${txId}] Inconsistent vote count. Aborting.`);
tx.state = TxState.ABORTING;
await logTransactionState(txId, TxState.ABORTING, { reason: 'Inconsistent vote count.' });
beginGlobalDecisionPhase(txId, 'abort');
}
}
async function beginGlobalDecisionPhase(txId, decision) {
const tx = transactions.get(txId);
if (!tx) return;
console.log(`[${txId}] Broadcasting global decision: ${decision.toUpperCase()}`);
const decisionPromises = tx.participants.map(url =>
axios.post(`${url}/${decision}`, { transactionId: txId })
.catch(err => {
// A real system needs a robust retry mechanism or manual intervention process.
console.error(`[${txId}] CRITICAL: Participant ${url} failed to acknowledge ${decision}:`, err.message);
})
);
await Promise.all(decisionPromises);
const finalState = decision === 'commit' ? TxState.COMMITTED : TxState.ABORTED;
tx.state = finalState;
await logTransactionState(txId, finalState, { message: `Global decision ${decision} completed.` });
console.log(`[${txId}] Transaction finished with state: ${finalState}`);
// Clean up old transactions. In production, this would be a more sophisticated garbage collection process.
setTimeout(() => transactions.delete(txId), 60000);
}
router.post('/transactions/:id/execute', (ctx) => {
const { id } = ctx.params;
const tx = transactions.get(id);
if (!tx || tx.state !== TxState.INITIAL) {
ctx.throw(409, 'Transaction already started or does not exist.');
return;
}
// Non-blocking response to the client
ctx.status = 202;
ctx.body = { message: 'Transaction processing started.' };
// Start the 2PC flow asynchronously
beginPreparePhase(id);
});
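// Liveness endpoint for post-deploy checks. This is a small addition to the
// sketch: the Jenkins pipeline later in this article probes GET /health and
// expects a 200 once the coordinator is up.
router.get('/health', (ctx) => {
  ctx.status = 200;
  ctx.body = { status: 'ok', activeTransactions: transactions.size };
});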
app.use(bodyParser());
app.use(router.routes()).use(router.allowedMethods());
const PORT = 3000;
app.listen(PORT, () => {
console.log(`Transaction Coordinator listening on port ${PORT}`);
});
The critical pitfall here is timeout and failure handling. If a participant is unreachable during the PREPARE phase, the entire transaction must be aborted. If a participant fails to acknowledge the final COMMIT or ABORT decision, the coordinator has fulfilled its duty by logging the decision. The onus is then on the participant to recover and query the coordinator for the transaction’s outcome upon restart. This recovery logic is non-trivial and is a major reason why 2PC is so complex in practice.
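A sketch of what that participant-side recovery could look like. It assumes two things that are not part of the code above: the coordinator is extended with a read-only status endpoint (GET /transactions/:id) backed by its durable log, and the participant persists its in-doubt transaction IDs somewhere that survives a restart.
// inventory-service/src/recovery.js -- hypothetical recovery sketch
const axios = require('axios');

// Assumed to point at the coordinator (directly or via the local Envoy sidecar).
const COORDINATOR_URL = process.env.COORDINATOR_URL || 'http://localhost:3000';

// loadInDoubtTransactionIds and applyDecision are placeholders for the
// participant's own durable log and its commit/abort handlers.
async function recoverInDoubtTransactions(loadInDoubtTransactionIds, applyDecision) {
  const pendingIds = await loadInDoubtTransactionIds();
  for (const txId of pendingIds) {
    try {
      // Assumed status endpoint returning { state: 'COMMITTED' | 'ABORTED' | ... }
      const { data } = await axios.get(`${COORDINATOR_URL}/transactions/${txId}`);
      if (data.state === 'COMMITTED') {
        await applyDecision(txId, 'commit'); // finalize the prepared resources
      } else if (data.state === 'ABORTED') {
        await applyDecision(txId, 'abort');  // release the prepared resources
      }
      // Any other state means the coordinator has not decided yet; try again later.
    } catch (err) {
      // Coordinator unreachable: remain in doubt and retry on the next pass.
      console.error(`Recovery check failed for ${txId}:`, err.message);
    }
  }
}

module.exports = { recoverInDoubtTransactions };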
Instrumenting a Participant Service
The participant services—for example, an inventory-service and an orders-service—must expose an API that the coordinator can call. They must be able to hold resources (like a database lock or a pending record) in a “prepared” state without finalizing them until the coordinator issues the final command.
Here’s a simplified inventory-service:
// inventory-service/src/service.js
const Koa = require('koa');
const Router = require('@koa/router');
const bodyParser = require('koa-bodyparser');
const app = new Koa();
const router = new Router();
// In-memory representation of inventory and pending transactions
const inventory = { 'item-123': 100 };
const pendingTransactions = new Map();
// Participant API for the Coordinator
router.post('/prepare', (ctx) => {
const { transactionId } = ctx.request.body;
console.log(`[${transactionId}] Received PREPARE request.`);
// Business logic: check if we can reserve the inventory
const requiredStock = 10; // Hardcoded for example
if (inventory['item-123'] >= requiredStock) {
pendingTransactions.set(transactionId, {
item: 'item-123',
quantity: requiredStock,
});
console.log(`[${transactionId}] VOTE_COMMIT: Resources prepared and locked.`);
ctx.status = 200;
ctx.body = { vote: 'COMMIT' };
} else {
console.log(`[${transactionId}] VOTE_ABORT: Insufficient stock.`);
ctx.status = 409; // Conflict
ctx.body = { vote: 'ABORT', reason: 'Insufficient stock' };
}
});
router.post('/commit', (ctx) => {
const { transactionId } = ctx.request.body;
const pending = pendingTransactions.get(transactionId);
if (!pending) {
// This can happen on retries. It must be idempotent.
console.log(`[${transactionId}] COMMIT request for an unknown or already processed transaction.`);
ctx.status = 200;
return;
}
console.log(`[${transactionId}] Received COMMIT request.`);
inventory[pending.item] -= pending.quantity;
pendingTransactions.delete(transactionId);
console.log(`[${transactionId}] Transaction committed. New stock for ${pending.item}: ${inventory[pending.item]}`);
ctx.status = 200;
});
router.post('/abort', (ctx) => {
const { transactionId } = ctx.request.body;
console.log(`[${transactionId}] Received ABORT request.`);
// Release any locked resources
if (pendingTransactions.has(transactionId)) {
pendingTransactions.delete(transactionId);
console.log(`[${transactionId}] Aborted. Released locked resources.`);
}
ctx.status = 200;
});
app.use(bodyParser());
app.use(router.routes()).use(router.allowedMethods());
const PORT = 4001;
app.listen(PORT, () => {
console.log(`Inventory Service listening on port ${PORT}`);
});
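With the coordinator and participants in place, a client (or an API gateway handler) drives a transaction through the coordinator’s HTTP API. The sketch below assumes the ports used in the code above, plus a second participant (the orders-service) on port 4002; with the Envoy setup described next, these would be sidecar addresses instead.
// client/run-transaction.js -- sketch of driving one transaction end to end
const axios = require('axios');

const COORDINATOR = 'http://localhost:3000';
// inventory-service port matches the code above; the orders-service port is assumed.
const PARTICIPANTS = ['http://localhost:4001', 'http://localhost:4002'];

async function runTransaction() {
  // 1. Create the transaction and capture its ID.
  const { data } = await axios.post(`${COORDINATOR}/transactions`);
  const txId = data.transactionId;

  // 2. Register every participant while the transaction is still INITIAL.
  for (const participantUrl of PARTICIPANTS) {
    await axios.post(`${COORDINATOR}/transactions/${txId}/register`, { participantUrl });
  }

  // 3. Start the 2PC flow; the coordinator answers 202 and decides asynchronously.
  await axios.post(`${COORDINATOR}/transactions/${txId}/execute`);
  console.log(`Transaction ${txId} submitted.`);
}

runTransaction().catch(err => console.error('Transaction setup failed:', err.message));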
Mediating Communication with Envoy Proxy
Hard-coding the coordinator’s address in each participant service is a recipe for disaster. This is where Envoy comes in. We deploy an Envoy sidecar with each service. The application code makes requests to localhost, and Envoy handles service discovery, routing, and retries.
The Envoy configuration defines clusters for each upstream service. The key is that the inventory-service application code makes requests to http://localhost:9000/coordinator-api/... and its sidecar Envoy translates them to the actual coordinator service address. (The egress listener on port 9000 that performs this routing is omitted from the snippet below for brevity; only the ingress listener and the upstream cluster definitions are shown.)
A snippet from the inventory-service’s envoy.yaml:
# envoy.yaml for inventory-service
static_resources:
listeners:
- name: local_service_listener
address:
socket_address: { address: 0.0.0.0, port_value: 8000 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route: { cluster: inventory_service_cluster } # Route traffic to the local Koa app
http_filters:
- name: envoy.filters.http.router
clusters:
- name: inventory_service_cluster
connect_timeout: 0.25s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: inventory_service_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: { address: 127.0.0.1, port_value: 4001 } # App port
# This is the crucial part for service-to-coordinator communication
- name: coordinator_service_cluster
connect_timeout: 1s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: coordinator_service_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
# THIS ADDRESS IS MANAGED BY PUPPET
socket_address: { address: "coordinator.service.consul", port_value: 3000 }
The application code would need to be adapted to make its outbound calls through Envoy. For example, instead of calling another service directly, it registers itself with the coordinator by posting to its own local Envoy proxy, which then routes the request correctly. This abstracts the network topology from the developer.
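On the application side, the registration call then targets the sidecar rather than the coordinator. A minimal sketch, assuming an egress listener on 127.0.0.1:9000 that matches the /coordinator-api prefix, rewrites it away, and forwards to coordinator_service_cluster (that listener is not part of the snippet above):
// inventory-service/src/coordinator-client.js -- outbound calls via the sidecar
const axios = require('axios');

// Assumed local Envoy egress address; Envoy, not the application,
// knows the coordinator's real location.
const LOCAL_ENVOY = 'http://127.0.0.1:9000/coordinator-api';

async function registerWithCoordinator(transactionId, ownUrl) {
  // ownUrl is the address the coordinator should call back on for prepare/commit/abort.
  await axios.post(`${LOCAL_ENVOY}/transactions/${transactionId}/register`, {
    participantUrl: ownUrl,
  });
}

module.exports = { registerWithCoordinator };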
graph TD
    A[Client] -->|/initiate_tx| B(API Gateway + Envoy)
    B --> C{Coordinator Service}
    C -->|1. Register| D(Inventory Svc + Envoy)
    C -->|1. Register| E(Order Svc + Envoy)
    subgraph "Transaction Phase 1: Prepare"
        C -->|2. Prepare| D
        C -->|2. Prepare| E
        D -->|3. Vote Commit| C
        E -->|3. Vote Commit| C
    end
    subgraph "Transaction Phase 2: Commit"
        C -->|4. Global Commit| D
        C -->|4. Global Commit| E
    end
Declarative Infrastructure with Puppet
Managing these Envoy configurations, service deployments, and their interdependencies manually is not scalable. We use Puppet to declare the desired state of each node.
A simple Puppet module for our coordinator service would look like this:
puppet/modules/tx_coordinator/manifests/init.pp
# Class: tx_coordinator
# Manages the deployment of the 2PC transaction coordinator service.
class tx_coordinator (
String $service_user = 'txcoord',
String $install_dir = '/opt/tx-coordinator',
String $git_source = 'https://internal.git/tx-coordinator.git',
String $node_version = '18.x',
) {
# Ensure Node.js is installed
include nodejs
user { $service_user:
ensure => present,
system => true,
shell => '/bin/false',
home => $install_dir,
}
vcsrepo { $install_dir:
ensure => present,
provider => 'git',
source => $git_source,
revision => 'main',
user => $service_user,
require => User[$service_user],
}
exec { 'npm_install_coordinator':
command => 'npm install --production',
cwd => $install_dir,
user => $service_user,
path => ['/bin', '/usr/bin'],
refreshonly => true,
subscribe => Vcsrepo[$install_dir],
}
file { '/etc/systemd/system/tx-coordinator.service':
ensure => file,
owner => 'root',
group => 'root',
mode => '0644',
content => template('tx_coordinator/tx-coordinator.service.erb'),
notify => Service['tx-coordinator'],
}
service { 'tx-coordinator':
ensure => running,
enable => true,
require => [
Exec['npm_install_coordinator'],
File['/etc/systemd/system/tx-coordinator.service'],
],
}
}
The real power comes from managing the participant services’ Envoy configurations. We can use a template for envoy.yaml and inject the coordinator’s address using Hiera, Puppet’s key-value data lookup system. Because the ERB template below reads @tx_coordinator_host and @tx_coordinator_port, the Hiera keys are scoped to the participant_service class so that automatic parameter binding fills them in.
puppet/data/common.yaml
---
participant_service::tx_coordinator_host: '10.0.1.50' # IP of the coordinator node
participant_service::tx_coordinator_port: 3000
puppet/modules/participant_service/templates/envoy.yaml.erb
# ... (rest of envoy config) ...
- name: coordinator_service_cluster
connect_timeout: 1s
type: STATIC # Using static for simplicity
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: coordinator_service_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: { address: "<%= @tx_coordinator_host %>", port_value: <%= @tx_coordinator_port %> }
# ... (rest of envoy config) ...
Now, if the coordinator’s IP changes, we only need to update common.yaml. The next Puppet run propagates the change to every participant’s Envoy sidecar and, because the rendered envoy.yaml notifies the Envoy service, restarts (or hot-restarts) the proxy with the new configuration, all without any application code changes or manual intervention.
Orchestrating with a Jenkins Pipeline
The final piece is the Jenkins pipeline that ties everything together. It builds the applications, archives the artifacts, and then triggers Puppet runs on the appropriate nodes in the correct sequence.
Jenkinsfile
pipeline {
agent any
environment {
// Use Jenkins credentials for repository access
GIT_CREDENTIALS = credentials('git-ssh-credentials')
}
stages {
stage('Build & Test All Services') {
parallel {
stage('Build Coordinator') {
steps {
dir('coordinator-service') {
git url: 'ssh://[email protected]/tx-coordinator.git', credentialsId: 'git-ssh-credentials'
sh 'npm ci'
sh 'npm test'
// In a real pipeline, we'd publish to an artifact repository
sh 'tar czf ../coordinator-service.tar.gz src package.json package-lock.json'
}
}
}
stage('Build Inventory Service') {
steps {
dir('inventory-service') {
git url: 'ssh://[email protected]/inventory-service.git', credentialsId: 'git-ssh-credentials'
sh 'npm ci'
sh 'npm test'
sh 'tar czf ../inventory-service.tar.gz src package.json package-lock.json'
}
}
}
}
}
stage('Archive Artifacts') {
steps {
archiveArtifacts artifacts: '*.tar.gz', fingerprint: true
}
}
stage('Deploy Coordinator') {
steps {
echo "Deploying Transaction Coordinator..."
// This assumes the Jenkins agent can SSH to the target and run puppet
// In a production setup, you might use the Puppet Orchestrator or another trigger mechanism
sshagent(credentials: ['puppet-deploy-key']) {
sh 'ssh [email protected] "sudo puppet agent -t"'
}
// Basic health check after deployment
script {
sleep(5) // Give service time to start
def response = httpRequest url: 'http://coordinator-node.example.com:3000/health', quiet: true
if (response.status != 200) {
error "Coordinator service health check failed after deployment."
}
}
}
}
stage('Deploy Participant Services') {
// These can be deployed in parallel as they depend on the coordinator, not each other
parallel {
stage('Deploy Inventory Service') {
steps {
echo "Deploying Inventory Service..."
sshagent(credentials: ['puppet-deploy-key']) {
sh 'ssh [email protected] "sudo puppet agent -t"'
}
}
}
stage('Deploy Order Service') {
steps {
echo "Deploying Order Service..."
sshagent(credentials: ['puppet-deploy-key']) {
sh 'ssh [email protected] "sudo puppet agent -t"'
}
}
}
}
}
}
}
A common mistake in this setup is overlooking the atomicity of the deployment itself. If the ‘Deploy Coordinator’ stage succeeds but one of the participant deployments fails, the system is left in a mixed state where some services have the new configuration and others don’t. A more advanced pipeline would incorporate blue-green deployment strategies, using Puppet to stand up a new stack and Jenkins to orchestrate a traffic switch-over in Envoy once all components are verified.
The synchronous, blocking nature of this 2PC architecture remains its greatest weakness. The entire transaction is held hostage by the slowest participant, and the coordinator is a glaring single point of failure. While our Puppet and Jenkins automation makes the system manageable and reproducible, it doesn’t fix these fundamental architectural flaws. A production-grade coordinator would require a clustered, high-availability configuration using a consensus protocol like Raft to manage its stateful transaction log, adding another significant layer of complexity. This entire solution should be viewed as a carefully engineered workaround for a rigid external constraint, not as a desirable pattern for greenfield development where alternatives like Saga, with their emphasis on choreography and compensation, offer a more resilient and scalable path.