Implementing a Distributed Mobile Build Farm with Kafka Event Sourcing and Ray Actors


Our mobile CI infrastructure was buckling. What began as a single Jenkins master orchestrating a handful of Mac minis and Linux boxes had morphed into a bottleneck that throttled the entire mobile engineering department. Build queues would stretch for hours, particularly before a release deadline. A failure in the Jenkins master, or one of its critical plugins, meant a full stop to all mobile builds and deployments. The system was opaque, difficult to scale, and fragile. We had to replace the core execution engine with something distributed, resilient, and observable from the ground up.

The core concept was to decouple build triggering from build execution. We envisioned an event-driven system where every request—a git push, a pull request update, a nightly trigger—is published as a durable event. A fleet of stateless workers, forming a compute grid, would then consume these events and execute the builds in isolated environments. This architecture would allow us to scale the worker pool independently and provide resilience against individual node failures.

Technology selection was a critical phase. For the event bus, Kafka was the clear choice over alternatives like RabbitMQ. Its log-based persistence model gave us not just a queue, but an immutable audit trail of every build ever requested. The ability to replay the event stream was a powerful feature for debugging and recovery. For the distributed compute grid, we evaluated several options, including a custom solution built on Kubernetes Jobs and orchestrators like Celery. We settled on Ray. Its actor model provided a high-level, intuitive abstraction for managing distributed state and scheduling tasks, which was exactly what we needed for orchestrating build workers. A common mistake is to underestimate the complexity of building this orchestration logic manually; Ray provides it out of the box. For build environment isolation, Docker was the obvious and standard choice.

Here’s the high-level flow we designed. A webhook service translates Git events into structured BuildRequest messages and produces them to a Kafka topic. A central BuildDispatcher consumes these messages and uses a Ray actor, the BuildCoordinator, to schedule the work. The BuildCoordinator manages a pool of BuildWorker actors, dispatching tasks to available workers. Each BuildWorker then uses Docker to run the actual build script in a clean, containerized environment.

graph TD
    subgraph Git Service
        A[Git Push/PR] --> B{Webhook Handler};
    end

    subgraph Ingestion Layer
        B -->|Produces BuildRequest| C[Kafka Topic: build-requests];
    end

    subgraph Dispatcher Service
        D[BuildDispatcher] -- Consumes --> C;
    end

    subgraph Ray Cluster
        D -->|Schedules build via Actor handle| E[BuildCoordinator Actor];
        E -->|Assigns task| F1[BuildWorker Actor 1];
        E -->|Assigns task| F2[BuildWorker Actor 2];
        E -->|Assigns task| F3[BuildWorker Actor N];
    end

    subgraph Execution on Worker Nodes
        F1 --> G1[Docker Container: ./gradlew build];
        F2 --> G2[Docker Container: xcodebuild archive];
        F3 --> G3[Docker Container: ...];
    end

    subgraph Logging & Artifacts
        G1 --> H[Log Aggregator];
        G2 --> H;
        G3 --> H;
        G1 --> I[Artifact Storage];
        G2 --> I;
        G3 --> I;
    end

Part 1: The Kafka Ingestion and Data Schema

Defining a clear, versionable contract for build requests was the first step. We used Protocol Buffers for this to ensure type safety and schema evolution.

// build_request.proto
syntax = "proto3";

package build.ci;

message BuildRequest {
  string build_id = 1;
  string repository_url = 2;
  string commit_sha = 3;
  string platform = 4; // "android", "ios"
  map<string, string> build_args = 5; // e.g., "buildType": "Release"
  int64 timestamp_ms = 6;
  string triggered_by = 7;
}
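
The build_request_pb2 module imported by the services below is generated from this definition with the standard protobuf compiler, e.g. protoc --python_out=. build_request.proto.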

The producer is a simple Python service, perhaps a Flask or FastAPI application, that listens for webhooks from our Git provider. Its only job is to validate the incoming payload, construct a BuildRequest protobuf message, and publish it to Kafka.

# producer_service.py
import os
import uuid
import time
from confluent_kafka import Producer
from build_request_pb2 import BuildRequest

KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
KAFKA_TOPIC = "build-requests"

class BuildRequestProducer:
    def __init__(self):
        conf = {'bootstrap.servers': KAFKA_BROKER}
        self._producer = Producer(conf)
        print(f"Producer initialized for topic '{KAFKA_TOPIC}'")

    def delivery_report(self, err, msg):
        """ Called once for each message produced to indicate delivery result. """
        if err is not None:
            print(f"Message delivery failed: {err}")
        else:
            print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

    def publish_build_request(self, repo_url: str, commit_sha: str, platform: str):
        build_id = str(uuid.uuid4())
        request = BuildRequest(
            build_id=build_id,
            repository_url=repo_url,
            commit_sha=commit_sha,
            platform=platform,
            timestamp_ms=int(time.time() * 1000),
            triggered_by="webhook-service"
        )
        
        serialized_request = request.SerializeToString()
        
        # The build_id is used as the message key. This ensures that all
        # messages related to the same build (e.g., retries) land on the
        # same partition, preserving order if needed.
        self._producer.produce(
            KAFKA_TOPIC,
            key=build_id.encode('utf-8'),
            value=serialized_request,
            callback=self.delivery_report
        )
        
        # Wait for any outstanding messages to be delivered and delivery reports
        # to be received. In a real application, you might do this in batches.
        self._producer.flush()
        print(f"Published build request with ID: {build_id}")
        return build_id

# Example usage:
if __name__ == '__main__':
    producer = BuildRequestProducer()
    producer.publish_build_request(
        repo_url="[email protected]:my-org/android-app.git",
        commit_sha="a1b2c3d4",
        platform="android"
    )
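
For completeness, a thin HTTP front end over this producer might look like the following. This is a hedged sketch: the route path and payload field names are assumptions, not our Git provider's actual webhook schema, and real code would also verify the webhook signature before publishing anything.

# webhook_app.py -- hypothetical FastAPI wrapper around BuildRequestProducer.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from producer_service import BuildRequestProducer

app = FastAPI()
producer = BuildRequestProducer()

class PushEvent(BaseModel):
    repository_url: str
    commit_sha: str
    platform: str  # "android" or "ios"

@app.post("/webhooks/push")
def handle_push(event: PushEvent):
    if event.platform not in ("android", "ios"):
        raise HTTPException(status_code=400, detail="unsupported platform")
    build_id = producer.publish_build_request(
        repo_url=event.repository_url,
        commit_sha=event.commit_sha,
        platform=event.platform,
    )
    return {"build_id": build_id}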

Part 2: The Ray Execution Cluster

This is the core of the system. We define two types of Ray actors: a singleton BuildCoordinator to manage the queue and state, and multiple BuildWorker actors to perform the actual work.

The BuildCoordinator receives build requests from the BuildDispatcher (which consumes them from Kafka) and assigns them to idle workers. It maintains the state of all workers and the queue of pending builds, keeping the scheduling logic in one place.

# coordinator_actor.py
import ray
from collections import deque

# The worker class lives in its own module; importing it here (and not the other
# way around) avoids a circular dependency between the two actor modules.
from worker_actor import BuildWorker

@ray.remote
class BuildCoordinator:
    def __init__(self, worker_pool_size=5):
        self.worker_pool_size = worker_pool_size
        self.workers = [BuildWorker.remote(i) for i in range(worker_pool_size)]
        self.idle_workers = deque(self.workers)
        self.pending_builds = deque()
        self.running_builds = {}  # build_id -> worker handle
        print(f"Coordinator started with {worker_pool_size} workers.")

    async def submit_build(self, build_request: dict):
        build_id = build_request['build_id']
        print(f"Coordinator received build request: {build_id}")

        if self.idle_workers:
            worker = self.idle_workers.popleft()
            self.running_builds[build_id] = worker
            
            # Don't wait for the build to finish. The worker calls back into
            # `build_finished` when it's done, so this dispatch is non-blocking.
            # Note: we pass an explicit handle to this actor rather than `self`;
            # Ray cannot turn a bare `self` into a usable actor handle.
            self_handle = ray.get_runtime_context().current_actor
            worker.run_build.remote(self_handle, build_request)
            print(f"Assigned build {build_id} to worker {worker}")
        else:
            self.pending_builds.append(build_request)
            print(f"No idle workers. Queued build {build_id}. Queue size: {len(self.pending_builds)}")
        
        return build_id

    def build_finished(self, worker, build_id: str, result: dict):
        """Callback method for workers to report completion."""
        print(f"Worker {worker} finished build {build_id} with status: {result['status']}")
        
        if build_id in self.running_builds:
            del self.running_builds[build_id]
        
        # Now, check if there are pending builds and assign one to the now-idle worker.
        if self.pending_builds:
            next_build = self.pending_builds.popleft()
            next_build_id = next_build['build_id']
            self.running_builds[next_build_id] = worker
            self_handle = ray.get_runtime_context().current_actor
            worker.run_build.remote(self_handle, next_build)
            print(f"Assigned pending build {next_build_id} to worker {worker}")
        else:
            self.idle_workers.append(worker)
            print(f"Worker {worker} is now idle. Idle workers: {len(self.idle_workers)}")

    def get_status(self):
        return {
            "idle_workers": len(self.idle_workers),
            "running_builds": len(self.running_builds),
            "pending_builds": len(self.pending_builds),
        }
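
Because the coordinator is created as a named actor (see Part 3), its get_status method doubles as a lightweight health check. As a small illustration, and assuming the dispatcher has already created the actor, any process on the cluster can probe it like this:

# status_probe.py -- a quick, read-only peek at the farm from an ad-hoc script.
import ray

ray.init(address="auto")
coordinator = ray.get_actor("BuildCoordinator")  # handle to the named singleton
print(ray.get(coordinator.get_status.remote()))
# e.g. {'idle_workers': 7, 'running_builds': 3, 'pending_builds': 0}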

The BuildWorker actor is where the work happens. It encapsulates the logic for checking out code, spinning up a Docker container, and streaming the results back. The use of the docker-py library allows for programmatic control over the container lifecycle.

A critical decision was how to handle real-time logging. Our first attempt involved writing log lines back to another Kafka topic. This proved to be a mistake. The sheer volume of log data from hundreds of concurrent builds created immense pressure on our Kafka brokers and introduced noticeable latency in log visibility. The pitfall here is using a durable, ordered log for ephemeral, high-volume data.

We re-architected this part to use a ZeroMQ side-channel. Each BuildWorker acts as a ZeroMQ PUBlisher, broadcasting log lines on a topic specific to its build ID. A separate, non-Ray LogAggregator service acts as a SUBscriber, collecting these logs and forwarding them to Elasticsearch. ZeroMQ’s fire-and-forget nature is perfect for this use case—if a few log lines are dropped, it’s not a critical failure, but we gain near-zero-latency streaming.

# worker_actor.py
import ray
import docker
import os
import zmq
import time
import logging
# Note: the coordinator is never imported here; it only arrives as an actor
# handle argument to run_build, which avoids a circular import with
# coordinator_actor.py (that module imports BuildWorker).

# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

ZMQ_LOG_PUBLISHER_URL = os.getenv("ZMQ_LOG_PUBLISHER_URL", "tcp://127.0.0.1:5555")

@ray.remote
class BuildWorker:
    def __init__(self, worker_id: int):
        self.worker_id = worker_id
        self.docker_client = None
        self.zmq_context = None
        self.log_socket = None

    def _setup(self):
        """Lazy initialization to avoid serialization issues with clients."""
        if self.docker_client is None:
            try:
                self.docker_client = docker.from_env()
                self.docker_client.ping()
                logging.info(f"Worker {self.worker_id}: Docker client initialized.")
            except Exception as e:
                logging.error(f"Worker {self.worker_id}: Failed to connect to Docker daemon: {e}")
                raise
        
        if self.zmq_context is None:
            self.zmq_context = zmq.Context()
            self.log_socket = self.zmq_context.socket(zmq.PUB)
            self.log_socket.connect(ZMQ_LOG_PUBLISHER_URL)
            logging.info(f"Worker {self.worker_id}: ZMQ log publisher connected to {ZMQ_LOG_PUBLISHER_URL}")

    def _publish_log(self, build_id: str, line: str):
        """Publish a log line over ZeroMQ."""
        try:
            # Topic is the build_id, message is the log line
            self.log_socket.send_string(f"{build_id}", flags=zmq.SNDMORE)
            self.log_socket.send_string(line)
        except Exception as e:
            # Non-critical, just log it.
            logging.warning(f"Worker {self.worker_id}: Failed to publish log for {build_id}: {e}")

    def run_build(self, coordinator: ray.actor.ActorHandle, build_request: dict):
        self._setup()
        build_id = build_request['build_id']
        repo_url = build_request['repository_url']
        commit_sha = build_request['commit_sha']

        start_time = time.time()
        self._publish_log(build_id, f"INFO: Starting build {build_id} on worker {self.worker_id}")

        # In a real project, this would be a persistent, shared volume.
        # For simplicity, we use a temporary directory on the host.
        checkout_dir = f"/tmp/builds/{build_id}"
        os.makedirs(checkout_dir, exist_ok=True)
        
        # Stage 1: Checkout code (simplified)
        # A robust implementation would use SSH keys and handle auth.
        self._publish_log(build_id, f"INFO: Cloning {repo_url} at commit {commit_sha}")
        # git.Repo.clone_from(...)
        
        # Stage 2: Run build in Docker
        # The image would be pre-built with all necessary SDKs.
        container_image = "android-builder:33.0" # Example for Android
        build_command = "./gradlew assembleRelease"
        
        container = None
        status = "FAILED"
        try:
            container = self.docker_client.containers.run(
                image=container_image,
                command=build_command,
                volumes={checkout_dir: {'bind': '/workspace', 'mode': 'rw'}},
                working_dir="/workspace",
                detach=True,
                remove=False # Keep container for inspection on failure
            )
            
            # Stream logs in real-time
            for line in container.logs(stream=True, follow=True):
                decoded_line = line.decode('utf-8').strip()
                self._publish_log(build_id, decoded_line)
            
            result = container.wait()
            exit_code = result['StatusCode']
            
            if exit_code == 0:
                status = "SUCCESS"
                self._publish_log(build_id, f"INFO: Build container exited with code 0.")
                # Stage 3: Upload artifacts from `checkout_dir` to S3/Artifactory
            else:
                self._publish_log(build_id, f"ERROR: Build container exited with non-zero code: {exit_code}")

        except docker.errors.ContainerError as e:
            self._publish_log(build_id, f"ERROR: Docker container execution failed: {e}")
        except docker.errors.ImageNotFound:
            self._publish_log(build_id, f"ERROR: Docker image not found: {container_image}")
        except Exception as e:
            self._publish_log(build_id, f"ERROR: An unexpected error occurred: {e}")
        finally:
            if container:
                if status == "SUCCESS":
                    container.remove() # Clean up successful builds
                else:
                    logging.warning(f"Worker {self.worker_id}: Kept failed container {container.id} for inspection.")
            
            end_time = time.time()
            duration = end_time - start_time
            self._publish_log(build_id, f"INFO: Build finished in {duration:.2f} seconds with status {status}")

            # Notify the coordinator that this worker is free. This is the step
            # that keeps the scheduling loop moving. As in the coordinator, we
            # pass an explicit handle to this actor rather than `self`.
            self_handle = ray.get_runtime_context().current_actor
            coordinator.build_finished.remote(self_handle, build_id, {"status": status, "duration": duration})

        return {"status": status}
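
One thing the listing above glosses over is placement: iOS builds have to land on the macOS nodes, while Android builds can run on any Linux box with Docker. Ray's custom resources give us a clean way to express that. The sketch below is illustrative only; the macos_node resource name is our own label (advertised when the Mac minis join the cluster with ray start --resources='{"macos_node": 1}'), not anything built into Ray.

# placement_sketch.py -- platform-aware worker placement via Ray custom resources.
import ray
from worker_actor import BuildWorker

ray.init(address="auto")

# Only schedulable on nodes that advertise the custom "macos_node" resource.
ios_worker = BuildWorker.options(resources={"macos_node": 0.25}).remote(100)

# Any node with four free CPUs will do for Android builds.
android_worker = BuildWorker.options(num_cpus=4).remote(101)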

The LogAggregator is a simple, standalone Python script running a ZeroMQ SUB socket in a tight loop. Its sole job is to receive logs and hand them off to a persistent backend. The version below just prints what it receives; promoting it to a full forwarder device, the robust pattern for this kind of fan-in/fan-out scenario, is a small step sketched after the listing.

# log_aggregator_service.py
import zmq
import os

ZMQ_LOG_SUBSCRIBER_URL = os.getenv("ZMQ_LOG_SUBSCRIBER_URL", "tcp://*:5555")
# In a real system, this would publish to another backend like Logstash/Fluentd
ZMQ_FORWARDER_URL = os.getenv("ZMQ_FORWARDER_URL", "tcp://*:5556") 

def run_log_aggregator():
    """ A simple ZMQ forwarder device. """
    context = zmq.Context()
    
    # Socket to receive messages from workers
    frontend = context.socket(zmq.SUB)
    frontend.bind(ZMQ_LOG_SUBSCRIBER_URL)
    frontend.setsockopt_string(zmq.SUBSCRIBE, "") # Subscribe to all topics
    
    # In this example we just print, but you could bind another socket
    # to forward these logs to a more persistent backend.
    print(f"Log Aggregator listening on {ZMQ_LOG_SUBSCRIBER_URL}")
    
    while True:
        try:
            # We expect a multipart message [topic, message]
            topic = frontend.recv_string()
            message = frontend.recv_string()
            
            # Simple console output for demonstration
            # In production, this would be a client for Elasticsearch, Loki, etc.
            print(f"[{topic}] {message}")
            
        except zmq.ZMQError as e:
            print(f"ZMQ Error: {e}")
            break
        except KeyboardInterrupt:
            break
    
    print("Log Aggregator shutting down.")
    frontend.close()
    context.term()

if __name__ == "__main__":
    run_log_aggregator()
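
When more than one consumer needs the stream (say, an Elasticsearch shipper plus a live tail in a web UI), the aggregator can be replaced by a true forwarder device. A minimal sketch, assuming workers keep connecting their PUB sockets to port 5555 and consumers connect SUB sockets to ZMQ_FORWARDER_URL:

# log_forwarder.py -- sketch of the forwarder-device variant of the aggregator.
import os
import zmq

def run_forwarder():
    context = zmq.Context()

    # Workers connect their PUB sockets here, exactly as before.
    frontend = context.socket(zmq.XSUB)
    frontend.bind(os.getenv("ZMQ_LOG_SUBSCRIBER_URL", "tcp://*:5555"))

    # Downstream consumers connect SUB sockets here and subscribe by build_id.
    backend = context.socket(zmq.XPUB)
    backend.bind(os.getenv("ZMQ_FORWARDER_URL", "tcp://*:5556"))

    try:
        # Blocks forever, shuttling log lines downstream and subscriptions upstream.
        zmq.proxy(frontend, backend)
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    run_forwarder()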

Part 3: The Dispatcher and Main Application Loop

The final piece is the BuildDispatcher, which bridges Kafka and Ray. It’s a long-running process that consumes from the build-requests topic and submits the jobs to the BuildCoordinator actor.

# main_dispatcher.py
import ray
import os
import time
import json
from confluent_kafka import Consumer, KafkaError
from build_request_pb2 import BuildRequest
from google.protobuf.json_format import MessageToDict

# The coordinator actor class must be importable from this driver process so Ray
# can ship its definition to the cluster; BuildWorker comes along transitively.
from coordinator_actor import BuildCoordinator

KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
KAFKA_TOPIC = "build-requests"
RAY_ADDRESS = os.getenv("RAY_ADDRESS", "auto")
WORKER_POOL_SIZE = int(os.getenv("WORKER_POOL_SIZE", "10"))

def main():
    print("Initializing Ray...")
    ray.init(address=RAY_ADDRESS)
    print("Ray initialized.")

    # Create the coordinator actor. It's a named actor to ensure it's a singleton
    # within the Ray cluster that we can get a handle to.
    coordinator = BuildCoordinator.options(name="BuildCoordinator", get_if_exists=True).remote(
        worker_pool_size=WORKER_POOL_SIZE
    )
    print("BuildCoordinator actor handle obtained.")

    consumer_conf = {
        'bootstrap.servers': KAFKA_BROKER,
        'group.id': 'build-dispatcher-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False # We will commit offsets manually
    }
    consumer = Consumer(consumer_conf)
    consumer.subscribe([KAFKA_TOPIC])
    print(f"Kafka consumer subscribed to topic '{KAFKA_TOPIC}'")

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(msg.error())
                    break

            build_request_proto = BuildRequest()
            build_request_proto.ParseFromString(msg.value())
            
            # Convert protobuf to a dictionary for easier handling in Ray actors.
            # preserving_proto_field_name keeps snake_case keys ('build_id', not
            # 'buildId'), which is what the coordinator and workers expect.
            build_request_dict = MessageToDict(build_request_proto, preserving_proto_field_name=True)
            
            print(f"Consumed build request: {build_request_dict['build_id']}")
            
            # submit_build itself is fast: the coordinator only assigns or queues the
            # build, it never waits for the build to run. We still block on ray.get
            # so the offset commit below only happens once the coordinator has
            # actually accepted the request.
            ray.get(coordinator.submit_build.remote(build_request_dict))

            # Manually commit offset after successful submission to Ray.
            # This gives us at-least-once processing guarantees.
            consumer.commit(asynchronous=False)

    except KeyboardInterrupt:
        print("Dispatcher shutting down.")
    finally:
        consumer.close()
        ray.shutdown()

if __name__ == '__main__':
    main()

This setup resulted in a system that could easily scale by adding more nodes to the Ray cluster. No single point of failure in the execution layer could bring the system down; if a worker node died, Ray’s fault tolerance mechanisms would allow the coordinator to reschedule the build on a different node. Build queues were effectively eliminated, replaced by a dynamic pool of workers that provided immediate feedback to developers.
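
One caveat to that claim: Ray does not restart a failed actor unless you ask it to, so the worker pool has to opt in, and the coordinator still needs logic to detect the failed dispatch and requeue the build (not shown in the listings above). A minimal sketch of the opt-in, with an arbitrary restart budget:

# fault_tolerant_pool.py -- sketch: opt-in actor restarts for the worker pool.
import ray
from worker_actor import BuildWorker

ray.init(address="auto")
workers = [
    # Ray will recreate the actor process (up to 3 times) if its node dies.
    BuildWorker.options(max_restarts=3).remote(i)
    for i in range(5)
]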

The architecture is not without its limitations. The current implementation of the BuildCoordinator is a single actor, which could become a bottleneck under extreme load (thousands of requests per second), though for a mobile CI use case, this is unlikely. A more advanced design might involve sharding the coordinator logic across multiple actors. Furthermore, managing distributed build caches (like Gradle or CocoaPods caches) across the worker pool remains a challenge. A shared, high-performance network file system (like NFS or CephFS) or a dedicated caching service would be the next logical iteration to further reduce build times. Finally, delivery on the ZeroMQ log channel is best-effort; for critical status updates or final build results, we still rely on a durable message back to Kafka to ensure no information is lost. The boundary of what is acceptable to send over this side-channel versus the main event bus is a crucial trade-off.
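
To make that last point concrete, here is a minimal sketch of the durable result message; the build-results topic name and the payload shape are illustrative assumptions, not part of the listings above.

# result_producer.py -- sketch: publish the final build status durably to Kafka.
import json
import os
from confluent_kafka import Producer

_producer = Producer({'bootstrap.servers': os.getenv("KAFKA_BROKER", "localhost:9092")})

def publish_build_result(build_id: str, status: str, duration_s: float, artifact_url: str = ""):
    payload = {
        "build_id": build_id,
        "status": status,          # "SUCCESS" or "FAILED"
        "duration_s": duration_s,
        "artifact_url": artifact_url,
    }
    # Keyed by build_id, like the request topic, so a build's request and result
    # share a partition and can be correlated by replaying the log.
    _producer.produce("build-results", key=build_id.encode("utf-8"),
                      value=json.dumps(payload).encode("utf-8"))
    _producer.flush()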

