The task was to guarantee atomicity for new service onboarding within our internal developer platform (IDP). A single developer request to provision a “preview environment” triggers three distinct, remote operations: creating a metadata record in a PostgreSQL database, deploying a set of manifests to a Kubernetes cluster, and configuring a new route in our central API Gateway. Any partial failure here results in a corrupted state, requiring manual intervention—a developer might get a database entry but no running pods, or running pods that are unreachable via the gateway. Our initial scripted approach with try/catch blocks for cleanup was brittle and insufficient for production.
This led us down the path of distributed transactions. We evaluated the Saga pattern but discarded it for this specific use case. Sagas, with their compensating transactions, offer eventual consistency. For environment provisioning, we required strong, immediate consistency. A developer should not be told an environment is “being created” only for it to fail and be rolled back minutes later. The operation must be atomic: it either succeeds completely or fails completely, leaving the system as if it were never attempted. This non-negotiable requirement pointed directly to the Two-Phase Commit (2PC) protocol. Despite its known drawbacks, its guarantee of atomicity was the correct trade-off for this critical, low-frequency IDP operation.
Our architecture consists of a central Transaction Coordinator and three Participant services, all written in Go and deployed on the same Kubernetes cluster they manage.
- Coordinator: A stateful service that orchestrates the two phases.
- Resource-DB Participant: Manages the service metadata in PostgreSQL.
- K8s-Provisioner Participant: Interacts with the Kubernetes API server using client-go.
- Gateway-Config Participant: Configures routes in our API Gateway (Kong, in this case).
The entire workflow is initiated from our IDP’s frontend, a React application whose components are developed and tested in isolation using Storybook. A request flows from the UI through the main API Gateway to the Coordinator, which then begins the transaction.
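For concreteness, the Coordinator’s HTTP entry point is little more than a thin wrapper around the BeginTransaction method shown in the next section. The sketch below is illustrative rather than our exact API; the package layout, route, and response shape are assumptions.
// pkg/api/handler.go (illustrative sketch; package, route, and response shape are assumptions)
package api
import (
"context"
"encoding/json"
"io"
"net/http"
)
// txStarter stands in for the *coordinator.Coordinator shown in the next section.
type txStarter interface {
BeginTransaction(ctx context.Context, payload []byte) (string, error)
}
// provisionHandler forwards the raw provisioning payload to the coordinator and
// maps the outcome to an HTTP status: 201 on commit, 500 on abort or failure.
// It would be registered on the router roughly as:
//   http.Handle("/environments", provisionHandler(coord))
func provisionHandler(c txStarter) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
payload, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "could not read request body", http.StatusBadRequest)
return
}
txID, err := c.BeginTransaction(r.Context(), payload)
if err != nil {
// An aborted transaction surfaces to the UI as a 500 error.
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
_ = json.NewEncoder(w).Encode(map[string]string{"transactionId": txID})
}
}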
The Transaction Coordinator Implementation
The Coordinator is the brain of the operation. Its primary responsibility is to drive the transaction through the PREPARE and COMMIT/ABORT phases and to persist transaction state to disk so it can handle its own failures. A common mistake is building a stateless coordinator; if it crashes after all participants have voted to commit but before it has sent all commit messages, the system enters an indeterminate state upon restart unless the coordinator can recover its intention.
We use a simple file-based write-ahead log (WAL) for durability, guarded by a sync.RWMutex to serialize concurrent requests. For a production system, an embedded database like BoltDB or etcd would be more robust.
// pkg/coordinator/coordinator.go
package coordinator
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
// TransactionState represents the state of a distributed transaction.
type TransactionState string
const (
StatePreparing TransactionState = "PREPARING"
StateCommitted TransactionState = "COMMITTED"
StateAborted TransactionState = "ABORTED"
)
// Vote represents the response from a participant during the prepare phase.
type Vote string
const (
VoteCommit Vote = "VOTE_COMMIT"
VoteAbort Vote = "VOTE_ABORT"
)
// ParticipantClient defines the interface for communicating with a transaction participant.
type ParticipantClient interface {
Prepare(ctx context.Context, txID string, payload []byte) (Vote, error)
Commit(ctx context.Context, txID string) error
Abort(ctx context.Context, txID string) error
Name() string
}
// Transaction represents the state of a single transaction.
type Transaction struct {
ID string `json:"id"`
State TransactionState `json:"state"`
Payload []byte `json:"-"` // Don't log the payload
StartTime time.Time `json:"start_time"`
}
// Coordinator manages distributed transactions.
type Coordinator struct {
mu sync.RWMutex
wal *os.File
walEncoder *json.Encoder
participants []ParticipantClient
transactions map[string]*Transaction
}
// NewCoordinator creates a new coordinator and recovers state from the WAL.
func NewCoordinator(walPath string, participants ...ParticipantClient) (*Coordinator, error) {
wal, err := os.OpenFile(walPath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open WAL file: %w", err)
}
c := &Coordinator{
wal: wal,
walEncoder: json.NewEncoder(wal),
participants: participants,
transactions: make(map[string]*Transaction),
}
// In a real system, you'd implement state recovery by reading the WAL here.
log.Println("Coordinator initialized. State recovery from WAL would happen here.")
return c, nil
}
// logTransactionState persists the transaction state change to the WAL.
// This is critical for recovery.
func (c *Coordinator) logTransactionState(tx *Transaction) error {
if err := c.walEncoder.Encode(tx); err != nil {
return fmt.Errorf("failed to write to WAL: %w", err)
}
// In a production system, fsync is necessary to guarantee durability.
// return c.wal.Sync()
return nil
}
// BeginTransaction starts and executes a new distributed transaction.
func (c *Coordinator) BeginTransaction(ctx context.Context, payload []byte) (string, error) {
txID := uuid.New().String()
tx := &Transaction{
ID: txID,
State: StatePreparing,
Payload: payload,
StartTime: time.Now(),
}
c.mu.Lock()
c.transactions[txID] = tx
if err := c.logTransactionState(tx); err != nil {
// If we can't log, we cannot proceed.
delete(c.transactions, txID)
c.mu.Unlock()
log.Printf("FATAL: Could not write to WAL for new transaction %s: %v", txID, err)
return "", err
}
c.mu.Unlock()
log.Printf("TX[%s]: Starting transaction", txID)
// --- PHASE 1: PREPARE ---
log.Printf("TX[%s]: Entering PREPARE phase", txID)
g, prepCtx := errgroup.WithContext(ctx)
for _, p := range c.participants {
participant := p // a pitfall is capturing loop variables in closures
g.Go(func() error {
log.Printf("TX[%s]: Sending PREPARE to participant '%s'", txID, participant.Name())
vote, err := participant.Prepare(prepCtx, txID, payload)
if err != nil {
log.Printf("TX[%s]: Participant '%s' failed to prepare: %v", txID, participant.Name(), err)
return err // This will cancel other goroutines in the errgroup
}
if vote == VoteAbort {
log.Printf("TX[%s]: Participant '%s' voted to ABORT", txID, participant.Name())
return fmt.Errorf("participant %s voted to abort", participant.Name())
}
log.Printf("TX[%s]: Participant '%s' voted to COMMIT", txID, participant.Name())
return nil
})
}
// Wait for all participants to vote.
if err := g.Wait(); err != nil {
// If any participant fails or votes to abort, we move to the ABORT phase.
log.Printf("TX[%s]: Prepare phase failed. Initiating ABORT. Reason: %v", txID, err)
c.executeAbort(tx)
return txID, fmt.Errorf("transaction aborted: %w", err)
}
// --- PHASE 2: COMMIT ---
log.Printf("TX[%s]: All participants voted to commit. Initiating COMMIT", txID)
err := c.executeCommit(tx)
if err != nil {
// This is the danger zone for 2PC. The decision to commit was made,
// but the commit itself failed for some participants.
// The coordinator must now retry committing until it succeeds.
// A robust implementation would have a background process for this.
log.Printf("CRITICAL: TX[%s]: Commit phase failed: %v. Manual intervention may be required.", txID, err)
return txID, err
}
log.Printf("TX[%s]: Transaction committed successfully", txID)
return txID, nil
}
func (c *Coordinator) executeCommit(tx *Transaction) error {
c.mu.Lock()
tx.State = StateCommitted
err := c.logTransactionState(tx)
c.mu.Unlock()
if err != nil {
// If we can't log the commit decision, we're in a bad state.
// The coordinator might restart and not know it decided to commit.
log.Printf("FATAL: Could not write COMMIT state to WAL for transaction %s: %v", tx.ID, err)
return err
}
// Non-blocking commit notifications. In a real system, you'd use a persistent queue.
for _, p := range c.participants {
go func(participant ParticipantClient) {
// Retry loop for commit is essential
for {
err := participant.Commit(context.Background(), tx.ID)
if err == nil {
log.Printf("TX[%s]: Participant '%s' acknowledged COMMIT", tx.ID, participant.Name())
return
}
log.Printf("TX[%s]: Failed to send COMMIT to '%s', retrying in 5s... Error: %v", tx.ID, participant.Name(), err)
time.Sleep(5 * time.Second)
}
}(p)
}
return nil
}
func (c *Coordinator) executeAbort(tx *Transaction) {
c.mu.Lock()
tx.State = StateAborted
err := c.logTransactionState(tx)
c.mu.Unlock()
if err != nil {
log.Printf("FATAL: Could not write ABORT state to WAL for transaction %s: %v", tx.ID, err)
// Even if logging fails, we must proceed with telling participants to abort.
}
// Send abort to all participants.
for _, p := range c.participants {
go func(participant ParticipantClient) {
// Similar to commit, abort should also be retried.
for {
err := participant.Abort(context.Background(), tx.ID)
if err == nil {
log.Printf("TX[%s]: Participant '%s' acknowledged ABORT", tx.ID, participant.Name())
return
}
log.Printf("TX[%s]: Failed to send ABORT to '%s', retrying in 5s... Error: %v", tx.ID, participant.Name(), err)
time.Sleep(5 * time.Second)
}
}(p)
}
}
func (c *Coordinator) Close() error {
return c.wal.Close()
}
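The constructor above deliberately skips recovery. A minimal sketch of that replay, reusing the types and JSON-lines WAL format from the code above (and assuming "io" is added to the import block), might look like this; re-driving COMMITTED transactions and aborting anything still PREPARING is the standard coordinator-recovery rule, but the code itself is illustrative, not our full implementation.
// recoverFromWAL replays the WAL written by logTransactionState and rebuilds
// the in-memory transaction map. Illustrative sketch only.
func (c *Coordinator) recoverFromWAL() error {
if _, err := c.wal.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to rewind WAL: %w", err)
}
dec := json.NewDecoder(c.wal)
for {
var tx Transaction
if err := dec.Decode(&tx); err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("corrupt WAL entry: %w", err)
}
// Later entries for the same ID overwrite earlier ones, so the map ends
// up holding each transaction's last logged state.
rec := tx
c.transactions[rec.ID] = &rec
}
for _, tx := range c.transactions {
switch tx.State {
case StateCommitted:
// The commit decision was made durable, so we must keep pushing
// COMMIT to participants; they already treat it as idempotent.
go c.executeCommit(tx)
case StatePreparing:
// No decision was logged before the crash, so the safe choice is to
// abort. The payload is not persisted (json:"-"), which is fine
// because we never re-run Prepare after a restart.
go c.executeAbort(tx)
}
}
return nil
}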
The Kubernetes Provisioner Participant
This is where theory meets reality. The participant cannot simply apply Kubernetes manifests in the Prepare phase, as that would be an irreversible action. The key is to make the Prepare phase a non-mutating validation and staging step.
Our K8s-Provisioner-Service does the following:
- On Prepare: It receives the desired application configuration (image, replicas, etc.) and generates the Deployment, Service, and Ingress manifests. It then validates them using the Kubernetes API’s server-side dry-run feature. If validation succeeds, it stores the manifests in a temporary location (we use a Redis cache keyed by transaction ID) and votes VOTE_COMMIT; if validation fails, it votes VOTE_ABORT.
- On Commit: It retrieves the manifests from Redis and applies them to the cluster.
- On Abort: It deletes the staged manifests from Redis.
// pkg/k8sprovisioner/participant.go
package k8sprovisioner
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
)
// ProvisioningPayload defines the structure for a new environment request.
type ProvisioningPayload struct {
ServiceName string `json:"service_name"`
Namespace string `json:"namespace"`
Image string `json:"image"`
Replicas int32 `json:"replicas"`
Port int32 `json:"port"`
Host string `json:"host"`
}
// StagedManifests holds the generated manifests before commit.
type StagedManifests struct {
Deployment *appsv1.Deployment `json:"deployment"`
Service *corev1.Service `json:"service"`
Ingress *networkingv1.Ingress `json:"ingress"`
}
// K8sParticipant implements the participant side of the Prepare/Commit/Abort protocol for Kubernetes resources.
type K8sParticipant struct {
kubeClient *kubernetes.Clientset
redisClient *redis.Client
}
func NewK8sParticipant(kubeClient *kubernetes.Clientset, redisClient *redis.Client) *K8sParticipant {
return &K8sParticipant{kubeClient: kubeClient, redisClient: redisClient}
}
func (p *K8sParticipant) Name() string { return "k8s-provisioner" }
func (p *K8sParticipant) getStagedManifestsKey(txID string) string {
return fmt.Sprintf("2pc:k8s:%s", txID)
}
func (p *K8sParticipant) Prepare(ctx context.Context, txID string, payloadData []byte) (string, error) {
var payload ProvisioningPayload
if err := json.Unmarshal(payloadData, &payload); err != nil {
return "VOTE_ABORT", fmt.Errorf("invalid payload: %w", err)
}
log.Printf("K8S-P[TX:%s]: PREPARE for service %s", txID, payload.ServiceName)
// 1. Generate manifests
staged, err := p.generateManifests(payload)
if err != nil {
return "VOTE_ABORT", err
}
// 2. Validate manifests using server-side dry-run
// This is a critical step to ensure what we plan to commit is valid.
dryRunOpts := metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}
_, err = p.kubeClient.AppsV1().Deployments(payload.Namespace).Create(ctx, staged.Deployment, dryRunOpts)
if err != nil {
return "VOTE_ABORT", fmt.Errorf("deployment validation failed: %w", err)
}
// ... validation for Service and Ingress ...
// 3. Stage the validated manifests in Redis
stagedData, err := json.Marshal(staged)
if err != nil {
return "VOTE_ABORT", fmt.Errorf("failed to serialize staged manifests: %w", err)
}
err = p.redisClient.Set(ctx, p.getStagedManifestsKey(txID), stagedData, 24*time.Hour).Err()
if err != nil {
return "VOTE_ABORT", fmt.Errorf("failed to stage manifests in redis: %w", err)
}
log.Printf("K8S-P[TX:%s]: Manifests for %s validated and staged. Voting COMMIT.", txID, payload.ServiceName)
return "VOTE_COMMIT", nil
}
func (p *K8sParticipant) Commit(ctx context.Context, txID string) error {
log.Printf("K8S-P[TX:%s]: COMMIT received", txID)
key := p.getStagedManifestsKey(txID)
stagedData, err := p.redisClient.Get(ctx, key).Bytes()
if err == redis.Nil {
// This can happen if commit is called again after a successful commit.
// It should be treated as idempotent.
log.Printf("K8S-P[TX:%s]: No staged manifests found, assuming already committed.", txID)
return nil
}
if err != nil {
return fmt.Errorf("failed to retrieve staged manifests from redis: %w", err)
}
var staged StagedManifests
if err := json.Unmarshal(stagedData, &staged); err != nil {
return fmt.Errorf("failed to deserialize staged manifests: %w", err)
}
// Apply the resources to the cluster
createOpts := metav1.CreateOptions{}
log.Printf("K8S-P[TX:%s]: Applying Deployment %s", txID, staged.Deployment.Name)
_, err = p.kubeClient.AppsV1().Deployments(staged.Deployment.Namespace).Create(ctx, staged.Deployment, createOpts)
if err != nil {
// A real-world scenario needs to handle "already exists" errors to make this idempotent.
return fmt.Errorf("failed to create deployment: %w", err)
}
// ... create Service and Ingress ...
// Clean up staged data
p.redisClient.Del(ctx, key)
log.Printf("K8S-P[TX:%s]: COMMIT successful.", txID)
return nil
}
func (p *K8sParticipant) Abort(ctx context.Context, txID string) error {
log.Printf("K8S-P[TX:%s]: ABORT received. Cleaning up staged data.", txID)
key := p.getStagedManifestsKey(txID)
err := p.redisClient.Del(ctx, key).Err()
if err != nil {
// Log but don't return error, as abort should be a best-effort cleanup.
log.Printf("K8S-P[TX:%s]: Failed to clean up staged manifests from redis: %v", txID, err)
}
return nil
}
func (p *K8sParticipant) generateManifests(payload ProvisioningPayload) (*StagedManifests, error) {
// Boilerplate for creating Kubernetes objects; the specs are elided for brevity.
labels := map[string]string{"app": payload.ServiceName}
objectMeta := metav1.ObjectMeta{
Name:      payload.ServiceName,
Namespace: payload.Namespace,
Labels:    labels,
}
deployment := &appsv1.Deployment{
ObjectMeta: objectMeta,
// ... full deployment spec
}
service := &corev1.Service{
ObjectMeta: objectMeta,
// ... full service spec
}
ingress := &networkingv1.Ingress{
ObjectMeta: objectMeta,
// ... full ingress spec
}
return &StagedManifests{Deployment: deployment, Service: service, Ingress: ingress}, nil
}
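One gap the comments above call out is idempotency of Commit. Because the coordinator retries COMMIT until it gets an acknowledgement, a replayed COMMIT must not fail simply because the objects were already created on a previous attempt. A small helper along these lines keeps the Create calls retry-safe using the standard apimachinery error helpers; it is a sketch, and it assumes the staged object matches whatever already exists in the cluster.
// pkg/k8sprovisioner/idempotency.go (sketch)
package k8sprovisioner
import apierrors "k8s.io/apimachinery/pkg/api/errors"
// ignoreAlreadyExists treats "already exists" as success so a replayed COMMIT
// (for example, after a coordinator retry) does not fail the transaction.
func ignoreAlreadyExists(err error) error {
if err == nil || apierrors.IsAlreadyExists(err) {
return nil
}
return err
}
Each Create call in Commit would then be wrapped as if err := ignoreAlreadyExists(err); err != nil { ... } before returning.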
The logic for the Resource-DB-Participant is similar: Prepare begins a database transaction and acquires the necessary locks but does not commit, Commit executes the COMMIT statement, and Abort executes ROLLBACK.
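For reference, a minimal sketch of such a participant is below. It holds the open *sql.Tx in memory keyed by transaction ID between Prepare and Commit/Abort; the table, columns, and method signatures are placeholders, and a production version on PostgreSQL could use PREPARE TRANSACTION instead so the prepared state survives a participant restart.
// pkg/resourcedb/participant.go (illustrative sketch; schema and signatures are placeholders)
package resourcedb
import (
"context"
"database/sql"
"fmt"
"sync"
)
// DBParticipant keeps an open *sql.Tx per transaction between Prepare and
// Commit/Abort. The prepared state lives only in memory, so a restart of this
// participant loses it; PostgreSQL's PREPARE TRANSACTION would avoid that.
type DBParticipant struct {
db   *sql.DB
mu   sync.Mutex
open map[string]*sql.Tx
}
func NewDBParticipant(db *sql.DB) *DBParticipant {
return &DBParticipant{db: db, open: make(map[string]*sql.Tx)}
}
func (p *DBParticipant) Name() string { return "resource-db" }
// Prepare starts the database transaction and performs the insert, but does
// not commit; the row lock is held until Commit or Abort arrives.
func (p *DBParticipant) Prepare(ctx context.Context, txID, serviceName string) error {
tx, err := p.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin failed: %w", err)
}
if _, err := tx.ExecContext(ctx,
"INSERT INTO environments (tx_id, service_name, status) VALUES ($1, $2, 'PENDING')",
txID, serviceName); err != nil {
_ = tx.Rollback()
return fmt.Errorf("insert failed, voting abort: %w", err)
}
p.mu.Lock()
p.open[txID] = tx
p.mu.Unlock()
return nil
}
func (p *DBParticipant) Commit(ctx context.Context, txID string) error {
p.mu.Lock()
tx, ok := p.open[txID]
delete(p.open, txID)
p.mu.Unlock()
if !ok {
return nil // already committed or aborted; treat as idempotent
}
return tx.Commit()
}
func (p *DBParticipant) Abort(ctx context.Context, txID string) error {
p.mu.Lock()
tx, ok := p.open[txID]
delete(p.open, txID)
p.mu.Unlock()
if !ok {
return nil
}
return tx.Rollback()
}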
Kubernetes Deployment and Configuration
Deploying this system requires careful consideration of state and availability. The Coordinator, being stateful, is deployed as a StatefulSet with a PersistentVolumeClaim to ensure its WAL file survives pod restarts.
# k8s/coordinator-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: tx-coordinator
spec:
serviceName: "tx-coordinator"
replicas: 1 # 2PC coordinator is a SPOF; HA requires more complex setup like Raft.
selector:
matchLabels:
app: tx-coordinator
template:
metadata:
labels:
app: tx-coordinator
spec:
containers:
- name: coordinator
image: our-registry/tx-coordinator:v1.0.0
ports:
- containerPort: 8080
volumeMounts:
- name: wal-storage
mountPath: /var/log/2pc
volumeClaimTemplates:
- metadata:
name: wal-storage
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "standard" # Or your preferred StorageClass
resources:
requests:
storage: 1Gi
---
apiVersion: v1
kind: Service
metadata:
name: tx-coordinator-svc
spec:
selector:
app: tx-coordinator
ports:
- protocol: TCP
port: 80
targetPort: 8080
The participant services are stateless and can be deployed as regular Deployments.
The system flow is visualized as follows:
sequenceDiagram
    participant UI
    participant APIGateway as API Gateway
    participant Coordinator
    participant P1 as ResourceDB
    participant P2 as K8sProvisioner
    participant P3 as GatewayConfig
    UI->>+APIGateway: POST /environments
    APIGateway->>+Coordinator: startTransaction(payload)
    Coordinator->>Coordinator: Gen TX_ID, State=PREPARING, Log to WAL
    par
        Coordinator->>+P1: prepare(TX_ID)
        P1-->>-Coordinator: VOTE_COMMIT
    and
        Coordinator->>+P2: prepare(TX_ID)
        P2-->>-Coordinator: VOTE_COMMIT
    and
        Coordinator->>+P3: prepare(TX_ID)
        P3-->>-Coordinator: VOTE_ABORT
    end
    Coordinator->>Coordinator: Received an ABORT vote. State=ABORTING, Log to WAL
    par
        Coordinator->>+P1: abort(TX_ID)
        P1-->>-Coordinator: ACK
    and
        Coordinator->>+P2: abort(TX_ID)
        P2-->>-Coordinator: ACK
    and
        Coordinator->>+P3: abort(TX_ID)
        P3-->>-Coordinator: ACK
    end
    Coordinator-->>-APIGateway: 500 Internal Server Error
    APIGateway-->>-UI: Error
The Role of Storybook in the Control Plane
While the backend logic is complex, the developer experience is paramount. We used Storybook to develop the IDP’s frontend components in isolation. The component responsible for triggering this transaction, ProvisioningForm, needed to handle multiple states: idle, loading, success, and various error conditions (e.g., validation failure vs. transaction abort).
By creating stories for each state, we could perfect the UI/UX without needing a running backend. We used mock service workers to simulate the API responses from the coordinator.
// src/components/ProvisioningForm.stories.tsx
import React from 'react';
import { ComponentStory, ComponentMeta } from '@storybook/react';
import { rest } from 'msw';
import { ProvisioningForm } from './ProvisioningForm';
export default {
title: 'Features/ProvisioningForm',
component: ProvisioningForm,
} as ComponentMeta<typeof ProvisioningForm>;
const Template: ComponentStory<typeof ProvisioningForm> = (args) => <ProvisioningForm {...args} />;
export const Default = Template.bind({});
Default.parameters = {
msw: {
handlers: [
rest.post('/api/environments', (req, res, ctx) => {
return res(ctx.status(500), ctx.json({ message: 'Handler not implemented for this story' }));
}),
],
},
};
export const Submitting = Template.bind({});
Submitting.parameters = {
msw: {
handlers: [
rest.post('/api/environments', (req, res, ctx) => {
return res(ctx.delay('infinite')); // Simulate a long-running request
}),
],
},
};
export const Success = Template.bind({});
Success.parameters = {
msw: {
handlers: [
rest.post('/api/environments', (req, res, ctx) => {
return res(ctx.status(201), ctx.json({ transactionId: 'uuid-goes-here' }));
}),
],
},
};
export const TransactionAborted = Template.bind({});
TransactionAborted.parameters = {
msw: {
handlers: [
rest.post('/api/environments', (req, res, ctx) => {
return res(
ctx.status(500),
ctx.json({
error: 'Transaction aborted',
reason: 'Participant k8s-provisioner voted to abort: Ingress host is already in use.'
})
);
}),
],
},
};
This approach allowed frontend developers to work in parallel with backend engineers, ensuring the control plane’s UI was robust and user-friendly before the complex distributed transaction logic was even deployed. It’s a prime example of how frontend tooling can be critical even for deeply infrastructural projects.
The current implementation has achieved its primary goal: providing atomic environment provisioning. However, it is not without its limitations. The classic 2PC coordinator remains a single point of failure. While our use of a StatefulSet with a PersistentVolume mitigates data loss from a crash, it does not provide high availability; if the coordinator node fails, all provisioning operations are blocked until Kubernetes reschedules the pod. Furthermore, the blocking nature of the Prepare phase means participants may hold locks on resources (like database rows) for the duration of the transaction, which is only acceptable because this is an infrequent, developer-facing operation, not a high-throughput customer-facing one. Future iterations could explore a more resilient coordinator design using a consensus algorithm like Raft for its state machine, or transitioning to a Saga pattern for workflows where eventual consistency is an acceptable trade-off.