Implementing an Atomic Publish Flow in a Version-Controlled Low-Code Platform


The initial “publish” feature for our internal low-code platform was deceptively simple: a button that pushed a new UI configuration from staging to production. The implementation, however, led to an immediate and critical failure. The process involved updating two distinct microservices: UISchemaService, which stored the JSON definition of the user interface, and DataModelService, which managed the corresponding backend data schema. A network glitch after the UI schema was saved but before the data model was updated left a production application in a corrupted, inconsistent state. The frontend was referencing data fields that no longer existed in the backend schema, crashing the application for all users. This incident forced a complete re-architecture of the publish mechanism, demanding atomicity across service boundaries.

Our first decision was to formalize the UI definitions as versioned artifacts. Treating them as code in Git felt clumsy; they were structured data, not text. We opted for an unconventional tool: Data Version Control (DVC). Typically used for ML models, DVC’s ability to version large files and data, track them with lightweight metafiles in Git, and tag specific versions made it a surprisingly good fit for managing our JSON UI schemas. Each “publish” would create a new, immutable, versioned artifact tracked by DVC.
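To give a sense of what the versioned artifacts buy us, the snippet below is a minimal sketch of how a consumer could later fetch the schema exactly as it existed at a published tag, assuming the dvc CLI is installed; the repository URL, file path, and tag are placeholders, not values from our setup.

const { execFile } = require('child_process');

// Fetch the schema artifact as it existed at a given published tag.
// The repository URL and tag below are illustrative placeholders.
function fetchPublishedSchema(versionTag, outPath = './schemas.json') {
    return new Promise((resolve, reject) => {
        execFile(
            'dvc',
            ['get', 'git@example.com:platform/ui-schemas.git', 'schemas.json', '--rev', versionTag, '-o', outPath],
            (error, stdout, stderr) => (error ? reject(new Error(stderr || error.message)) : resolve(outPath))
        );
    });
}

// Usage: fetchPublishedSchema('v1.2.0').then(p => console.log(`Schema written to ${p}`));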

The core problem of atomicity remained. Standard database transactions were confined to a single service. We needed a distributed transaction protocol. After evaluating the Saga pattern, we chose Two-Phase Commit (2PC) for this specific workflow. While 2PC is a blocking protocol and has its well-documented drawbacks (particularly the vulnerability of its coordinator), the “publish” action is a low-frequency, high-stakes manual operation. The guarantee of immediate, strong consistency was deemed more critical than the high availability that Sagas provide through eventual consistency and compensation logic. A failed publish that locks resources for a few minutes until manual intervention was an acceptable trade-off for preventing a corrupted production state.

The final architecture coalesced around a central TransactionCoordinator service. A user action on the frontend, built with Headless UI components and powered by a Relay mutation, would trigger a request to the coordinator. The coordinator would then orchestrate the 2PC protocol with the UISchemaService and DataModelService acting as participants. The entire flow had to be rigorously tested, which brought Jest into the picture, not for simple unit tests, but for complex integration tests mocking service failures to validate the coordinator’s rollback and recovery logic.

sequenceDiagram
    participant FE as React Frontend (Relay)
    participant GW as API Gateway
    participant TC as TransactionCoordinator
    participant US as UISchemaService (Participant)
    participant DS as DataModelService (Participant)
    participant DVC as DVC Repository

    FE->>GW: POST /graphql (PublishAppVersionMutation)
    GW->>TC: POST /publish
    TC->>US: POST /prepare (Transaction ID, Schema)
    Note over US: Validate schema, lock resource, create temporary DVC version
    US->>TC: 200 OK (VOTE_COMMIT)
    TC->>DS: POST /prepare (Transaction ID, Data Model)
    Note over DS: Validate model, lock resource
    DS->>TC: 200 OK (VOTE_COMMIT)
    Note over TC: All participants voted to commit. Move to Phase 2.
    TC->>US: POST /commit (Transaction ID)
    US->>DVC: git add schemas.json.dvc && git commit -m "..."
    US->>DVC: git tag v1.2.0 (then dvc push / git push --tags)
    US->>TC: 200 OK (COMMIT_SUCCESS)
    TC->>DS: POST /commit (Transaction ID)
    Note over DS: Apply schema changes to DB
    DS->>TC: 200 OK (COMMIT_SUCCESS)
    TC->>GW: 200 OK (Publish Successful)
    GW->>FE: GraphQL Response

Implementing the Two-Phase Commit Coordinator

The TransactionCoordinator is the brain of the operation. It is a small Node.js service that, for this implementation, keeps transaction state in memory; in a production environment that state would need to be persisted to a durable store like Redis or a database so the coordinator can survive crashes.

The coordinator manages the transaction lifecycle and communicates with participants. We define a simple state machine for each transaction.

services/coordinator/transaction-manager.js:

const { v4: uuidv4 } = require('uuid');
const axios = require('axios');
const logger = require('../common/logger');

// In-memory store for transaction states. In production, this MUST be a persistent store (e.g., Redis, DB).
const transactions = new Map();

const TRANSACTION_STATE = {
    INITIAL: 'INITIAL',
    PREPARING: 'PREPARING',
    PREPARED: 'PREPARED',
    COMMITTING: 'COMMITTING',
    ABORTING: 'ABORTING',
    COMMITTED: 'COMMITTED',
    ABORTED: 'ABORTED',
};

const PARTICIPANT_VOTE = {
    COMMIT: 'VOTE_COMMIT',
    ABORT: 'VOTE_ABORT',
};

// Participant configuration
const PARTICIPANTS = [
    process.env.UI_SCHEMA_SERVICE_URL || 'http://localhost:3001',
    process.env.DATA_MODEL_SERVICE_URL || 'http://localhost:3002',
];

async function beginTransaction(payload) {
    const transactionId = uuidv4();
    const transaction = {
        id: transactionId,
        state: TRANSACTION_STATE.INITIAL,
        payload,
        participants: PARTICIPANTS.map(url => ({ url, vote: null, acknowledged: false })),
        startTime: Date.now(),
    };
    transactions.set(transactionId, transaction);
    logger.info(`[TXN:${transactionId}] Transaction started.`);
    
    // Start Phase 1: Prepare
    await executePreparePhase(transaction);
    
    // Decide on Phase 2
    if (transaction.state === TRANSACTION_STATE.PREPARED) {
        await executeCommitPhase(transaction);
    } else {
        // If state is not PREPARED, it must have been set to ABORTING by executePreparePhase
        await executeAbortPhase(transaction);
    }

    // Clean up transaction after a timeout to prevent memory leak
    setTimeout(() => transactions.delete(transactionId), 60000);

    if (transaction.state === TRANSACTION_STATE.COMMITTED) {
        return { success: true, transactionId };
    } else {
        throw new Error(`Transaction ${transactionId} failed and was aborted.`);
    }
}

async function executePreparePhase(transaction) {
    transaction.state = TRANSACTION_STATE.PREPARING;
    logger.info(`[TXN:${transaction.id}] Entering PREPARE phase.`);

    const preparePromises = transaction.participants.map(async (participant) => {
        try {
            const response = await axios.post(`${participant.url}/prepare`, {
                transactionId: transaction.id,
                payload: transaction.payload,
            }, { timeout: 5000 }); // Use a reasonable timeout

            if (response.data.vote === PARTICIPANT_VOTE.COMMIT) {
                participant.vote = PARTICIPANT_VOTE.COMMIT;
                logger.info(`[TXN:${transaction.id}] Participant ${participant.url} voted COMMIT.`);
            } else {
                participant.vote = PARTICIPANT_VOTE.ABORT;
                logger.warn(`[TXN:${transaction.id}] Participant ${participant.url} voted ABORT.`);
            }
        } catch (error) {
            participant.vote = PARTICIPANT_VOTE.ABORT;
            logger.error(`[TXN:${transaction.id}] Participant ${participant.url} failed to prepare: ${error.message}`);
        }
    });

    await Promise.all(preparePromises);

    const allVotedCommit = transaction.participants.every(p => p.vote === PARTICIPANT_VOTE.COMMIT);

    if (allVotedCommit) {
        transaction.state = TRANSACTION_STATE.PREPARED;
        logger.info(`[TXN:${transaction.id}] All participants prepared successfully. Ready to commit.`);
    } else {
        transaction.state = TRANSACTION_STATE.ABORTING;
        logger.warn(`[TXN:${transaction.id}] Not all participants voted to commit. Initiating abort.`);
    }
}

async function executeCommitPhase(transaction) {
    transaction.state = TRANSACTION_STATE.COMMITTING;
    logger.info(`[TXN:${transaction.id}] Entering COMMIT phase.`);

    const commitPromises = transaction.participants.map(async (participant) => {
        try {
            // A crucial aspect of 2PC is retrying the commit/abort calls until they are acknowledged.
            // A single attempt is made here to keep the example short; production systems need robust retry logic (e.g., exponential backoff).
            await axios.post(`${participant.url}/commit`, { transactionId: transaction.id });
            participant.acknowledged = true;
            logger.info(`[TXN:${transaction.id}] Participant ${participant.url} acknowledged COMMIT.`);
        } catch (error) {
            // If a commit fails, the coordinator MUST keep retrying. 
            // The participant has already agreed to commit and locked resources. It cannot unilaterally abort.
            logger.error(`[TXN:${transaction.id}] Participant ${participant.url} failed to acknowledge commit. System requires manual intervention or a robust retry mechanism. Error: ${error.message}`);
            // In a real system, this would trigger a high-priority alert.
        }
    });

    await Promise.all(commitPromises);
    
    // For this example we optimistically mark the transaction committed; a production coordinator would keep retrying until every participant acknowledges.
    transaction.state = TRANSACTION_STATE.COMMITTED;
    logger.info(`[TXN:${transaction.id}] Transaction committed successfully.`);
}

async function executeAbortPhase(transaction) {
    transaction.state = TRANSACTION_STATE.ABORTING;
    logger.warn(`[TXN:${transaction.id}] Entering ABORT phase.`);

    const abortPromises = transaction.participants.map(async (participant) => {
        try {
            await axios.post(`${participant.url}/abort`, { transactionId: transaction.id });
            participant.acknowledged = true;
            logger.info(`[TXN:${transaction.id}] Participant ${participant.url} acknowledged ABORT.`);
        } catch (error) {
            logger.error(`[TXN:${transaction.id}] Participant ${participant.url} failed to acknowledge abort. Retrying needed. Error: ${error.message}`);
            // Similar to commit, abort must be retried until success.
        }
    });

    await Promise.all(abortPromises);
    transaction.state = TRANSACTION_STATE.ABORTED;
    logger.info(`[TXN:${transaction.id}] Transaction aborted.`);
}

module.exports = { beginTransaction };
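The in-memory Map is the part that cannot survive a coordinator crash. Below is a minimal sketch of what durable state plus startup recovery might look like, assuming a Redis client such as ioredis; the key naming and recovery policy are illustrative, not part of our implementation, and the sketch reuses TRANSACTION_STATE and the phase functions defined above.

const Redis = require('ioredis');
const redis = new Redis(process.env.REDIS_URL);

// Persist the full transaction record on every state change instead of mutating the in-memory Map.
async function saveTransaction(transaction) {
    await redis.set(`txn:${transaction.id}`, JSON.stringify(transaction));
}

// On startup, finish whatever was in flight when the coordinator died.
// Presumed-abort rule: only re-drive a commit if the commit decision was already recorded;
// any other non-terminal transaction is aborted.
async function recoverPendingTransactions() {
    const keys = await redis.keys('txn:*'); // acceptable here: publish volume is tiny
    for (const key of keys) {
        const txn = JSON.parse(await redis.get(key));
        if (txn.state === TRANSACTION_STATE.COMMITTING) {
            await executeCommitPhase(txn);
        } else if (txn.state !== TRANSACTION_STATE.COMMITTED && txn.state !== TRANSACTION_STATE.ABORTED) {
            await executeAbortPhase(txn);
        }
    }
}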

Participant Implementation: The UISchemaService with DVC

The UISchemaService is responsible for managing the UI JSON files. Its role as a participant is critical. During the prepare phase, it must do all the work short of making the change final. This includes validating the schema and creating a new DVC-tracked version, but without updating the “live” or “latest” pointer/tag. The commit phase is then a very lightweight operation: simply updating the pointer.

services/ui-schema-service/server.js:

const express = require('express');
const bodyParser = require('body-parser');
const fs = require('fs/promises');
const { exec } = require('child_process');
const path = require('path');
const logger = require('../common/logger');

const app = express();
app.use(bodyParser.json());

// In-memory store for prepared transactions. In production, use a persistent store.
const preparedTransactions = new Map();
const DVC_REPO_PATH = path.join(__dirname, 'dvc_repo');
const SCHEMA_FILE_PATH = path.join(DVC_REPO_PATH, 'schemas.json');

// Helper for running shell commands
const runCommand = (command, cwd) => {
    return new Promise((resolve, reject) => {
        exec(command, { cwd }, (error, stdout, stderr) => {
            if (error) {
                logger.error(`Exec error for command "${command}": ${stderr}`);
                return reject(new Error(stderr));
            }
            logger.info(`Exec success for command "${command}": ${stdout}`);
            resolve(stdout);
        });
    });
};

app.post('/prepare', async (req, res) => {
    const { transactionId, payload } = req.body;
    logger.info(`[TXN:${transactionId}] Received PREPARE request.`);

    try {
        // 1. Validate the payload (omitted for brevity)
        if (!payload.uiSchema || !payload.versionTag) {
            throw new Error('Invalid payload for UI schema service.');
        }

        // 2. Lock the resource. A simple file-based lock for this example; the 'wx' flag
        // fails if the lock file already exists, so a concurrent publish is rejected.
        // In a real multi-instance service, a distributed lock (e.g., Redis SETNX) is required.
        await fs.writeFile(path.join(DVC_REPO_PATH, '.lock'), transactionId, { flag: 'wx' });
        
        // 3. Write new schema to a temporary file to not disturb the main file yet
        const tempSchemaPath = `${SCHEMA_FILE_PATH}.${transactionId}`;
        await fs.writeFile(tempSchemaPath, JSON.stringify(payload.uiSchema, null, 2));

        // 4. Stage the change with DVC but don't commit the git tag yet
        await fs.rename(tempSchemaPath, SCHEMA_FILE_PATH); // Atomic rename instead of shelling out to mv
        await runCommand('dvc add schemas.json', DVC_REPO_PATH);

        preparedTransactions.set(transactionId, { versionTag: payload.versionTag });
        logger.info(`[TXN:${transactionId}] Resource locked and DVC artifact prepared. Voting COMMIT.`);
        res.status(200).json({ vote: 'VOTE_COMMIT' });
    } catch (error) {
        logger.error(`[TXN:${transactionId}] Prepare phase failed: ${error.message}`);
        await runCommand('dvc checkout schemas.json', DVC_REPO_PATH).catch(() => {}); // Best-effort revert of DVC changes
        await fs.unlink(path.join(DVC_REPO_PATH, '.lock')).catch(() => {}); // Unlock
        res.status(500).json({ vote: 'VOTE_ABORT', reason: error.message });
    }
});

app.post('/commit', async (req, res) => {
    const { transactionId } = req.body;
    logger.info(`[TXN:${transactionId}] Received COMMIT request.`);
    
    if (!preparedTransactions.has(transactionId)) {
        // This can happen if the coordinator retries a commit for an already committed transaction.
        // The operation must be idempotent.
        logger.warn(`[TXN:${transactionId}] Commit requested for an unknown or already committed transaction. Acknowledging anyway.`);
        return res.status(200).send();
    }
    
    const { versionTag } = preparedTransactions.get(transactionId);

    try {
        // The actual "commit" is making the DVC change permanent with a git tag.
        await runCommand('git add schemas.json.dvc', DVC_REPO_PATH);
        await runCommand(`git commit -m "Publish version ${versionTag}"`, DVC_REPO_PATH);
        await runCommand(`git tag ${versionTag}`, DVC_REPO_PATH);
        // In a real scenario, you'd also push to your DVC remote and git remote.
        // await runCommand('dvc push', DVC_REPO_PATH);
        // await runCommand('git push --tags', DVC_REPO_PATH);

        logger.info(`[TXN:${transactionId}] Successfully committed and tagged version ${versionTag}.`);
        preparedTransactions.delete(transactionId);
        await fs.unlink(path.join(DVC_REPO_PATH, '.lock'));
        res.status(200).send();
    } catch (error) {
        logger.error(`[TXN:${transactionId}] CRITICAL: Commit phase failed: ${error.message}. Manual intervention required.`);
        // The service is now in an inconsistent state. The coordinator will keep retrying.
        res.status(500).send({ error: "Commit failed, awaiting retry" });
    }
});

app.post('/abort', async (req, res) => {
    const { transactionId } = req.body;
    logger.warn(`[TXN:${transactionId}] Received ABORT request.`);
    if (!preparedTransactions.has(transactionId)) {
        logger.warn(`[TXN:${transactionId}] Abort requested for an unknown or already handled transaction.`);
        return res.status(200).send();
    }
    
    try {
        // Revert any staged changes
        await runCommand('git checkout schemas.json.dvc', DVC_REPO_PATH); // Restore the previously committed .dvc pointer first
        await runCommand('dvc checkout schemas.json', DVC_REPO_PATH); // Then let DVC restore the file contents it points to
        
        preparedTransactions.delete(transactionId);
        await fs.unlink(path.join(DVC_REPO_PATH, '.lock'));
        logger.warn(`[TXN:${transactionId}] Abort successful. Changes reverted.`);
        res.status(200).send();
    } catch (error) {
        logger.error(`[TXN:${transactionId}] CRITICAL: Abort phase failed: ${error.message}. Manual intervention required.`);
        res.status(500).send({ error: "Abort failed, awaiting retry" });
    }
});

// Setup DVC repo if it doesn't exist (for demo purposes)
async function initDvcRepo() {
    const repoExists = await fs.access(DVC_REPO_PATH).then(() => true, () => false); // fs/promises has no existsSync
    if (!repoExists) {
        await fs.mkdir(DVC_REPO_PATH, { recursive: true });
        await runCommand('git init', DVC_REPO_PATH);
        await runCommand('dvc init', DVC_REPO_PATH);
        await fs.writeFile(SCHEMA_FILE_PATH, '{}');
        await runCommand('dvc add schemas.json', DVC_REPO_PATH);
        await runCommand('git add .', DVC_REPO_PATH);
        await runCommand('git commit -m "Initial commit"', DVC_REPO_PATH);
        logger.info('DVC repository initialized.');
    }
}

initDvcRepo().then(() => {
    app.listen(3001, () => logger.info('UI Schema Service listening on port 3001'));
});

The DataModelService would have a similar structure, locking database rows during prepare and executing the ALTER TABLE or equivalent schema migration during commit.
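For completeness, here is a hedged sketch of what those participant endpoints might look like with node-postgres (pg). The advisory-lock key, the port, and the buildMigrationSql helper are hypothetical stand-ins; the real DataModelService's validation and migration logic is not shown.

const express = require('express');
const bodyParser = require('body-parser');
const { Pool } = require('pg');
const logger = require('../common/logger');

const app = express();
app.use(bodyParser.json());
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// transactionId -> { client, migrationSql }; the client is kept so the session-scoped
// advisory lock survives until commit or abort.
const prepared = new Map();

app.post('/prepare', async (req, res) => {
    const { transactionId, payload } = req.body;
    const client = await pool.connect();
    try {
        // Validate the requested data model and derive the migration SQL.
        // buildMigrationSql is a hypothetical helper standing in for the real diff/validation logic.
        const migrationSql = buildMigrationSql(payload.dataModel);
        // Block concurrent publishes with a session-level advisory lock.
        await client.query('SELECT pg_advisory_lock(42)');
        prepared.set(transactionId, { client, migrationSql });
        res.status(200).json({ vote: 'VOTE_COMMIT' });
    } catch (error) {
        client.release();
        logger.error(`[TXN:${transactionId}] Prepare failed: ${error.message}`);
        res.status(500).json({ vote: 'VOTE_ABORT', reason: error.message });
    }
});

app.post('/commit', async (req, res) => {
    const { transactionId } = req.body;
    const entry = prepared.get(transactionId);
    if (!entry) return res.status(200).send(); // idempotent: unknown or already committed
    try {
        await entry.client.query(entry.migrationSql); // e.g. ALTER TABLE ... ADD COLUMN ...
        await entry.client.query('SELECT pg_advisory_unlock(42)');
        entry.client.release();
        prepared.delete(transactionId);
        res.status(200).send();
    } catch (error) {
        logger.error(`[TXN:${transactionId}] CRITICAL: Commit failed: ${error.message}`);
        res.status(500).send({ error: 'Commit failed, awaiting retry' });
    }
});

app.post('/abort', async (req, res) => {
    const { transactionId } = req.body;
    const entry = prepared.get(transactionId);
    if (entry) {
        await entry.client.query('SELECT pg_advisory_unlock(42)').catch(() => {});
        entry.client.release();
        prepared.delete(transactionId);
    }
    res.status(200).send();
});

app.listen(3002, () => logger.info('Data Model Service listening on port 3002'));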

Frontend Integration with Relay and Headless UI

The frontend needs to present a clean interface for this complex operation. We use Headless UI’s Dialog component for the confirmation modal and Relay’s useMutation hook to trigger the publish flow.

components/PublishButton.js:

import React, { useState } from 'react';
import { Dialog } from '@headlessui/react';
import { graphql, useMutation } from 'react-relay';

const PublishAppVersionMutation = graphql`
    mutation PublishButtonPublishAppVersionMutation($input: PublishAppVersionInput!) {
        publishAppVersion(input: $input) {
            success
            newVersionTag
            errors {
                message
            }
        }
    }
`;

export function PublishButton({ uiSchema, dataModel, nextVersion }) {
    const [isOpen, setIsOpen] = useState(false);
    const [commit, isInFlight] = useMutation(PublishAppVersionMutation);
    const [serverError, setServerError] = useState(null);

    const handlePublish = () => {
        setServerError(null);
        commit({
            variables: {
                input: {
                    uiSchema: JSON.stringify(uiSchema),
                    dataModel: JSON.stringify(dataModel),
                    versionTag: nextVersion,
                },
            },
            onCompleted: (response, errors) => {
                if (errors?.length) {
                    setServerError(errors[0].message);
                    return;
                }
                if (response.publishAppVersion?.errors?.length) {
                    setServerError(response.publishAppVersion.errors[0].message);
                } else {
                    setIsOpen(false);
                    // Trigger a refetch of the application list or show success toast
                }
            },
            onError: (error) => {
                setServerError(error.message || 'An unexpected error occurred.');
            },
        });
    };

    return (
        <>
            <button onClick={() => setIsOpen(true)}>Publish New Version</button>
            <Dialog open={isOpen} onClose={() => setIsOpen(false)}>
                <div className="fixed inset-0 bg-black/30" aria-hidden="true" />
                <div className="fixed inset-0 flex items-center justify-center p-4">
                    <Dialog.Panel className="w-full max-w-sm rounded bg-white p-6">
                        <Dialog.Title>Confirm Publication</Dialog.Title>
                        <Dialog.Description>
                            This will publish version {nextVersion} to production. This action cannot be undone directly, but you can roll back later.
                        </Dialog.Description>

                        {serverError && <p className="text-red-500">{serverError}</p>}

                        <div className="mt-4 flex gap-4">
                            <button onClick={handlePublish} disabled={isInFlight}>
                                {isInFlight ? 'Publishing...' : 'Confirm Publish'}
                            </button>
                            <button onClick={() => setIsOpen(false)} disabled={isInFlight}>Cancel</button>
                        </div>
                    </Dialog.Panel>
                </div>
            </Dialog>
        </>
    );
}

The GraphQL server resolver for publishAppVersion would simply call the beginTransaction function in the TransactionCoordinator.
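A hedged sketch of that resolver is shown below. The require path and the exact wiring are assumptions; in the deployed topology from the sequence diagram, where the gateway and coordinator are separate services, the direct function call would be replaced by an HTTP POST to the coordinator's /publish endpoint.

// Hypothetical resolver on the GraphQL gateway, mapping the mutation input onto the
// coordinator's payload and its outcome onto the PublishAppVersionPayload fields.
const { beginTransaction } = require('../coordinator/transaction-manager');

const resolvers = {
    Mutation: {
        publishAppVersion: async (_root, { input }) => {
            try {
                await beginTransaction({
                    uiSchema: JSON.parse(input.uiSchema),
                    dataModel: JSON.parse(input.dataModel),
                    versionTag: input.versionTag,
                });
                return { success: true, newVersionTag: input.versionTag, errors: null };
            } catch (error) {
                return { success: false, newVersionTag: null, errors: [{ message: error.message }] };
            }
        },
    },
};

module.exports = resolvers;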

Testing the Distributed Flow with Jest

Testing this system is non-trivial. We cannot rely on unit tests alone. We need an integration test for the coordinator that simulates the behavior of the participants, including various failure modes. Jest’s mocking is perfect for this.

services/coordinator/transaction-manager.test.js:

const axios = require('axios');
const { beginTransaction } = require('./transaction-manager');
const logger = require('../common/logger');

// Mock the logger to suppress output during tests
jest.mock('../common/logger', () => ({
    info: jest.fn(),
    warn: jest.fn(),
    error: jest.fn(),
}));

// Mock axios to simulate participant responses
jest.mock('axios');

const MOCK_PAYLOAD = { uiSchema: { id: "test" }, versionTag: "v1.0.0" };
const UI_SCHEMA_SERVICE_URL = 'http://localhost:3001';
const DATA_MODEL_SERVICE_URL = 'http://localhost:3002';

describe('TransactionManager', () => {
    beforeEach(() => {
        // Reset mocks before each test
        axios.post.mockReset();
        logger.info.mockClear();
        logger.warn.mockClear();
        logger.error.mockClear();
    });
    
    it('should successfully commit a transaction when all participants vote to commit', async () => {
        axios.post.mockImplementation((url, data) => {
            if (url.endsWith('/prepare')) {
                return Promise.resolve({ data: { vote: 'VOTE_COMMIT' } });
            }
            if (url.endsWith('/commit')) {
                return Promise.resolve({ status: 200 });
            }
            return Promise.reject(new Error('Unexpected URL'));
        });
        
        const result = await beginTransaction(MOCK_PAYLOAD);
        expect(result.success).toBe(true);
        
        // Verify prepare calls
        expect(axios.post).toHaveBeenCalledWith(`${UI_SCHEMA_SERVICE_URL}/prepare`, expect.any(Object), expect.any(Object));
        expect(axios.post).toHaveBeenCalledWith(`${DATA_MODEL_SERVICE_URL}/prepare`, expect.any(Object), expect.any(Object));

        // Verify commit calls
        expect(axios.post).toHaveBeenCalledWith(`${UI_SCHEMA_SERVICE_URL}/commit`, expect.any(Object));
        expect(axios.post).toHaveBeenCalledWith(`${DATA_MODEL_SERVICE_URL}/commit`, expect.any(Object));
        
        expect(logger.info).toHaveBeenCalledWith(expect.stringContaining('Transaction committed successfully.'));
    });
    
    it('should abort a transaction if one participant votes to abort', async () => {
        axios.post.mockImplementation((url, data) => {
            if (url === `${UI_SCHEMA_SERVICE_URL}/prepare`) {
                return Promise.resolve({ data: { vote: 'VOTE_COMMIT' } });
            }
            if (url === `${DATA_MODEL_SERVICE_URL}/prepare`) {
                // This participant fails the prepare phase
                return Promise.resolve({ data: { vote: 'VOTE_ABORT' } });
            }
            if (url.endsWith('/abort')) {
                return Promise.resolve({ status: 200 });
            }
            return Promise.reject(new Error(`Unexpected call to ${url}`));
        });
        
        await expect(beginTransaction(MOCK_PAYLOAD)).rejects.toThrow(/failed and was aborted/);

        // Verify abort calls
        expect(axios.post).toHaveBeenCalledWith(`${UI_SCHEMA_SERVICE_URL}/abort`, expect.any(Object));
        expect(axios.post).toHaveBeenCalledWith(`${DATA_MODEL_SERVICE_URL}/abort`, expect.any(Object));
        
        // Ensure commit was never called
        expect(axios.post).not.toHaveBeenCalledWith(expect.stringContaining('/commit'), expect.any(Object));
        
        expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Initiating abort.'));
        expect(logger.info).toHaveBeenCalledWith(expect.stringContaining('Transaction aborted.'));
    });
    
    it('should abort a transaction if a participant times out during prepare', async () => {
        axios.post.mockImplementation((url) => {
            if (url === `${UI_SCHEMA_SERVICE_URL}/prepare`) {
                return Promise.resolve({ data: { vote: 'VOTE_COMMIT' } });
            }
            if (url === `${DATA_MODEL_SERVICE_URL}/prepare`) {
                // Simulate a network error/timeout
                return Promise.reject(new Error('Network timeout'));
            }
            if (url.endsWith('/abort')) {
                return Promise.resolve({ status: 200 });
            }
            return Promise.reject(new Error(`Unexpected call to ${url}`));
        });
        
        await expect(beginTransaction(MOCK_PAYLOAD)).rejects.toThrow(/failed and was aborted/);

        expect(axios.post).toHaveBeenCalledWith(`${UI_SCHEMA_SERVICE_URL}/abort`, expect.any(Object));
        expect(axios.post).toHaveBeenCalledWith(`${DATA_MODEL_SERVICE_URL}/abort`, expect.any(Object));

        expect(logger.error).toHaveBeenCalledWith(expect.stringContaining('failed to prepare: Network timeout'));
    });
});
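One further scenario worth pinning down is a participant failing during the commit phase. With the simplified coordinator above, nothing is rolled back; the failure surfaces only as the high-priority error log calling for manual intervention. A sketch of such a test case, which would sit inside the same describe block:

    it('should flag a commit-phase failure for manual intervention', async () => {
        axios.post.mockImplementation((url) => {
            if (url.endsWith('/prepare')) {
                return Promise.resolve({ data: { vote: 'VOTE_COMMIT' } });
            }
            if (url === `${UI_SCHEMA_SERVICE_URL}/commit`) {
                // Participant crashes after voting COMMIT; it can no longer unilaterally abort.
                return Promise.reject(new Error('Connection refused'));
            }
            if (url === `${DATA_MODEL_SERVICE_URL}/commit`) {
                return Promise.resolve({ status: 200 });
            }
            return Promise.reject(new Error(`Unexpected call to ${url}`));
        });

        // The simplified coordinator still reports success; the failure is only visible in the error log.
        const result = await beginTransaction(MOCK_PAYLOAD);
        expect(result.success).toBe(true);
        expect(logger.error).toHaveBeenCalledWith(
            expect.stringContaining('failed to acknowledge commit')
        );
    });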

This test suite gives us confidence that the coordinator behaves correctly under both ideal conditions and common failure scenarios, which is essential for a critical component responsible for maintaining data consistency.

The choice of Two-Phase Commit introduced significant complexity into our system, requiring careful implementation of the coordinator and participant logic, as well as thorough testing of failure modes. The coordinator’s in-memory state management is a known liability; a production-grade implementation must persist transaction states to a reliable data store (like etcd, ZooKeeper, or even a transactional database) and feature a recovery process on startup to handle crashes. Furthermore, the synchronous, blocking nature of 2PC means that a slow participant can degrade the performance of the entire “publish” operation. For our specific use case—an infrequent, user-triggered action where atomicity is paramount—this trade-off was acceptable, but it severely limits the protocol’s applicability for high-throughput, latency-sensitive workflows where a Saga pattern would be a more appropriate, albeit more complex to debug, architectural choice.

