Building a Distributed Mobile CI/CD Control Plane with a Paxos-based Transactional State Log and a Material-UI Frontend


Our mobile release pipeline state was managed across a collection of shell scripts, Jenkins job statuses, and a manually updated Confluence page. A failed deployment often left the system in an ambiguous state: was the artifact promoted? Was the test environment lock released? The manual rollback and cleanup process was error-prone and a significant source of operational drag. The core problem wasn’t the automation itself, but the lack of a durable, consistent, and fault-tolerant source of truth for the state of the pipeline. A developer-initiated “promote to staging” action needed to be an atomic transaction, not a fire-and-forget Jenkins job trigger.

We initially considered a PostgreSQL database with SELECT ... FOR UPDATE to manage locks and state transitions. This would provide ACID guarantees, but it would also introduce a new single point of failure: the control plane managing our releases couldn’t afford to go down every time its database needed maintenance or failed over. This requirement for high availability in the control plane itself pushed us toward distributed consensus. Instead of relying on an external system, we decided to build a lightweight, application-embedded distributed state machine using a simplified implementation of Paxos. The state of our CI/CD pipeline would be stored in a replicated, in-memory key-value store, updated via a commit log that guaranteed sequential consistency.
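
For contrast, the rejected single-database design would have looked roughly like the sketch below. It assumes a hypothetical environment_locks table pre-seeded with one row per environment (environment TEXT PRIMARY KEY, build_id TEXT NULL) and Go's standard database/sql package with a Postgres driver; the table and function names are illustrative, not code we shipped.

// internal/locks/postgres.go (sketch of the rejected approach)
package locks

import (
	"context"
	"database/sql"
	"fmt"
)

// LockEnvironment acquires an exclusive row lock on the environment row and
// records the build that holds it, all within a single transaction.
func LockEnvironment(ctx context.Context, db *sql.DB, env, buildID string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	var holder sql.NullString
	// FOR UPDATE blocks concurrent transactions trying to lock the same row.
	if err := tx.QueryRowContext(ctx,
		`SELECT build_id FROM environment_locks WHERE environment = $1 FOR UPDATE`,
		env,
	).Scan(&holder); err != nil {
		return err
	}
	if holder.Valid {
		return fmt.Errorf("environment %s already locked by build %s", env, holder.String)
	}

	if _, err := tx.ExecContext(ctx,
		`UPDATE environment_locks SET build_id = $2 WHERE environment = $1`,
		env, buildID,
	); err != nil {
		return err
	}
	return tx.Commit()
}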

For the developer interface, the choice was pragmatic. Our internal tools are built on React, and Material-UI (MUI) provides a comprehensive, production-ready component library that allows us to build a clean, functional UI without getting bogged down in CSS. The real challenge was connecting a sophisticated frontend experience to a backend that operates on asynchronous consensus.

The Foundation: A Replicated Commit Log

Before diving into Paxos, the fundamental building block is a deterministic state machine that evolves by applying commands from a commit log. On a single node, this is trivial. The complexity arises when replicating this log across multiple nodes to achieve fault tolerance.

Our state represents the entire mobile CI/CD world: available build agents, environment locks, and the promotion status of release candidates.

// internal/statemachine/state.go
package statemachine

import (
	"fmt"
	"sync"
	"time"
)

// CommandType defines the types of operations that can modify our state.
type CommandType string

const (
	CmdRegisterAgent     CommandType = "REGISTER_AGENT"
	CmdClaimAgent        CommandType = "CLAIM_AGENT"
	CmdReleaseAgent      CommandType = "RELEASE_AGENT"
	CmdLockEnvironment   CommandType = "LOCK_ENVIRONMENT"
	CmdUnlockEnvironment CommandType = "UNLOCK_ENVIRONMENT"
	CmdPromoteBuild      CommandType = "PROMOTE_BUILD"
)

// Command represents a single, atomic operation to be applied to the state machine.
// In a real-world project, Payload would be a more structured, typed object.
type Command struct {
	Type    CommandType       `json:"type"`
	Payload map[string]string `json:"payload"`
}

// State represents the current snapshot of our CI/CD control plane.
type State struct {
	mu sync.RWMutex

	// agents maps agent ID to its status (e.g., "idle", "busy")
	agents map[string]string

	// environmentLocks maps environment name (e.g., "staging", "production") to the build ID that holds the lock.
	environmentLocks map[string]string

	// buildStatus maps build ID to its current stage (e.g., "qa_passed", "staging_deployed")
	buildStatus map[string]string
}

func NewState() *State {
	return &State{
		agents:           make(map[string]string),
		environmentLocks: make(map[string]string),
		buildStatus:      make(map[string]string),
	}
}

// Apply deterministically transitions the state based on a command.
// This is the core of our state machine. It MUST be deterministic.
// Given the same initial state and the same sequence of commands, it must always produce the same final state.
// It returns an error if a command is invalid in the current state (e.g., locking an already locked environment).
func (s *State) Apply(cmd Command) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	switch cmd.Type {
	case CmdRegisterAgent:
		agentID, ok := cmd.Payload["agent_id"]
		if !ok {
			return fmt.Errorf("missing agent_id for REGISTER_AGENT")
		}
		s.agents[agentID] = "idle"
		fmt.Printf("[State] Agent %s registered\n", agentID)

	case CmdLockEnvironment:
		env, envOk := cmd.Payload["environment"]
		buildID, buildOk := cmd.Payload["build_id"]
		if !envOk || !buildOk {
			return fmt.Errorf("missing environment or build_id for LOCK_ENVIRONMENT")
		}
		if locker, locked := s.environmentLocks[env]; locked {
			// This is a critical check for ensuring transactional semantics.
			return fmt.Errorf("environment %s is already locked by build %s", env, locker)
		}
		s.environmentLocks[env] = buildID
		fmt.Printf("[State] Environment %s locked by build %s\n", env, buildID)

	case CmdUnlockEnvironment:
		env, ok := cmd.Payload["environment"]
		if !ok {
			return fmt.Errorf("missing environment for UNLOCK_ENVIRONMENT")
		}
		delete(s.environmentLocks, env)
		fmt.Printf("[State] Environment %s unlocked\n", env)
	
	case CmdPromoteBuild:
		buildID, buildOk := cmd.Payload["build_id"]
		targetEnv, envOk := cmd.Payload["target_env"]
		if !buildOk || !envOk {
			return fmt.Errorf("missing build_id or target_env for PROMOTE_BUILD")
		}
		// A common mistake is not checking preconditions. The state machine enforces business rules.
		if _, locked := s.environmentLocks[targetEnv]; !locked {
			return fmt.Errorf("cannot promote to %s, environment is not locked", targetEnv)
		}
		if s.environmentLocks[targetEnv] != buildID {
			return fmt.Errorf("cannot promote build %s to %s, lock held by %s", buildID, targetEnv, s.environmentLocks[targetEnv])
		}
		s.buildStatus[buildID] = fmt.Sprintf("%s_deployed", targetEnv)
		fmt.Printf("[State] Build %s promoted to %s\n", buildID, targetEnv)

	default:
		return fmt.Errorf("unknown command type: %s", cmd.Type)
	}

	return nil
}

// GetState provides a read-only copy for external queries.
func (s *State) GetState() map[string]interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Deep copy to prevent race conditions from concurrent access
	agentsCopy := make(map[string]string)
	for k, v := range s.agents {
		agentsCopy[k] = v
	}
	locksCopy := make(map[string]string)
	for k, v := range s.environmentLocks {
		locksCopy[k] = v
	}
	statusCopy := make(map[string]string)
	for k, v := range s.buildStatus {
		statusCopy[k] = v
	}

	return map[string]interface{}{
		"agents":           agentsCopy,
		"environmentLocks": locksCopy,
		"buildStatus":      statusCopy,
	}
}

This state machine provides the C (Consistency) and A (Atomicity) of ACID. Each Command is an atomic unit of work. The Apply function ensures the state transitions are always valid. The next step is achieving I (Isolation) and D (Durability) in a distributed setting.
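
To make those semantics concrete, here is a small, illustrative replay harness (the agent and build IDs are invented): applying the same command log to a fresh State is deterministic, and a command that violates a precondition, such as a second lock on staging, is rejected instead of corrupting the state.

// cmd/replay/main.go (illustrative sketch, not part of the control plane)
package main

import (
	"fmt"

	"github.com/my-org/cicd-control-plane/internal/statemachine"
)

func main() {
	commands := []statemachine.Command{
		{Type: statemachine.CmdRegisterAgent, Payload: map[string]string{"agent_id": "mac-mini-01"}},
		{Type: statemachine.CmdLockEnvironment, Payload: map[string]string{"environment": "staging", "build_id": "build-42"}},
		// Violates a precondition: staging is already locked by build-42.
		{Type: statemachine.CmdLockEnvironment, Payload: map[string]string{"environment": "staging", "build_id": "build-43"}},
		{Type: statemachine.CmdPromoteBuild, Payload: map[string]string{"build_id": "build-42", "target_env": "staging"}},
	}

	state := statemachine.NewState()
	for i, cmd := range commands {
		if err := state.Apply(cmd); err != nil {
			// The invalid lock attempt is rejected; every other entry applies cleanly.
			fmt.Printf("entry %d rejected: %v\n", i, err)
		}
	}
	fmt.Printf("final state: %v\n", state.GetState())
}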

Implementing a Paxos-like Consensus Module

We don’t need a full-blown, feature-complete Paxos implementation. We need just enough to agree on a single value (our Command) for each slot in our distributed log. We’ll implement the Proposer and Acceptor roles. For simplicity, we’ll combine the Learner role with the Proposer.

The core idea is a two-phase protocol:

  1. Prepare Phase: A Proposer sends a Prepare request with a proposal number N to a quorum of Acceptors. Acceptors promise not to accept any proposal numbered less than N.
  2. Accept Phase: If the Proposer receives promises from a quorum, it sends an Accept request with the proposal number N and the proposed value V. Acceptors accept this proposal if they haven’t already promised a higher proposal number.

The proposal number N is crucial for correctness. A simple way to generate unique, increasing numbers is to use a combination of a timestamp and the node’s unique ID.
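
The consensus code shown next simply seeds its term counter from the wall clock and increments it on every proposal; a minimal sketch of the timestamp-plus-node-ID scheme described above, assuming each node is assigned a small unique integer index at deploy time, might look like this:

// internal/consensus/term.go (sketch; not used verbatim in the code below)
package consensus

import "time"

// nextProposalNumber packs a millisecond timestamp into the high bits and a
// per-node index into the low byte, so no two nodes can generate the same
// number and each node's numbers keep increasing.
// nodeIndex is assumed to be a unique integer in [0, 255]; last is the highest
// number this node has issued so far.
func nextProposalNumber(nodeIndex, last int64) int64 {
	candidate := (time.Now().UnixMilli() << 8) | (nodeIndex & 0xFF)
	if candidate <= last {
		// Clock skew or several proposals within one millisecond: move past
		// the previous number while keeping the node bits in the low byte.
		candidate = last + (1 << 8)
	}
	return candidate
}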

// internal/consensus/paxos.go
package consensus

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/my-org/cicd-control-plane/internal/statemachine"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// A simplified representation of a log entry.
type LogEntry struct {
	Term    int64
	Command statemachine.Command
}

// Node represents a participant in the Paxos cluster.
type Node struct {
	UnimplementedPaxosServiceServer // for gRPC

	id          string
	peerAddrs   map[string]string
	peerClients map[string]PaxosServiceClient

	mu           sync.Mutex
	promisedTerm int64
	acceptedTerm int64
	acceptedVal  *LogEntry

	commitChan   chan<- statemachine.Command // Channel to send committed commands to the state machine
	stateMachine *statemachine.State

	// For proposer logic
	currentTerm int64
}

func NewNode(id string, peerAddrs map[string]string, commitChan chan<- statemachine.Command, sm *statemachine.State) *Node {
	return &Node{
		id:           id,
		peerAddrs:    peerAddrs,
		peerClients:  make(map[string]PaxosServiceClient),
		commitChan:   commitChan,
		stateMachine: sm,
		currentTerm:  time.Now().UnixNano(), // simplified seed; unique terms should also encode the node ID
	}
}

// ConnectToPeers establishes gRPC connections to other nodes in the cluster.
func (n *Node) ConnectToPeers() {
	for peerID, addr := range n.peerAddrs {
		if peerID == n.id {
			continue
		}
		// In a production setup, you would use TLS credentials.
		conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			log.Fatalf("Failed to connect to peer %s: %v", peerID, err)
		}
		n.peerClients[peerID] = NewPaxosServiceClient(conn)
		log.Printf("Connected to peer %s at %s", peerID, addr)
	}
}

// ---- Acceptor Logic (gRPC Handlers) ----

func (n *Node) Prepare(ctx context.Context, req *PrepareRequest) (*PrepareResponse, error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	// The core of the Paxos promise.
	if req.Term > n.promisedTerm {
		n.promisedTerm = req.Term
		log.Printf("[%s] Promised term %d. Returning previously accepted (Term: %d)", n.id, req.Term, n.acceptedTerm)
		return &PrepareResponse{
			Ok:            true,
			PromisedTerm:  n.promisedTerm,
			AcceptedTerm:  n.acceptedTerm,
			AcceptedValue: n.acceptedVal,
		}, nil
	}

	log.Printf("[%s] Rejected prepare for term %d, already promised %d", n.id, req.Term, n.promisedTerm)
	return &PrepareResponse{Ok: false, PromisedTerm: n.promisedTerm}, nil
}

func (n *Node) Accept(ctx context.Context, req *AcceptRequest) (*AcceptResponse, error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	// Accept only if no Prepare with a higher term has arrived in the meantime.
	if req.Term >= n.promisedTerm {
		n.promisedTerm = req.Term
		n.acceptedTerm = req.Term
		n.acceptedVal = req.Value
		log.Printf("[%s] Accepted value for term %d", n.id, req.Term)
		return &AcceptResponse{Ok: true}, nil
	}

	log.Printf("[%s] Rejected accept for term %d, already promised %d", n.id, req.Term, n.promisedTerm)
	return &AcceptResponse{Ok: false}, nil
}

// ---- Proposer Logic ----

func (n *Node) Propose(cmd statemachine.Command) error {
	n.mu.Lock()
	n.currentTerm++
	term := n.currentTerm
	n.mu.Unlock()

	log.Printf("[%s] Starting proposal for term %d", n.id, term)

	// Phase 1: Prepare
	prepareCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	promiseCount := 1 // count self; a complete implementation would also run the acceptor logic locally
	var highestTermVal *LogEntry
	var highestTerm int64 = -1
	
	var wg sync.WaitGroup
	promiseChan := make(chan *PrepareResponse, len(n.peerClients))

	for id, client := range n.peerClients {
		wg.Add(1)
		go func(id string, client PaxosServiceClient) {
			defer wg.Done()
			resp, err := client.Prepare(prepareCtx, &PrepareRequest{Term: term})
			if err != nil {
				log.Printf("[%s] Error sending Prepare to %s: %v", n.id, id, err)
				return
			}
			promiseChan <- resp
		}(id, client)
	}
	wg.Wait()
	close(promiseChan)

	for resp := range promiseChan {
		if resp.Ok {
			promiseCount++
			if resp.AcceptedValue != nil && resp.AcceptedTerm > highestTerm {
				highestTerm = resp.AcceptedTerm
				highestTermVal = resp.AcceptedValue
			}
		}
	}
	
	quorum := (len(n.peerAddrs) / 2) + 1
	if promiseCount < quorum {
		log.Printf("[%s] Failed to get promise quorum for term %d. (Got %d, need %d)", n.id, term, promiseCount, quorum)
		return fmt.Errorf("failed to get promise quorum")
	}
	
	log.Printf("[%s] Got promise quorum for term %d", n.id, term)

	// Determine value to propose. If any acceptor already accepted a value, we MUST propose that value.
	// This is the critical rule that ensures safety.
	valToPropose := &LogEntry{Term: term, Command: cmd}
	if highestTermVal != nil {
		valToPropose = highestTermVal
		log.Printf("[%s] A previous value was accepted, reproposing it for term %d", n.id, term)
	}

	// Phase 2: Accept
	acceptCtx, cancelAccept := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancelAccept()

	acceptCount := 1 // count self (same simplification as in the prepare phase)
	acceptChan := make(chan bool, len(n.peerClients))
	var acceptWg sync.WaitGroup

	for id, client := range n.peerClients {
		acceptWg.Add(1)
		go func(id string, client PaxosServiceClient) {
			defer acceptWg.Done()
			resp, err := client.Accept(acceptCtx, &AcceptRequest{Term: term, Value: valToPropose})
			if err != nil {
				log.Printf("[%s] Error sending Accept to %s: %v", n.id, id, err)
				acceptChan <- false
				return
			}
			acceptChan <- resp.Ok
		}(id, client)
	}
	acceptWg.Wait()
	close(acceptChan)

	for ok := range acceptChan {
		if ok {
			acceptCount++
		}
	}
	
	if acceptCount < quorum {
		log.Printf("[%s] Failed to get accept quorum for term %d. (Got %d, need %d)", n.id, term, acceptCount, quorum)
		return fmt.Errorf("failed to get accept quorum")
	}

	log.Printf("[%s] *** COMMIT *** Value committed for term %d", n.id, term)
	
	// Once committed, we can send to the state machine.
	// In a real implementation, a Learner would broadcast this, and we'd handle log sequence numbers.
	// Here, we simplify: the proposer who achieves consensus applies the command.
	n.commitChan <- valToPropose.Command

	return nil
}

This code is a simplified but functional core of a consensus module. One subtlety worth calling out: when the re-proposal rule fires and the proposer adopts a previously accepted value, the caller's original command is not the one committed even though Propose returns success; a production system would detect this and retry the command in a later log slot. A real-world project would also need to handle log replication across slots, leader election (as in Raft, which is often preferred over raw Paxos for implementation simplicity), log compaction, and dynamic membership changes. The pitfall here is underestimating this complexity. For our internal tool, this simplified model was sufficient to prove the architecture.

The API Layer: Bridging Frontend and Consensus

The frontend cannot talk Paxos. It needs a simple, synchronous-looking API. We built a thin Go HTTP server that accepts a command, passes it to the Propose method of the Paxos Node, and waits for the result.

// cmd/server/main.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"

    "github.com/my-org/cicd-control-plane/internal/consensus"
    "github.com/my-org/cicd-control-plane/internal/statemachine"
)

func commandHandler(node *consensus.Node) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
            return
        }

        var cmd statemachine.Command
        if err := json.NewDecoder(r.Body).Decode(&cmd); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }

        // The Propose call is blocking until consensus is reached or it times out.
        // This abstracts away the asynchronous, distributed nature from the client.
        err := node.Propose(cmd)
        if err != nil {
            // This error indicates a failure to reach consensus, not a business logic failure.
            log.Printf("Proposal failed: %v", err)
            http.Error(w, fmt.Sprintf("Consensus failed: %v", err), http.StatusInternalServerError)
            return
        }

        // A successful Propose means the command WILL be applied eventually.
        // For simplicity, we assume immediate application for the HTTP response.
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]string{"status": "committed"})
    }
}

func stateHandler(sm *statemachine.State) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(sm.GetState())
    }
}


func main() {
    // ... (parse flags for node ID, peer addresses, etc.)

    commitChan := make(chan statemachine.Command)
    sm := statemachine.NewState()

    // Goroutine to apply committed commands to the state machine
    go func() {
        for cmd := range commitChan {
            if err := sm.Apply(cmd); err != nil {
                // In a production system, this is a critical error.
                // It means a committed command is invalid, implying a bug in the client or state machine logic.
                log.Printf("CRITICAL: Failed to apply committed command: %v", err)
            }
        }
    }()

    paxosNode := consensus.NewNode(nodeID, peerAddrs, commitChan, sm)
    paxosNode.ConnectToPeers()

    // Start gRPC server for Paxos communication
    // ...

    // Start HTTP server for client API
    http.HandleFunc("/command", commandHandler(paxosNode))
    http.HandleFunc("/state", stateHandler(sm))
    log.Printf("HTTP API server listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

The Material-UI Frontend: Visualizing Transactional State

The final piece is the UI. A developer needs to see the current state and initiate actions. The key challenge is that our API calls aren’t simple CRUD operations; they are proposals to a distributed system that might take a moment to achieve consensus. The UI must reflect this pending state.

Here is a React component using MUI for a “Promotion” workflow. It locks the target environment, promotes the build, and releases the lock as a single conceptual transaction, which is actually a sequence of three separately Paxos-agreed commands.

// src/components/PromotionWorkflow.js
import React, { useState, useEffect } from 'react';
import {
  Button,
  Box,
  Stepper,
  Step,
  StepLabel,
  Card,
  CardContent,
  Typography,
  CircularProgress,
  Alert,
} from '@mui/material';

const steps = ['Lock Staging Environment', 'Promote Build', 'Unlock Environment'];

// A mock API client for our control plane
const apiClient = {
  postCommand: async (command) => {
    const response = await fetch('/api/command', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(command),
    });
    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`Command failed: ${errorText}`);
    }
    return response.json();
  },
};

export default function PromotionWorkflow({ buildId }) {
  const [activeStep, setActiveStep] = useState(0);
  const [isProcessing, setIsProcessing] = useState(false);
  const [error, setError] = useState(null);
  const [isComplete, setIsComplete] = useState(false);

  const handlePromote = async () => {
    setIsProcessing(true);
    setError(null);
    setActiveStep(0);
    
    try {
      // Step 1: Lock Environment
      // This is a transactional command submitted to the Paxos cluster.
      console.log(`Proposing LOCK_ENVIRONMENT for build ${buildId}`);
      await apiClient.postCommand({
        type: 'LOCK_ENVIRONMENT',
        payload: { environment: 'staging', build_id: buildId },
      });
      setActiveStep(1);

      // A common mistake is to not add delays or feedback. In a real system,
      // the promotion itself (e.g., a script execution) takes time.
      await new Promise(res => setTimeout(res, 1000));

      // Step 2: Promote Build
      console.log(`Proposing PROMOTE_BUILD for build ${buildId}`);
      await apiClient.postCommand({
        type: 'PROMOTE_BUILD',
        payload: { build_id: buildId, target_env: 'staging' },
      });
      setActiveStep(2);
      
      await new Promise(res => setTimeout(res, 1000));

      // Step 3: Unlock Environment
      // This cleanup step is also transactional. If it fails, the lock remains,
      // preventing other builds from interfering, which is the desired safe state.
      console.log('Proposing UNLOCK_ENVIRONMENT');
      await apiClient.postCommand({
        type: 'UNLOCK_ENVIRONMENT',
        payload: { environment: 'staging' },
      });
      setActiveStep(3);
      setIsComplete(true);

    } catch (err) {
      console.error("Workflow failed:", err);
      // The error from the API reflects the state machine's rejection, e.g., "environment already locked".
      setError(err.message);
    } finally {
      setIsProcessing(false);
    }
  };

  return (
    <Card variant="outlined">
      <CardContent>
        <Typography variant="h6" gutterBottom>
          Promote Build: {buildId}
        </Typography>
        <Box sx={{ width: '100%', my: 3 }}>
          <Stepper activeStep={activeStep}>
            {steps.map((label) => (
              <Step key={label}>
                <StepLabel>{label}</StepLabel>
              </Step>
            ))}
          </Stepper>
        </Box>
        {error && <Alert severity="error" sx={{ mb: 2 }}>{error}</Alert>}
        {isComplete && <Alert severity="success" sx={{ mb: 2 }}>Promotion successful!</Alert>}
        
        <Box sx={{ display: 'flex', justifyContent: 'flex-end' }}>
          <Button
            variant="contained"
            onClick={handlePromote}
            disabled={isProcessing || isComplete}
          >
            {isProcessing ? <CircularProgress size={24} /> : 'Start Promotion'}
          </Button>
        </Box>
      </CardContent>
    </Card>
  );
}

The flow demonstrates how multiple distinct user actions are translated into a sequence of atomic state transitions on the distributed backend. The MUI Stepper and Alert components are critical for communicating the status of these long-running transactional operations to the user.

sequenceDiagram
    participant User
    participant MUI_Frontend as React (MUI)
    participant API_Gateway as API Gateway
    participant Paxos_Cluster as Paxos Cluster (3 Nodes)
    participant State_Machine as State Machine (on each Node)

    User->>+MUI_Frontend: Clicks "Start Promotion"
    MUI_Frontend->>API_Gateway: POST /command (LOCK_ENVIRONMENT)
    Note over MUI_Frontend: UI shows Step 1 as active, disables button.
    API_Gateway->>Paxos_Cluster: Propose(LOCK_ENVIRONMENT)
    Paxos_Cluster->>Paxos_Cluster: Run 2-Phase Paxos Protocol
    Note over Paxos_Cluster: Consensus reached
    Paxos_Cluster->>State_Machine: commitChan <- LOCK_CMD
    State_Machine->>State_Machine: Apply(LOCK_CMD) -> State updated
    Paxos_Cluster-->>API_Gateway: { status: 'committed' }
    API_Gateway-->>MUI_Frontend: 200 OK
    
    MUI_Frontend->>API_Gateway: POST /command (PROMOTE_BUILD)
    Note over MUI_Frontend: UI advances Stepper to Step 2.
    API_Gateway->>Paxos_Cluster: Propose(PROMOTE_BUILD)
    Paxos_Cluster->>Paxos_Cluster: Run 2-Phase Paxos Protocol
    Note over Paxos_Cluster: Consensus reached
    Paxos_Cluster->>State_Machine: commitChan <- PROMOTE_CMD
    State_Machine->>State_Machine: Apply(PROMOTE_CMD) -> State updated
    Paxos_Cluster-->>API_Gateway: { status: 'committed' }
    API_Gateway-->>MUI_Frontend: 200 OK

    MUI_Frontend->>API_Gateway: POST /command (UNLOCK_ENVIRONMENT)
    Note over MUI_Frontend: UI advances Stepper to Step 3.
    API_Gateway->>Paxos_Cluster: Propose(UNLOCK_ENVIRONMENT)
    Note over Paxos_Cluster: Consensus process repeats
    Paxos_Cluster-->>API_Gateway: { status: 'committed' }
    API_Gateway-->>MUI_Frontend: 200 OK
    MUI_Frontend-->>-User: Show "Promotion successful!" Alert

This architecture provides a durable, auditable, and fault-tolerant control plane for our mobile CI/CD pipeline. The state of a release is no longer an ephemeral property of a Jenkins runner but a transactionally consistent fact replicated across a cluster.

The path we took is not without significant trade-offs. Building and maintaining a custom consensus implementation, even a simplified one, carries a high operational burden. It lacks features like log compaction, which means our log of commands would grow indefinitely without further engineering. Furthermore, the performance is tailored for low-throughput, high-value operations initiated by humans, not for high-frequency machine-driven transactions. For many organizations, using a managed distributed database like CockroachDB, TiDB, or a service like AWS QLDB would be a far more pragmatic solution. Our choice was driven by a need for an embedded, dependency-free state management layer within our application and the invaluable deep understanding gained from building the consistency mechanism from the ground up.

