Implementing a Two-Phase Commit Protocol for Atomic Resource Provisioning Across a Kubernetes Cluster and an API Gateway


The task was to guarantee atomicity for new service onboarding within our internal developer platform (IDP). A single developer request to provision a “preview environment” triggers three distinct, remote operations: creating a metadata record in a PostgreSQL database, deploying a set of manifests to a Kubernetes cluster, and configuring a new route in our central API Gateway. Any partial failure here results in a corrupted state, requiring manual intervention—a developer might get a database entry but no running pods, or running pods that are unreachable via the gateway. Our initial scripted approach with try/catch blocks for cleanup was brittle and insufficient for production.

This led us down the path of distributed transactions. We evaluated the Saga pattern but discarded it for this specific use case. Sagas, with their compensating transactions, offer eventual consistency. For environment provisioning, we required strong, immediate consistency. A developer should not be told an environment is “being created” only for it to fail and be rolled back minutes later. The operation must be atomic: it either succeeds completely or fails completely, leaving the system as if it were never attempted. This non-negotiable requirement pointed directly to the Two-Phase Commit (2PC) protocol. Despite its known drawbacks, its guarantee of atomicity was the correct trade-off for this critical, low-frequency IDP operation.

Our architecture consists of a central Transaction Coordinator and three Participant services, all written in Go and deployed on the same Kubernetes cluster they manage.

  1. Coordinator: A stateful service that orchestrates the two phases.
  2. Resource-DB Participant: Manages the service metadata in PostgreSQL.
  3. K8s-Provisioner Participant: Interacts with the Kubernetes API server using client-go.
  4. Gateway-Config Participant: Configures routes in our API Gateway (Kong, in this case).

The entire workflow is initiated from our IDP’s frontend, a React application whose components are developed and tested in isolation using Storybook. A request flows from the UI through the main API Gateway to the Coordinator, which then begins the transaction.

The Transaction Coordinator Implementation

The Coordinator is the brain of the operation. Its primary responsibility is to drive the transaction through the PREPARE and COMMIT/ABORT phases and persist transaction state to disk to handle its own failures. A common mistake is building a stateless coordinator; if it crashes after all participants have voted to commit but before it has sent all commit messages, the system enters an indeterminate state upon restart unless the coordinator can recover its intention.

We use a simple file-based write-ahead log (WAL) for durability, managed by a sync.Mutex for concurrent requests. For a production system, using an embedded database like BoltDB or etcd would be more robust.

// pkg/coordinator/coordinator.go

package coordinator

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
	"golang.org/x/sync/errgroup"
)

// TransactionState represents the state of a distributed transaction.
type TransactionState string

const (
	StatePreparing TransactionState = "PREPARING"
	StateCommitted TransactionState = "COMMITTED"
	StateAborted   TransactionState = "ABORTED"
)

// Vote represents the response from a participant during the prepare phase.
type Vote string

const (
	VoteCommit Vote = "VOTE_COMMIT"
	VoteAbort  Vote = "VOTE_ABORT"
)

// ParticipantClient defines the interface for communicating with a transaction participant.
type ParticipantClient interface {
	Prepare(ctx context.Context, txID string, payload []byte) (Vote, error)
	Commit(ctx context.Context, txID string) error
	Abort(ctx context.Context, txID string) error
	Name() string
}

// Transaction represents the state of a single transaction.
type Transaction struct {
	ID        string           `json:"id"`
	State     TransactionState `json:"state"`
	Payload   []byte           `json:"-"` // Don't log the payload
	StartTime time.Time        `json:"start_time"`
}

// Coordinator manages distributed transactions.
type Coordinator struct {
	mu          sync.RWMutex
	wal         *os.File
	walEncoder  *json.Encoder
	participants []ParticipantClient
	transactions map[string]*Transaction
}

// NewCoordinator creates a new coordinator and recovers state from the WAL.
func NewCoordinator(walPath string, participants ...ParticipantClient) (*Coordinator, error) {
	wal, err := os.OpenFile(walPath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return nil, fmt.Errorf("failed to open WAL file: %w", err)
	}

	c := &Coordinator{
		wal:          wal,
		walEncoder:   json.NewEncoder(wal),
		participants: participants,
		transactions: make(map[string]*Transaction),
	}

	// In a real system, you'd implement state recovery by reading the WAL here.
	log.Println("Coordinator initialized. State recovery from WAL would happen here.")

	return c, nil
}

// logTransactionState persists the transaction state change to the WAL.
// This is critical for recovery.
func (c *Coordinator) logTransactionState(tx *Transaction) error {
	if err := c.walEncoder.Encode(tx); err != nil {
		return fmt.Errorf("failed to write to WAL: %w", err)
	}
	// In a production system, fsync is necessary to guarantee durability.
	// return c.wal.Sync()
	return nil
}

// BeginTransaction starts and executes a new distributed transaction.
func (c *Coordinator) BeginTransaction(ctx context.Context, payload []byte) (string, error) {
	txID := uuid.New().String()
	tx := &Transaction{
		ID:        txID,
		State:     StatePreparing,
		Payload:   payload,
		StartTime: time.Now(),
	}

	c.mu.Lock()
	c.transactions[txID] = tx
	if err := c.logTransactionState(tx); err != nil {
		// If we can't log, we cannot proceed.
		delete(c.transactions, txID)
		c.mu.Unlock()
		log.Printf("FATAL: Could not write to WAL for new transaction %s: %v", txID, err)
		return "", err
	}
	c.mu.Unlock()

	log.Printf("TX[%s]: Starting transaction", txID)

	// --- PHASE 1: PREPARE ---
	log.Printf("TX[%s]: Entering PREPARE phase", txID)
	g, prepCtx := errgroup.WithContext(ctx)
	
	for _, p := range c.participants {
		participant := p // a pitfall is capturing loop variables in closures
		g.Go(func() error {
			log.Printf("TX[%s]: Sending PREPARE to participant '%s'", txID, participant.Name())
			vote, err := participant.Prepare(prepCtx, txID, payload)
			if err != nil {
				log.Printf("TX[%s]: Participant '%s' failed to prepare: %v", txID, participant.Name(), err)
				return err // This will cancel other goroutines in the errgroup
			}
			if vote == VoteAbort {
				log.Printf("TX[%s]: Participant '%s' voted to ABORT", txID, participant.Name())
				return fmt.Errorf("participant %s voted to abort", participant.Name())
			}
			log.Printf("TX[%s]: Participant '%s' voted to COMMIT", txID, participant.Name())
			return nil
		})
	}

	// Wait for all participants to vote.
	if err := g.Wait(); err != nil {
		// If any participant fails or votes to abort, we move to the ABORT phase.
		log.Printf("TX[%s]: Prepare phase failed. Initiating ABORT. Reason: %v", txID, err)
		c.executeAbort(tx)
		return txID, fmt.Errorf("transaction aborted: %w", err)
	}
	
	// --- PHASE 2: COMMIT ---
	log.Printf("TX[%s]: All participants voted to commit. Initiating COMMIT", txID)
	err := c.executeCommit(tx)
	if err != nil {
		// This is the danger zone for 2PC. The decision to commit was made,
		// but the commit itself failed for some participants.
		// The coordinator must now retry committing until it succeeds.
		// A robust implementation would have a background process for this.
		log.Printf("CRITICAL: TX[%s]: Commit phase failed: %v. Manual intervention may be required.", txID, err)
		return txID, err
	}

	log.Printf("TX[%s]: Transaction committed successfully", txID)
	return txID, nil
}

func (c *Coordinator) executeCommit(tx *Transaction) error {
	c.mu.Lock()
	tx.State = StateCommitted
	err := c.logTransactionState(tx)
	c.mu.Unlock()
	if err != nil {
		// If we can't log the commit decision, we're in a bad state.
		// The coordinator might restart and not know it decided to commit.
		log.Printf("FATAL: Could not write COMMIT state to WAL for transaction %s: %v", tx.ID, err)
		return err
	}

	// Non-blocking commit notifications. In a real system, you'd use a persistent queue.
	for _, p := range c.participants {
		go func(participant ParticipantClient) {
			// Retry loop for commit is essential
			for {
				err := participant.Commit(context.Background(), tx.ID)
				if err == nil {
					log.Printf("TX[%s]: Participant '%s' acknowledged COMMIT", tx.ID, participant.Name())
					return
				}
				log.Printf("TX[%s]: Failed to send COMMIT to '%s', retrying in 5s... Error: %v", tx.ID, participant.Name(), err)
				time.Sleep(5 * time.Second)
			}
		}(p)
	}
	return nil
}


func (c *Coordinator) executeAbort(tx *Transaction) {
	c.mu.Lock()
	tx.State = StateAborted
	err := c.logTransactionState(tx)
	c.mu.Unlock()
	if err != nil {
		log.Printf("FATAL: Could not write ABORT state to WAL for transaction %s: %v", tx.ID, err)
		// Even if logging fails, we must proceed with telling participants to abort.
	}
	
	// Send abort to all participants.
	for _, p := range c.participants {
		go func(participant ParticipantClient) {
			// Similar to commit, abort should also be retried.
			for {
				err := participant.Abort(context.Background(), tx.ID)
				if err == nil {
					log.Printf("TX[%s]: Participant '%s' acknowledged ABORT", tx.ID, participant.Name())
					return
				}
				log.Printf("TX[%s]: Failed to send ABORT to '%s', retrying in 5s... Error: %v", tx.ID, participant.Name(), err)
				time.Sleep(5 * time.Second)
			}
		}(p)
	}
}

func (c *Coordinator) Close() error {
	return c.wal.Close()
}

The Kubernetes Provisioner Participant

This is where theory meets reality. The participant cannot simply apply Kubernetes manifests in the Prepare phase, as that would be an un-revertible action. The key is to make the Prepare phase a non-mutating validation and staging step.

Our K8s-Provisioner-Service does the following:

  1. On Prepare: It receives the desired application configuration (image, replicas, etc.). It generates the Deployment, Service, and Ingress manifests. Then, it uses the Kubernetes API’s dry-run feature to validate them. If valid, it stores these manifests in a temporary location (we use a Redis cache keyed by transaction ID) and votes VOTE_COMMIT. If validation fails, it votes VOTE_ABORT.
  2. On Commit: It retrieves the manifests from Redis and applies them to the cluster.
  3. On Abort: It deletes the manifests from Redis.
// pkg/k8sprovisioner/participant.go

package k8sprovisioner

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v8"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes"
)

// ProvisioningPayload defines the structure for a new environment request.
type ProvisioningPayload struct {
	ServiceName string `json:"service_name"`
	Namespace   string `json:"namespace"`
	Image       string `json:"image"`
	Replicas    int32  `json:"replicas"`
	Port        int32  `json:"port"`
	Host        string `json:"host"`
}

// StagedManifests holds the generated manifests before commit.
type StagedManifests struct {
	Deployment *appsv1.Deployment `json:"deployment"`
	Service    *corev1.Service    `json:"service"`
	Ingress    *networkingv1.Ingress  `json:"ingress"`
}

// K8sParticipant implements the ParticipantClient interface for Kubernetes.
type K8sParticipant struct {
	kubeClient *kubernetes.Clientset
	redisClient *redis.Client
}

func NewK8sParticipant(kubeClient *kubernetes.Clientset, redisClient *redis.Client) *K8sParticipant {
	return &K8sParticipant{kubeClient: kubeClient, redisClient: redisClient}
}

func (p *K8sParticipant) Name() string { return "k8s-provisioner" }

func (p *K8sParticipant) getStagedManifestsKey(txID string) string {
	return fmt.Sprintf("2pc:k8s:%s", txID)
}

func (p *K8sParticipant) Prepare(ctx context.Context, txID string, payloadData []byte) (string, error) {
	var payload ProvisioningPayload
	if err := json.Unmarshal(payloadData, &payload); err != nil {
		return "VOTE_ABORT", fmt.Errorf("invalid payload: %w", err)
	}

	log.Printf("K8S-P[TX:%s]: PREPARE for service %s", txID, payload.ServiceName)

	// 1. Generate manifests
	staged, err := p.generateManifests(payload)
	if err != nil {
		return "VOTE_ABORT", err
	}

	// 2. Validate manifests using server-side dry-run
	// This is a critical step to ensure what we plan to commit is valid.
	dryRunOpts := metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}
	_, err = p.kubeClient.AppsV1().Deployments(payload.Namespace).Create(ctx, staged.Deployment, dryRunOpts)
	if err != nil {
		return "VOTE_ABORT", fmt.Errorf("deployment validation failed: %w", err)
	}
	// ... validation for Service and Ingress ...

	// 3. Stage the validated manifests in Redis
	stagedData, err := json.Marshal(staged)
	if err != nil {
		return "VOTE_ABORT", fmt.Errorf("failed to serialize staged manifests: %w", err)
	}

	err = p.redisClient.Set(ctx, p.getStagedManifestsKey(txID), stagedData, 24*time.Hour).Err()
	if err != nil {
		return "VOTE_ABORT", fmt.Errorf("failed to stage manifests in redis: %w", err)
	}

	log.Printf("K8S-P[TX:%s]: Manifests for %s validated and staged. Voting COMMIT.", txID, payload.ServiceName)
	return "VOTE_COMMIT", nil
}

func (p *K8sParticipant) Commit(ctx context.Context, txID string) error {
	log.Printf("K8S-P[TX:%s]: COMMIT received", txID)
	key := p.getStagedManifestsKey(txID)

	stagedData, err := p.redisClient.Get(ctx, key).Bytes()
	if err == redis.Nil {
		// This can happen if commit is called again after a successful commit.
		// It should be treated as idempotent.
		log.Printf("K8S-P[TX:%s]: No staged manifests found, assuming already committed.", txID)
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to retrieve staged manifests from redis: %w", err)
	}

	var staged StagedManifests
	if err := json.Unmarshal(stagedData, &staged); err != nil {
		return fmt.Errorf("failed to deserialize staged manifests: %w", err)
	}
	
	// Apply the resources to the cluster
	createOpts := metav1.CreateOptions{}
	log.Printf("K8S-P[TX:%s]: Applying Deployment %s", txID, staged.Deployment.Name)
	_, err = p.kubeClient.AppsV1().Deployments(staged.Deployment.Namespace).Create(ctx, staged.Deployment, createOpts)
	if err != nil {
		// A real-world scenario needs to handle "already exists" errors to make this idempotent.
		return fmt.Errorf("failed to create deployment: %w", err)
	}
	// ... create Service and Ingress ...

	// Clean up staged data
	p.redisClient.Del(ctx, key)
	log.Printf("K8S-P[TX:%s]: COMMIT successful.", txID)
	return nil
}

func (p *K8sParticipant) Abort(ctx context.Context, txID string) error {
	log.Printf("K8S-P[TX:%s]: ABORT received. Cleaning up staged data.", txID)
	key := p.getStagedManifestsKey(txID)
	
	err := p.redisClient.Del(ctx, key).Err()
	if err != nil {
		// Log but don't return error, as abort should be a best-effort cleanup.
		log.Printf("K8S-P[TX:%s]: Failed to clean up staged manifests from redis: %v", txID, err)
	}
	return nil
}

func (p *K8sParticipant) generateManifests(payload ProvisioningPayload) (*StagedManifests, error) {
    // Boilerplate for creating Kubernetes objects...
    labels := map[string]string{"app": payload.ServiceName}
    deployment := &appsv1.Deployment{
        // ... full deployment spec
    }
    service := &corev1.Service{
        // ... full service spec
    }
    ingress := &networkingv1.Ingress{
        // ... full ingress spec
    }
    return &StagedManifests{Deployment: deployment, Service: service, Ingress: ingress}, nil
}

The logic for the Resource-DB-Participant is similar: Prepare begins a database transaction and acquires necessary locks but does not commit. Commit executes the COMMIT statement. Abort executes ROLLBACK.

Kubernetes Deployment and Configuration

Deploying this system requires careful consideration of state and availability. The Coordinator, being stateful, is deployed as a StatefulSet with a PersistentVolumeClaim to ensure its WAL file survives pod restarts.

# k8s/coordinator-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: tx-coordinator
spec:
  serviceName: "tx-coordinator"
  replicas: 1 # 2PC coordinator is a SPOF; HA requires more complex setup like Raft.
  selector:
    matchLabels:
      app: tx-coordinator
  template:
    metadata:
      labels:
        app: tx-coordinator
    spec:
      containers:
      - name: coordinator
        image: our-registry/tx-coordinator:v1.0.0
        ports:
        - containerPort: 8080
        volumeMounts:
        - name: wal-storage
          mountPath: /var/log/2pc
  volumeClaimTemplates:
  - metadata:
      name: wal-storage
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: "standard" # Or your preferred StorageClass
      resources:
        requests:
          storage: 1Gi
---
apiVersion: v1
kind: Service
metadata:
  name: tx-coordinator-svc
spec:
  selector:
    app: tx-coordinator
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8080

The participant services are stateless and can be deployed as regular Deployments.

The system flow is visualized as follows:

sequenceDiagram
    participant UI
    participant APIGateway as API Gateway
    participant Coordinator
    participant P1 as ResourceDB
    participant P2 as K8sProvisioner
    participant P3 as GatewayConfig

    UI->>+APIGateway: POST /environments
    APIGateway->>+Coordinator: startTransaction(payload)
    Coordinator->>Coordinator: Gen TX_ID, State=PREPARING, Log to WAL
    par
        Coordinator->>+P1: prepare(TX_ID)
        P1-->>-Coordinator: VOTE_COMMIT
    and
        Coordinator->>+P2: prepare(TX_ID)
        P2-->>-Coordinator: VOTE_COMMIT
    and
        Coordinator->>+P3: prepare(TX_ID)
        P3-->>-Coordinator: VOTE_ABORT
    end
    Coordinator->>Coordinator: Received an ABORT vote. State=ABORTING, Log to WAL
    par
        Coordinator->>+P1: abort(TX_ID)
        P1-->>-Coordinator: ACK
    and
        Coordinator->>+P2: abort(TX_ID)
        P2-->>-Coordinator: ACK
    and
        Coordinator->>+P3: abort(TX_ID)
        P3-->>-Coordinator: ACK
    end
    Coordinator-->>-APIGateway: 500 Internal Server Error
    APIGateway-->>-UI: Error

The Role of Storybook in the Control Plane

While the backend logic is complex, the developer experience is paramount. We used Storybook to develop the IDP’s frontend components in isolation. The component responsible for triggering this transaction, ProvisioningForm, needed to handle multiple states: idle, loading, success, and various error conditions (e.g., validation failure vs. transaction abort).

By creating stories for each state, we could perfect the UI/UX without needing a running backend. We used mock service workers to simulate the API responses from the coordinator.

// src/components/ProvisioningForm.stories.tsx

import React from 'react';
import { ComponentStory, ComponentMeta } from '@storybook/react';
import { rest } from 'msw';
import { ProvisioningForm } from './ProvisioningForm';

export default {
  title: 'Features/ProvisioningForm',
  component: ProvisioningForm,
} as ComponentMeta<typeof ProvisioningForm>;

const Template: ComponentStory<typeof ProvisioningForm> = (args) => <ProvisioningForm {...args} />;

export const Default = Template.bind({});
Default.parameters = {
  msw: {
    handlers: [
      rest.post('/api/environments', (req, res, ctx) => {
        return res(ctx.status(500), ctx.json({ message: 'Handler not implemented for this story' }));
      }),
    ],
  },
};

export const Submitting = Template.bind({});
Submitting.parameters = {
  msw: {
    handlers: [
      rest.post('/api/environments', (req, res, ctx) => {
        return res(ctx.delay('infinite')); // Simulate a long-running request
      }),
    ],
  },
};

export const Success = Template.bind({});
Success.parameters = {
  msw: {
    handlers: [
      rest.post('/api/environments', (req, res, ctx) => {
        return res(ctx.status(201), ctx.json({ transactionId: 'uuid-goes-here' }));
      }),
    ],
  },
};

export const TransactionAborted = Template.bind({});
TransactionAborted.parameters = {
  msw: {
    handlers: [
      rest.post('/api/environments', (req, res, ctx) => {
        return res(
          ctx.status(500),
          ctx.json({ 
            error: 'Transaction aborted', 
            reason: 'Participant k8s-provisioner voted to abort: Ingress host is already in use.'
          })
        );
      }),
    ],
  },
};

This approach allowed frontend developers to work in parallel with backend engineers, ensuring the control plane’s UI was robust and user-friendly before the complex distributed transaction logic was even deployed. It’s a prime example of how frontend tooling can be critical even for deeply infrastructural projects.

The current implementation has achieved its primary goal: providing atomic environment provisioning. However, it is not without its limitations. The classic 2PC coordinator remains a single point of failure. While our use of a StatefulSet with a PersistentVolume mitigates data loss from a crash, it does not provide high availability; if the coordinator node fails, all provisioning operations will be blocked until Kubernetes reschedules the pod. Furthermore, the blocking nature of the Prepare phase means participants may hold locks on resources (like database rows) for the duration of the transaction, which is only acceptable because this is an infrequent, developer-facing operation, not a high-throughput customer-facing one. Future iterations could explore a more resilient coordinator design using a consensus algorithm like Raft for its state machine or transitioning to a Saga pattern for workflows where eventual consistency is an acceptable trade-off.


  TOC