Automating a Resilient High-Throughput Ingestion Pipeline with Chef for an iOS Event Stream into ScyllaDB


The initial system was deceptively simple and catastrophically naive. An iOS client fleet, numbering in the millions, was configured to send analytics events directly to a series of endpoints. These endpoints were handled by a robust Nginx tier, which load-balanced requests to a Python application that performed a direct, synchronous write to a ScyllaDB cluster. On paper, it worked. In staging, it worked. The moment we flipped the switch for 10% of our production traffic, the entire architecture began to buckle.

Client-side p99 latency shot past our 200ms SLO, reaching into multiple seconds. ScyllaDB’s compaction backlog grew uncontrollably. The root cause was a classic impedance mismatch: a firehose of millions of tiny, independent writes from the outside world colliding with a database optimized for handling large, batched throughput. Each tiny event created its own SSTable, leading to massive read amplification during compaction and overwhelming the nodes. Our database was dying a death of a thousand cuts.

graph TD
    subgraph Initial Failed Architecture
        A[iOS Clients] -- Millions of tiny HTTP POSTs --> B(Nginx Load Balancer);
        B --> C{API Fleet};
        C -- One write per request --> D[(ScyllaDB Cluster)];
        D -- Overwhelmed by tiny writes & compactions --> E((🔥 System Collapse 🔥));
    end

We needed a shock absorber. A component that could soak up the chaotic, bursty influx of client requests and transform it into a smooth, predictable stream of large writes that ScyllaDB could digest efficiently. This wasn’t just a matter of tuning; it required a fundamental architectural rethink. Our new concept centered on introducing an in-memory buffer, and Redis Streams was the perfect candidate. It offered persistence, consumer groups for scalable processing, and blistering speed.

This completely changes the way we work. Instead of a tight coupling between client request and database write, we would create a decoupled, asynchronous pipeline. The frontend API’s only job would be to validate an event and toss it into a Redis Stream as fast as possible, returning a 202 Accepted to the iOS client immediately. A separate, backend consumer fleet would then read from this stream in large batches, preparing and executing efficient writes to ScyllaDB.

The coolest part is that this entire, complex topology—from Nginx configuration to the deployment of our Python services and the meticulous tuning of the ScyllaDB cluster—needed to be automated, repeatable, and utterly foolproof. This was a non-negotiable requirement for our SRE team, and Chef was our tool of choice for codifying this infrastructure.

The Chef Cookbook: Orchestrating the Chaos

We structured our deployment within a single Chef wrapper cookbook, event_pipeline, which declared dependencies on well-known community cookbooks and our own internal ones.

The metadata.rb defines the structural dependencies:

# cookbooks/event_pipeline/metadata.rb
name 'event_pipeline'
maintainer 'Your Company'
maintainer_email '[email protected]'
license 'All Rights Reserved'
description 'Installs/Configures the event ingestion pipeline'
version '0.1.0'
chef_version '>= 16.0'

# Community cookbooks from the Chef Supermarket
depends 'nginx', '~> 11.1.0'
depends 'redisio', '~> 5.1.0'
depends 'poise-python', '~> 1.7.0'

# Internal cookbook for managing ScyllaDB
depends 'company_scylla', '~> 1.2.0'

Our core configuration lived in the attributes file, allowing us to manage versions, port numbers, and service flags from a single source of truth. This is critical for managing different environments (staging vs. production) without code changes.

# cookbooks/event_pipeline/attributes/default.rb
# Ingestion API settings
default['event_pipeline']['ingestion_api']['port'] = 8080
default['event_pipeline']['ingestion_api']['workers'] = node['cpu']['total'].to_i * 2
default['event_pipeline']['ingestion_api']['source_dir'] = '/opt/ingestion_api'
default['event_pipeline']['ingestion_api']['venv_dir'] = '/opt/ingestion_api/venv'

# Stream Processor settings
default['event_pipeline']['stream_processor']['source_dir'] = '/opt/stream_processor'
default['event_pipeline']['stream_processor']['venv_dir'] = '/opt/stream_processor/venv'
default['event_pipeline']['stream_processor']['batch_size'] = 500
default['event_pipeline']['stream_processor']['read_timeout_ms'] = 2000 # 2 seconds

# Redis Stream configuration
default['event_pipeline']['redis']['stream_name'] = 'ios:events:raw'
default['event_pipeline']['redis']['consumer_group'] = 'scylla-processors'
default['event_pipeline']['redis']['consumer_name'] = node['hostname']

# Nginx configuration
default['nginx']['default_site_enabled'] = false
default['nginx']['worker_processes'] = 'auto'
default['nginx']['worker_connections'] = 2048
default['nginx']['keepalive_timeout'] = 65

# ScyllaDB configuration
default['event_pipeline']['scylla']['contact_points'] = ['10.0.1.10', '10.0.1.11', '10.0.1.12']
default['event_pipeline']['scylla']['keyspace'] = 'events_v1'
default['event_pipeline']['scylla']['table'] = 'user_actions'

Tier 1: The Nginx Ingress and Ingestion API

The first point of contact is Nginx. Its configuration, managed by a Chef template, is tuned for handling a massive number of concurrent, short-lived connections from our iOS clients.

Here’s the Chef recipe segment that lays down the Nginx configuration:

# cookbooks/event_pipeline/recipes/ingestion.rb

# ... (user creation, directory setup) ...

# Deploy the Nginx site configuration from a template
template '/etc/nginx/sites-available/ingestion_api' do
  source 'nginx/ingestion_api.conf.erb'
  owner 'root'
  group 'root'
  mode '0644'
  variables(
    api_port: node['event_pipeline']['ingestion_api']['port']
  )
  notifies :reload, 'service[nginx]', :delayed
end

nginx_site 'ingestion_api' do
  action :enable
end

The corresponding Nginx template (ingestion_api.conf.erb) is where the performance tuning happens. We’re setting up a simple upstream and proxying to it. The key is the aggressive keepalive settings and large connection pool to reduce TCP/TLS handshake overhead.

# templates/default/nginx/ingestion_api.conf.erb

upstream ingestion_api_backend {
    server 127.0.0.1:<%= @api_port %>;
    # Enable keepalive connections to the upstream application
    keepalive 128;
}

server {
    listen 80;
    server_name events.example.com;

    # Buffer settings to handle requests efficiently in memory
    client_body_buffer_size 128k;
    client_max_body_size 1m;

    access_log /var/log/nginx/ingestion.access.log main;
    error_log /var/log/nginx/ingestion.error.log warn;

    location /v1/event {
        if ($request_method != 'POST') {
            return 405;
        }

        proxy_pass http://ingestion_api_backend;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Timeouts for upstream communication
        proxy_connect_timeout 50ms;
        proxy_send_timeout 100ms;
        proxy_read_timeout 100ms;
    }
}

The upstream ingestion_api_backend is a small Python application using FastAPI for its raw speed. Its only job is to receive a JSON payload and XADD it to our Redis Stream. It does almost no processing.

Here’s the core of the ingestion API application, which Chef will deploy.

# /opt/ingestion_api/main.py
import asyncio
import logging
import time
import uvicorn
import redis.asyncio as redis
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse

# --- Configuration ---
# In a real-world project, these would come from env vars or a config service.
# Chef would template a file or set environment variables in the systemd unit.
REDIS_HOST = "10.0.2.5"
REDIS_PORT = 6379
STREAM_NAME = "ios:events:raw"
STREAM_MAX_LEN = 10_000_000 # Cap the stream size to avoid unbounded memory usage

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Application Setup ---
app = FastAPI()
redis_pool = None

@app.on_event("startup")
async def startup_event():
    global redis_pool
    try:
        logger.info(f"Connecting to Redis at {REDIS_HOST}:{REDIS_PORT}")
        redis_pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=0, max_connections=50)
    except Exception as e:
        logger.critical(f"Failed to create Redis connection pool: {e}", exc_info=True)
        # We might choose to exit here if Redis is essential for startup.
        raise

@app.on_event("shutdown")
async def shutdown_event():
    if redis_pool:
        await redis_pool.disconnect()
        logger.info("Redis connection pool closed.")

@app.post("/v1/event")
async def post_event(request: Request):
    """
    Receives an event from an iOS client and adds it to the Redis Stream.
    Returns 202 Accepted on success.
    """
    if not redis_pool:
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"detail": "Redis connection not available."},
        )

    client = redis.Redis(connection_pool=redis_pool)
    try:
        event_body = await request.json()
        
        # In a real app, you'd have robust validation here (e.g., using Pydantic)
        if "user_id" not in event_body or "event_type" not in event_body:
             return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"detail": "Missing required fields 'user_id' or 'event_type'."},
            )
        
        # The data is a dictionary of key-value pairs to be stored in the stream.
        # Redis streams handle string keys and values. We'll flatten the JSON.
        # A common practice is to just store the entire JSON as a single field.
        stream_data = {
            'payload': await request.body() # Store the raw bytes for efficiency
        }

        # XADD command adds the event to the stream.
        # The 'maxlen' argument with '~' makes it an approximate trim, which is faster.
        await client.xadd(STREAM_NAME, stream_data, maxlen=STREAM_MAX_LEN, approximate=True)
        
        return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content={"status": "queued"})

    except redis.exceptions.RedisError as e:
        logger.error(f"Redis error during XADD: {e}", exc_info=True)
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"detail": "Failed to queue event."},
        )
    except Exception as e:
        logger.error(f"Unexpected error processing event: {e}", exc_info=True)
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"detail": "An internal error occurred."},
        )

# A simple health check endpoint
@app.get("/health")
def health_check():
    return {"status": "ok"}

Tier 2: The Redis Stream Processor

This is the heart of the new architecture. It’s a long-running background service, also deployed and managed by Chef as a systemd unit. Its sole purpose is to read from the Redis Stream in batches and execute prepared, batched statements against ScyllaDB.

graph TD
    subgraph New Resilient Architecture
        A[iOS Clients] -- Millions of tiny HTTP POSTs --> B(Nginx Load Balancer);
        B --> C{FastAPI Ingestion API};
        C -- Fast `XADD` --> D[Redis Stream];
        D -- Consumer Group `XREADGROUP` --> E{Python Stream Processors};
        E -- Large, efficient batches --> F[(ScyllaDB Cluster)];
        F -- Smooth, predictable load --> G((✅ System Stable ✅));
    end

The processor code needs to be resilient. It must handle database connection failures, malformed data from the stream, and gracefully manage its position in the stream using consumer group acknowledgements (XACK).

# /opt/stream_processor/processor.py
import asyncio
import logging
import json
import os
import signal
import sys
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, SimpleStatement
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy, ConstantSpeculativeExecutionPolicy
from cassandra.auth import PlainTextAuthProvider
import redis.asyncio as redis

# --- Configuration ---
REDIS_HOST = os.getenv("REDIS_HOST", "10.0.2.5")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
STREAM_NAME = os.getenv("STREAM_NAME", "ios:events:raw")
CONSUMER_GROUP = os.getenv("CONSUMER_GROUP", "scylla-processors")
CONSUMER_NAME = os.getenv("CONSUMER_NAME", "processor-1")
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 500))
READ_TIMEOUT_MS = int(os.getenv("READ_TIMEOUT_MS", 2000))

SCYLLA_HOSTS = os.getenv("SCYLLA_HOSTS", "10.0.1.10,10.0.1.11").split(',')
SCYLLA_KEYSPACE = os.getenv("SCYLLA_KEYSPACE", "events_v1")
SCYLLA_TABLE = os.getenv("SCYLLA_TABLE", "user_actions")

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(CONSUMER_NAME)

# --- Global State ---
shutdown_signal = asyncio.Event()

class ScyllaConnector:
    """Manages connection and queries to ScyllaDB."""
    def __init__(self, hosts, keyspace):
        self.hosts = hosts
        self.keyspace = keyspace
        self.session = None
        self.cluster = None
        # This prepared statement is a massive performance boost.
        # The driver only sends the query string once, then only sends bind variables.
        self.prepared_insert = None

    def connect(self):
        try:
            # Using TokenAwarePolicy is critical for performance with Scylla/Cassandra.
            # It routes queries directly to the node that owns the data.
            load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='dc1'))
            
            self.cluster = Cluster(
                self.hosts,
                load_balancing_policy=load_balancing_policy,
                # Speculative execution can help with p99 latency by re-trying on a new host if the first is slow.
                speculative_execution_policy=ConstantSpeculativeExecutionPolicy(0.05, 2), # 50ms delay, 2 max attempts
                protocol_version=4
            )
            self.session = self.cluster.connect(self.keyspace)
            self.prepare_statements()
            logger.info(f"Successfully connected to ScyllaDB cluster at {self.hosts}")
        except Exception as e:
            logger.critical(f"Failed to connect to ScyllaDB: {e}", exc_info=True)
            raise

    def prepare_statements(self):
        query = f"INSERT INTO {SCYLLA_TABLE} (user_id, event_time, event_type, payload) VALUES (?, ?, ?, ?)"
        self.prepared_insert = self.session.prepare(query)

    def close(self):
        if self.cluster:
            self.cluster.shutdown()
            logger.info("ScyllaDB connection closed.")

    async def write_batch(self, events):
        if not self.session or not self.prepared_insert:
            raise ConnectionError("ScyllaDB session not initialized.")

        batch = BatchStatement()
        processed_count = 0
        for event in events:
            try:
                # Assuming 'payload' is the key holding the original JSON
                data = json.loads(event['payload'])
                # Add robust validation here
                batch.add(self.prepared_insert, (
                    data['user_id'],
                    data['event_time'],
                    data['event_type'],
                    json.dumps(data.get('payload', {}))
                ))
                processed_count += 1
            except (json.JSONDecodeError, KeyError) as e:
                logger.warning(f"Skipping malformed event. ID: {event.get('id')}. Error: {e}")
                continue # Skip malformed records

        if processed_count > 0:
            try:
                await asyncio.to_thread(self.session.execute, batch)
                return processed_count
            except Exception as e:
                logger.error(f"Failed to execute batch write to ScyllaDB: {e}", exc_info=True)
                # In a real system, you would have a dead-letter queue strategy here.
                return 0
        return 0

async def process_stream(scylla_conn):
    """Main loop to read from Redis and write to ScyllaDB."""
    try:
        redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
        # Ensure the consumer group exists. The 'mkstream=True' is vital for the first run.
        await redis_client.xgroup_create(STREAM_NAME, CONSUMER_GROUP, id='0', mkstream=True)
        logger.info(f"Consumer group '{CONSUMER_GROUP}' ensured for stream '{STREAM_NAME}'")
    except redis.exceptions.ResponseError as e:
        if "name already exists" in str(e):
            logger.info(f"Consumer group '{CONSUMER_GROUP}' already exists.")
        else:
            logger.critical(f"Failed to create Redis consumer group: {e}", exc_info=True)
            return

    while not shutdown_signal.is_set():
        try:
            # XREADGROUP reads messages for our consumer that have not been acknowledged.
            # '>' means "give me new messages that have never been delivered to any consumer".
            response = await redis_client.xreadgroup(
                CONSUMER_GROUP,
                CONSUMER_NAME,
                {STREAM_NAME: '>'},
                count=BATCH_SIZE,
                block=READ_TIMEOUT_MS
            )

            if not response:
                continue # Timeout, just loop again

            stream, messages = response[0]
            if not messages:
                continue

            events_to_process = [{'id': msg_id, **msg_data} for msg_id, msg_data in messages]
            logger.info(f"Read {len(events_to_process)} events from stream.")

            written_count = await scylla_conn.write_batch(events_to_process)
            
            if written_count > 0:
                # We only acknowledge messages that were successfully processed.
                # If write_batch fails, messages remain pending and can be re-processed.
                last_successful_id = events_to_process[written_count - 1]['id']
                await redis_client.xack(STREAM_NAME, CONSUMER_GROUP, last_successful_id)
                logger.info(f"Successfully wrote and ACKed {written_count} events up to ID {last_successful_id}.")
            
        except redis.exceptions.RedisError as e:
            logger.error(f"Redis error during processing loop: {e}. Retrying in 5s...", exc_info=True)
            await asyncio.sleep(5)
        except Exception as e:
            logger.critical(f"Unhandled exception in processing loop: {e}. Shutting down...", exc_info=True)
            shutdown_signal.set()

    await redis_client.close()

def handle_shutdown(sig, frame):
    logger.warning(f"Received signal {sig}. Initiating graceful shutdown...")
    shutdown_signal.set()

async def main():
    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    scylla_connector = None
    try:
        scylla_connector = ScyllaConnector(SCYLLA_HOSTS, SCYLLA_KEYSPACE)
        scylla_connector.connect()
        await process_stream(scylla_connector)
    finally:
        if scylla_connector:
            scylla_connector.close()
        logger.info("Processor has shut down.")

if __name__ == "__main__":
    asyncio.run(main())

The Final Piece: ScyllaDB Schema and Deployment

Our ScyllaDB table schema is designed for our primary write pattern. user_id is the partition key, which spreads the load evenly across the cluster as long as user activity is random. event_time is the clustering key, which stores events for a single user chronologically on disk.

-- This would be managed by a Chef recipe using a CQL script executor
CREATE KEYSPACE IF NOT EXISTS events_v1
WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 3};

USE events_v1;

CREATE TABLE IF NOT EXISTS user_actions (
    user_id text,
    event_time timestamp,
    event_type text,
    payload text,
    PRIMARY KEY (user_id, event_time)
) WITH CLUSTERING ORDER BY (event_time DESC)
AND default_time_to_live = 2592000; -- Expire data after 30 days

The transformation was night and day. Client-side API latency dropped to a stable sub-20ms average. The ScyllaDB cluster’s load became smooth and predictable, with write throughput hitting hundreds of thousands of operations per second during peaks, but in the form of a few thousand large, efficient batch operations. Compaction storms vanished. The entire system breathed.

The final architecture, while more complex, is infinitely more resilient. A slowdown in the database no longer impacts the client-facing API. The Redis Stream acts as a massive buffer, absorbing hours of data if the downstream processors are slow or being restarted for a deployment. The use of Chef ensures that any node—be it an Nginx server, an API worker, or a stream processor—can be rebuilt from scratch to the exact required specification within minutes, a crucial capability for disaster recovery and scaling.

This architecture is not without its own set of challenges. We now have to monitor the depth of the Redis Stream to alert on potential backpressure issues where the processors can’t keep up with the ingestion rate. Furthermore, while the current stream processor is robust, scaling it horizontally requires careful management of the consumer group to rebalance partitions when a new consumer joins or an old one leaves. Our next iteration will focus on building more sophisticated autoscaling for the processor fleet based on stream length and processing latency metrics.


  TOC