Implementing Deterministic E2E Tests for Asynchronous Ktor Sagas with a Cypress Backchannel


Our CI pipeline was hemorrhaging. Flaky end-to-end tests for a new travel booking module were bringing builds to a halt multiple times a day. The root cause was a fundamental mismatch: our Cypress tests, written with a synchronous mindset, were trying to validate a distributed, asynchronous process orchestrated by a Saga pattern in our Ktor backend. The usual suspect, cy.wait(15000), was less of a solution and more of a prayer that rarely got answered. It was clear we weren’t just fixing a test; we were solving an architectural challenge in observability for testing.

The system itself is straightforward in concept. A user books a trip package: flight, hotel, and car rental. In a monolith, this would be a single database transaction. In our microservices world, it’s a Saga. Each booking is a separate service call. If the hotel booking fails, the flight that was already booked must be compensated (canceled). This eventual consistency is efficient for production but a nightmare for automated verification. A Cypress test would click “Book Trip,” and the UI might show a “Processing…” spinner. What happens next from the test’s perspective is a black box. Did it succeed? Did it fail and correctly compensate? Relying on arbitrary waits and then checking the UI for a final success or failure message resulted in tests that would pass three times and then fail for five, purely due to network latency or service load variations.

Our first attempt was to make the test smarter by directly polling the database.

// An example of a brittle, anti-pattern test
it('should book a trip and poll the database for confirmation', () => {
  // Assume a transaction ID is obtained after the initial click
  const transactionId = 'tx-abc-123';
  
  cy.get('[data-cy="book-trip-button"]').click();
  
  // Custom task to query the database directly. THIS IS A BAD IDEA.
  const checkDb = (retries = 0) => {
    cy.task('queryDatabase', `SELECT status FROM bookings WHERE id = '${transactionId}'`)
      .then(result => {
        if (result && result.status === 'CONFIRMED') {
          // Found it, continue the test
          cy.get('[data-cy="booking-success-message"]').should('be.visible');
          return;
        }

        if (retries > 10) {
          throw new Error('Booking never confirmed in database');
        }

        // Wait and retry
        cy.wait(1500);
        checkDb(retries + 1);
      });
  };
  
  checkDb();
});

This approach is fraught with problems. It tightly couples the E2E test suite to the database schema of a specific service. If a column is renamed, the test breaks. It requires giving the Cypress test runner database credentials, a significant security concern. Most importantly, it only tells us the final state. It provides no visibility into the intermediate steps of the Saga. We couldn’t use it to verify that a compensation was correctly triggered after a deliberate failure injection. A better solution was needed—one that respected service boundaries but provided the necessary observability.

The chosen path was to build a dedicated, test-only “Saga State Inspection API” within our Ktor application. This backchannel would allow Cypress to query the real-time status of any given Saga transaction without touching the database directly or knowing implementation details of downstream services. It’s an explicit acknowledgment that for a complex system to be testable, it sometimes needs to expose seams that are only available in a testing context.

The Ktor Saga Orchestrator Foundation

Before building the backchannel, we need a Saga to test. Our Ktor application orchestrates the booking process. For simplicity in this context, the orchestrator will be in-memory, and the service calls will be simulated with delays and programmable failures. In a real-world project, this state would live in a durable store like Redis or a database table.

The core components are the SagaState and the SagaOrchestrator.

// build.gradle.kts dependencies
dependencies {
    implementation("io.ktor:ktor-server-core-jvm:$ktor_version")
    implementation("io.ktor:ktor-server-netty-jvm:$ktor_version")
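    implementation("io.ktor:ktor-server-content-negotiation-jvm:$ktor_version")
    implementation("io.ktor:ktor-serialization-kotlinx-json-jvm:$ktor_version")
    // @Serializable also needs the Kotlin serialization compiler plugin,
    // e.g. kotlin("plugin.serialization") in the plugins block.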
    implementation("ch.qos.logback:logback-classic:$logback_version")
    // For coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
}

First, let’s define the states and the data structure for tracking a Saga’s lifecycle.

package com.example.saga

import kotlinx.serialization.Serializable
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Represents each step in the Saga
@Serializable
data class SagaStep(val name: String, val status: StepStatus, val timestamp: Long = System.currentTimeMillis())

@Serializable
enum class StepStatus { PENDING, IN_PROGRESS, COMPLETED, FAILED, COMPENSATING, COMPENSATED }

// Represents the entire lifecycle of one transaction
@Serializable
data class SagaState(
    val transactionId: String,
    val overallStatus: StepStatus,
    val history: List<SagaStep>
)

// In-memory store for all active and completed Sagas.
// In a production system, this would be backed by Redis, a DB, or an event log.
object SagaRepository {
    private val sagas = ConcurrentHashMap<String, SagaState>()

    fun findById(transactionId: String): SagaState? = sagas[transactionId]

    fun save(state: SagaState) {
        // Key by the ID carried on the state itself; no bare transactionId is in scope here.
        sagas[state.transactionId] = state
    }

    fun create(initialSteps: List<String>): SagaState {
        val transactionId = "tx-${UUID.randomUUID()}"
        val history = initialSteps.map { SagaStep(it, StepStatus.PENDING) }
        val initialState = SagaState(transactionId, StepStatus.PENDING, history)
        sagas[transactionId] = initialState
        return initialState
    }
}
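
For orientation on the client side, here is a rough sketch of the JSON a SagaState might serialize to, plus a small shape check a test could run before asserting on it. The sample values and the looksLikeSagaState helper are hypothetical; only the field names and status values come from the Kotlin classes above. The timestamp field is deliberately not checked.

```javascript
// Hypothetical payload: field names mirror SagaState and SagaStep, values are made up.
const sampleSagaState = {
  transactionId: "tx-00000000-0000-0000-0000-000000000000",
  overallStatus: "PENDING",
  history: [
    { name: "Flight", status: "COMPLETED", timestamp: 1700000000000 },
    { name: "Hotel", status: "IN_PROGRESS", timestamp: 1700000000000 },
    { name: "Car", status: "PENDING", timestamp: 1700000000000 },
  ],
};

// The six values of the StepStatus enum.
const STEP_STATUSES = [
  "PENDING", "IN_PROGRESS", "COMPLETED", "FAILED", "COMPENSATING", "COMPENSATED",
];

// Returns true when `value` has the overall shape of a SagaState payload.
function looksLikeSagaState(value) {
  if (value === null || typeof value !== "object") return false;
  if (typeof value.transactionId !== "string") return false;
  if (!STEP_STATUSES.includes(value.overallStatus)) return false;
  if (!Array.isArray(value.history)) return false;
  return value.history.every(
    (step) =>
      step !== null &&
      typeof step === "object" &&
      typeof step.name === "string" &&
      STEP_STATUSES.includes(step.status)
  );
}
```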

Now, the orchestrator itself. It runs in a separate coroutine scope and processes the steps. This simulates the asynchronous nature of calling different microservices.

package com.example.saga

import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import kotlin.random.Random

class SagaOrchestrator(
    private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    // Simulates calling external services like flight, hotel, etc.
    // Can be programmed to fail for testing purposes.
    private suspend fun executeServiceCall(serviceName: String, fail: Boolean): Boolean {
        logger.info("Executing service call for: $serviceName")
        delay(Random.nextLong(500, 2000)) // Simulate network latency
        if (fail) {
            logger.error("Service call FAILED for: $serviceName")
            return false
        }
        logger.info("Service call COMPLETED for: $serviceName")
        return true
    }

    private suspend fun executeCompensation(serviceName: String) {
        logger.warn("Executing compensation for: $serviceName")
        delay(Random.nextLong(200, 800))
        logger.warn("Compensation COMPLETED for: $serviceName")
    }

    fun startTripBookingSaga(failOnService: String? = null): SagaState {
        val saga = SagaRepository.create(listOf("Flight", "Hotel", "Car"))
        logger.info("Starting Saga for transactionId: ${saga.transactionId}")

        // Run the Saga in the background; the caller gets the initial state right away.
        scope.launch {
            runSaga(saga.transactionId, failOnService)
        }
        return saga
    }
    
    private suspend fun runSaga(transactionId: String, failOnService: String?) {
        val executedSteps = mutableListOf<String>()
        var sagaFailed = false
        
        val initialSaga = SagaRepository.findById(transactionId) ?: return
        
        // Forward execution steps
        for (stepName in initialSaga.history.map { it.name }) {
            updateStepStatus(transactionId, stepName, StepStatus.IN_PROGRESS)
            
            val success = executeServiceCall(stepName, fail = (stepName == failOnService))
            
            if (success) {
                updateStepStatus(transactionId, stepName, StepStatus.COMPLETED)
                executedSteps.add(stepName)
            } else {
                updateStepStatus(transactionId, stepName, StepStatus.FAILED)
                sagaFailed = true
                break // Stop processing forward steps
            }
        }
        
        // If failure, run compensations in reverse order
        if (sagaFailed) {
            updateOverallStatus(transactionId, StepStatus.COMPENSATING)
            for (stepToCompensate in executedSteps.reversed()) {
                 updateStepStatus(transactionId, stepToCompensate, StepStatus.COMPENSATING)
                 executeCompensation(stepToCompensate)
                 updateStepStatus(transactionId, stepToCompensate, StepStatus.COMPENSATED)
            }
            updateOverallStatus(transactionId, StepStatus.COMPENSATED)
            logger.error("Saga ${transactionId} finished with status: COMPENSATED")
        } else {
            updateOverallStatus(transactionId, StepStatus.COMPLETED)
            logger.info("Saga ${transactionId} finished with status: COMPLETED")
        }
    }

    // Helper functions to update state immutably
    private fun updateStepStatus(transactionId: String, stepName: String, newStatus: StepStatus) {
        SagaRepository.findById(transactionId)?.let { current ->
            val newHistory = current.history.map { 
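                // Refresh the timestamp so it reflects the latest status change, not Saga creation.
                if (it.name == stepName) it.copy(status = newStatus, timestamp = System.currentTimeMillis()) else it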
            }
            SagaRepository.save(current.copy(history = newHistory))
        }
    }

    private fun updateOverallStatus(transactionId: String, newStatus: StepStatus) {
        SagaRepository.findById(transactionId)?.let { current ->
            SagaRepository.save(current.copy(overallStatus = newStatus))
        }
    }
}
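
To reason about what the tests should expect, it can help to model the orchestrator's state transitions without delays or I/O. The sketch below is a synchronous JavaScript approximation, not the Kotlin implementation; simulateSaga and the compensationOrder field are names invented for illustration.

```javascript
// Synchronous model of the Saga's forward and compensation phases.
// Returns the final state the orchestrator is expected to reach.
function simulateSaga(stepNames, failOnService = null) {
  const history = stepNames.map((name) => ({ name, status: "PENDING" }));
  const executed = [];
  let failed = false;

  // Forward phase: stop at the first failing step.
  for (const step of history) {
    if (step.name === failOnService) {
      step.status = "FAILED";
      failed = true;
      break;
    }
    step.status = "COMPLETED";
    executed.push(step);
  }

  // Compensation phase: undo completed steps in reverse order.
  const compensationOrder = [];
  if (failed) {
    for (const step of executed.reverse()) {
      step.status = "COMPENSATED";
      compensationOrder.push(step.name);
    }
  }

  return {
    overallStatus: failed ? "COMPENSATED" : "COMPLETED",
    history,
    compensationOrder,
  };
}
```

Calling simulateSaga(["Flight", "Hotel", "Car"], "Hotel") leaves Flight as COMPENSATED, Hotel as FAILED, and Car as PENDING, which is the outcome a compensation test should look for.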

Building the Ktor Backchannel API

With the orchestration logic in place, exposing its state is straightforward. We’ll add a new route in our Ktor application: /test-support/saga-status/{transactionId}. A crucial production-ready detail is to make this endpoint available only when the application is running in a “test” environment.

package com.example.plugins

import com.example.saga.SagaOrchestrator
import com.example.saga.SagaRepository
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.request.*
import io.ktor.http.HttpStatusCode

fun Application.configureRouting() {
    val orchestrator = SagaOrchestrator()
    val isTestEnvironment = environment.config.propertyOrNull("ktor.environment")?.getString() == "test"

    routing {
        // --- Production API ---
        post("/book-trip") {
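            // Reading the body as a Map requires the ContentNegotiation plugin (JSON) to be installed in the module.
            val params = call.receive<Map<String, String>>()
            // failOnService is a failure-injection hook for tests, so honor it only in the test environment.
            val failOnService = params["failOnService"].takeIf { isTestEnvironment }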

            val initialState = orchestrator.startTripBookingSaga(failOnService)
            call.respond(mapOf("transactionId" to initialState.transactionId))
        }

        // --- Test-Only Backchannel API ---
        if (isTestEnvironment) {
            get("/test-support/saga-status/{transactionId}") {
                val transactionId = call.parameters["transactionId"]
                if (transactionId == null) {
                    call.respond(HttpStatusCode.BadRequest, "Missing transactionId")
                    return@get
                }
                
                val state = SagaRepository.findById(transactionId)
                if (state != null) {
                    call.respond(state)
                } else {
                    call.respond(HttpStatusCode.NotFound, "Saga with id $transactionId not found")
                }
            }
        }
    }
}

// In application.conf, we would have:
// ktor {
//   deployment {
//     port = 8080
//   }
//   application {
//     modules = [ com.example.ApplicationKt.module ]
//   }
//   environment = "prod" // or "test"
// }

We start the Ktor application with ktor.environment set to “test” for our E2E runs. This single property gates access to the entire test support infrastructure.

The Cypress Implementation: Custom Commands

Now for the client side. Cypress needs to be taught how to use this new backchannel. A naive implementation would be to litter cy.request calls throughout our tests. A far cleaner approach is to create a custom command, cy.waitForSagaCompletion. This encapsulates the polling logic, making the tests declarative and readable.

First, let’s set up the custom command in cypress/support/commands.js.

// cypress/support/commands.js

/**
 * Polls the test-only Saga status endpoint until the Saga reaches a terminal state.
 * @param {string} transactionId - The ID of the Saga to poll.
 * @param {object} options - Configuration for polling.
 * @param {string} options.expectedStatus - The target status ('COMPLETED' or 'COMPENSATED').
 * @param {number} options.timeout - Max time to wait in ms.
 * @param {number} options.interval - Time between polls in ms.
 */
Cypress.Commands.add('waitForSagaCompletion', (transactionId, options = {}) => {
  const {
    expectedStatus = 'COMPLETED',
    timeout = 20000,
    interval = 1000
  } = options;

  const endpoint = `/test-support/saga-status/${transactionId}`;
  
  const startTime = Date.now();

  const poll = () => {
    if (Date.now() - startTime > timeout) {
      throw new Error(`Saga ${transactionId} did not reach status ${expectedStatus} within ${timeout}ms.`);
    }
    
    cy.request({
      url: endpoint,
      failOnStatusCode: false // We handle the 404 case initially
    }).then(response => {
      // It might take a moment for the Saga to be created, so a 404 is a valid intermediate state.
      if (response.status === 404) {
        cy.wait(interval, { log: false }).then(poll);
        return;
      }
      
      // If we get another error code, fail fast.
      if (response.status !== 200) {
        throw new Error(`Failed to poll Saga status. Endpoint returned ${response.status}`);
      }
      
      const { overallStatus, history } = response.body;
      
      cy.log(`Saga [${transactionId}] current status: ${overallStatus}`);
      
      // Check for terminal states
      if (overallStatus === expectedStatus) {
        cy.log(`Saga reached expected terminal state: ${expectedStatus}`).then(() => {
          // Yield the final saga state for further assertions
          return cy.wrap(response.body);
        });
      } else if (overallStatus === 'COMPLETED' || overallStatus === 'COMPENSATED' || overallStatus === 'FAILED') {
        // It reached a *different* terminal state than expected. Fail the test.
        throw new Error(`Saga reached unexpected terminal state: ${overallStatus}. Expected: ${expectedStatus}. History: ${JSON.stringify(history)}`);
      } else {
        // It's still in progress, poll again.
        cy.wait(interval, { log: false }).then(poll);
      }
    });
  };

  poll();
});

This command is robust. It handles the case where the Saga hasn’t been created yet (404), fails fast on unexpected errors, and provides a clear timeout mechanism.
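
The branching inside the command can also be pulled out into a pure function, which makes the decision table easy to unit test without a running backend. This is a sketch of that idea; nextPollAction and its return values are hypothetical names, and the terminal statuses match the ones the command above checks.

```javascript
// Statuses the polling command treats as terminal.
const TERMINAL_STATUSES = ["COMPLETED", "COMPENSATED", "FAILED"];

// Decides what the poller should do after one response.
// Returns "retry", "done", or "fail".
function nextPollAction(httpStatus, overallStatus, expectedStatus) {
  if (httpStatus === 404) return "retry"; // Saga not visible yet
  if (httpStatus !== 200) return "fail"; // unexpected HTTP error
  if (overallStatus === expectedStatus) return "done";
  if (TERMINAL_STATUSES.includes(overallStatus)) return "fail"; // wrong terminal state
  return "retry"; // still in flight
}
```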

The Final, Deterministic E2E Tests

With the orchestrator, the backchannel, and the custom Cypress command in place, our tests become elegant and deterministic. We can now test both the happy path and, more importantly, the compensation path with confidence.

// cypress/e2e/booking.cy.js

describe('Trip Booking Saga', () => {

  it('should successfully book a trip when all services are available', () => {
    // Intercept the initial POST to get the transactionId
    cy.intercept('POST', '/book-trip').as('bookTrip');

    cy.visit('/'); // Assuming the UI is on the root
    cy.get('[data-cy="book-trip-button"]').click();
    
    // The UI can show a loading spinner here
    cy.get('[data-cy="booking-in-progress"]').should('be.visible');

    // Wait for the API call to complete and extract the transactionId
    cy.wait('@bookTrip').then((interception) => {
      const { transactionId } = interception.response.body;
      cy.log(`Transaction ID: ${transactionId}`);

      // This is the key. No more cy.wait(15000).
      // We wait deterministically for the saga to complete.
      cy.waitForSagaCompletion(transactionId, { expectedStatus: 'COMPLETED' })
        .then(finalSagaState => {
          // We can now make assertions on the final state of the system
          expect(finalSagaState.overallStatus).to.eq('COMPLETED');
          expect(finalSagaState.history.every(step => step.status === 'COMPLETED')).to.be.true;
        });
    });

    // Finally, assert the UI has updated
    cy.get('[data-cy="booking-success-message"]').should('be.visible');
    cy.get('[data-cy="booking-in-progress"]').should('not.exist');
  });

  it('should trigger compensation if the Hotel service fails', () => {
    cy.intercept('POST', '/book-trip').as('bookTrip');

    cy.visit('/');
    
    // Using a (hypothetical) debug UI element to inject a failure
    cy.get('[data-cy="fail-on-service-select"]').select('Hotel'); 
    cy.get('[data-cy="book-trip-button"]').click();

    cy.get('[data-cy="booking-in-progress"]').should('be.visible');

    cy.wait('@bookTrip').then((interception) => {
      const { transactionId } = interception.response.body;
      cy.log(`Transaction ID: ${transactionId}`);

      // Now we wait for the *COMPENSATED* state.
      cy.waitForSagaCompletion(transactionId, { expectedStatus: 'COMPENSATED', timeout: 25000 })
        .then(finalSagaState => {
          // Deep assertions on the Saga history
          expect(finalSagaState.overallStatus).to.eq('COMPENSATED');
          
          const flightStep = finalSagaState.history.find(s => s.name === 'Flight');
          const hotelStep = finalSagaState.history.find(s => s.name === 'Hotel');
          const carStep = finalSagaState.history.find(s => s.name === 'Car');

          // Verify the flow: Flight was booked, then compensated. Hotel failed. Car was never touched.
          expect(flightStep.status).to.eq('COMPENSATED');
          expect(hotelStep.status).to.eq('FAILED');
          expect(carStep.status).to.eq('PENDING'); // It never ran because the saga failed before it.
        });
    });

    // Assert the UI shows the correct failure message
    cy.get('[data-cy="booking-failed-message"]').should('contain.text', 'Hotel booking failed');
    cy.get('[data-cy="booking-in-progress"]').should('not.exist');
  });
});
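
The per-step lookups in the compensation test can be condensed with a small helper that turns the history array into a name-to-status map. This is an optional refactoring sketch; statusByStep is a hypothetical helper that could live in a Cypress support file.

```javascript
// Collapses a Saga history array into { stepName: status }.
function statusByStep(history) {
  return Object.fromEntries(history.map((step) => [step.name, step.status]));
}

// Example with the history expected after a Hotel failure.
const exampleHistory = [
  { name: "Flight", status: "COMPENSATED" },
  { name: "Hotel", status: "FAILED" },
  { name: "Car", status: "PENDING" },
];
const exampleStatuses = statusByStep(exampleHistory);
```

Inside a test, the three separate expectations could then become a single deep-equality assertion against { Flight: 'COMPENSATED', Hotel: 'FAILED', Car: 'PENDING' }.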

The flow looks like this:

sequenceDiagram
    participant Cypress as Cypress Test Runner
    participant UI as Web Application UI
    participant Ktor as Ktor Backend
    participant Saga as Saga Orchestrator

    Cypress->>UI: Clicks 'Book Trip' button
    UI->>Ktor: POST /book-trip (failOnService="Hotel")
    Ktor->>Saga: startTripBookingSaga()
    Saga-->>Ktor: Returns { transactionId: "tx-123" }
    Ktor-->>UI: Responds with transactionId
    Note over Cypress: Intercepts response, captures "tx-123"

    Cypress->>Ktor: Starts polling GET /test-support/saga-status/tx-123

    par
        Saga->>Saga: Step "Flight": IN_PROGRESS -> COMPLETED
    and
        Ktor-->>Cypress: Responds with { status: PENDING, history: [...] }
        Cypress->>Ktor: Continues polling...
    end

    par
        Saga->>Saga: Step "Hotel": IN_PROGRESS -> FAILED
    and
        Ktor-->>Cypress: Responds with { status: PENDING, history: [Flight: COMPLETED, ...] }
        Cypress->>Ktor: Continues polling...
    end

    par
        Saga->>Saga: Overall Status -> COMPENSATING
        Saga->>Saga: Step "Flight": COMPENSATING -> COMPENSATED
    and
        Ktor-->>Cypress: Responds with { status: COMPENSATING, history: [..., Hotel: FAILED] }
        Cypress->>Ktor: Continues polling...
    end

    Saga->>Saga: Overall Status -> COMPENSATED
    Ktor-->>Cypress: Responds with { status: COMPENSATED, history: [...] }
    Note over Cypress: Polling successful. Expected state 'COMPENSATED' reached.

    Cypress->>UI: Asserts failure message is visible

This architecture transformed our CI from a source of frustration into a reliable gatekeeper. The tests are no longer flaky; they are deterministic, easier to read, and capable of verifying the complex compensation logic that is central to the Saga pattern’s resilience.

The solution isn’t without its own set of trade-offs. The introduction of a test-only API endpoint increases the application’s surface area, and rigorous discipline is required to ensure it is never enabled in a production environment. The current in-memory implementation of the SagaRepository is a simplification; a distributed Ktor application would require this state to be externalized to a shared store like Redis, which would add another layer of infrastructure to manage. A future iteration could explore using WebSockets or Server-Sent Events for the backchannel, pushing state updates to Cypress instead of relying on polling, which could marginally speed up test execution for very long-running Sagas.
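
To put rough numbers on the polling cost, we can bound how long a simulated Saga runs and how many polls that implies. The sketch below uses the upper bounds of the simulated delays in the orchestrator (2000 ms per service call, 800 ms per compensation) and ignores HTTP round-trip time; worstCaseSagaMs and pollsNeeded are hypothetical helpers.

```javascript
// Upper bounds of the simulated delays in SagaOrchestrator.
const MAX_CALL_MS = 2000;
const MAX_COMPENSATION_MS = 800;

// Worst-case Saga duration in ms for a given failure point (null = no failure).
function worstCaseSagaMs(stepNames, failOnService = null) {
  const failIndex = failOnService === null ? -1 : stepNames.indexOf(failOnService);
  // The failing call still incurs its delay before reporting failure.
  const callsMade = failIndex === -1 ? stepNames.length : failIndex + 1;
  // Only the steps completed before the failure are compensated.
  const compensations = failIndex === -1 ? 0 : failIndex;
  return callsMade * MAX_CALL_MS + compensations * MAX_COMPENSATION_MS;
}

// Approximate number of polls before a Saga of that length is seen as finished.
function pollsNeeded(sagaMs, intervalMs) {
  return Math.ceil(sagaMs / intervalMs);
}
```

Under these assumptions the slowest case is a failure on Car, at 7600 ms, which needs about eight polls at the default 1000 ms interval and sits well inside the 20000 ms default timeout. A push-based backchannel would save at most about one polling interval per Saga.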

