The project mandate was a high-throughput, two-stage event processing pipeline. The initial stage involved stateless, high-concurrency ingestion and validation, a task well-suited for the JVM’s threading model. The second stage required complex data enrichment using Python-native machine learning libraries. Our existing ScyllaDB cluster, prized for its raw write performance, was the designated data store. The conventional wisdom screamed for a message broker—Kafka or Pulsar—to decouple the ingestion and enrichment services. However, due to strict operational constraints on introducing new infrastructure components and a desire to minimize latency overhead, we were forced to challenge this convention. The core problem crystallized: how to orchestrate a reliable, ordered, and resilient workflow between a Kotlin/Ktor service and a Python/FastAPI service using only the database as the coordination primitive. This path meant turning ScyllaDB from a simple key-value store into a distributed state machine, a decision fraught with potential performance pitfalls.
Our initial concept hinged on treating a ScyllaDB table as a queue. The Ktor service would perform an unconditional INSERT
, and the FastAPI workers would poll this table for new work. The immediate, glaring flaw in this design is the race condition: multiple Python workers could fetch and process the same event simultaneously, leading to data corruption and wasted compute. The solution required an atomic state transition—a mechanism to claim an event. ScyllaDB, being Cassandra-compatible, offers Lightweight Transactions (LWTs) which provide conditional mutations using a PAXOS
-based consensus protocol. An UPDATE
or INSERT
can be predicated on an IF
clause, guaranteeing an atomic compare-and-set operation. This became the cornerstone of our design.
The data model was the first critical piece. A naive single-table approach would not scale due to ScyllaDB’s architecture, which abhors queries that cannot be satisfied by a single partition key. Polling for “unprocessed” work would necessitate a table scan or a secondary index, both of which are performance anti-patterns for this workload. We landed on a sharded queue model, implemented through careful schema design.
// ScyllaDB/Cassandra Query Language (CQL) Schema
// This is the core table acting as our distributed queue.
CREATE KEYSPACE IF NOT EXISTS ingestion_pipeline
WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3 };
USE ingestion_pipeline;
CREATE TABLE IF NOT EXISTS event_queue (
// The partition key is a composite of a shard_id and a time_bucket.
// shard_id allows workers to subscribe to specific streams of work.
// time_bucket (e.g., hour) prevents partitions from growing unboundedly (a classic anti-pattern).
shard_id int,
time_bucket timestamp,
// Clustering columns ensure events are sorted within a partition.
event_timestamp timestamp,
event_id uuid,
// State management fields for the workflow.
status text, // e.g., 'RECEIVED', 'PROCESSING', 'COMPLETED', 'FAILED'
processor_id uuid, // ID of the worker that has claimed the event
lease_until timestamp, // A timeout to handle crashed workers
// The actual data.
payload blob,
metadata map<text, text>,
PRIMARY KEY ((shard_id, time_bucket), event_timestamp, event_id)
) WITH CLUSTERING ORDER BY (event_timestamp ASC);
The partition key (shard_id, time_bucket)
is crucial. It distributes the write load across the cluster and allows our FastAPI consumers to poll specific, manageable partitions instead of scanning the entire keyspace. The Ktor ingestion service would be responsible for assigning a shard_id
(e.g., based on a hash of some attribute in the incoming data) and calculating the current time_bucket
.
The first component was the Ktor ingestion service. Its sole job: receive an event via HTTP, validate it, and write it to the event_queue
table with a status of 'RECEIVED'
. Performance and resilience were key. We used the official DataStax Java Driver, configured for high throughput.
Here’s the core configuration and service implementation in Kotlin.
build.gradle.kts
(Ktor Project Dependencies)
plugins {
kotlin("jvm") version "1.9.20"
id("io.ktor.plugin") version "2.3.6"
kotlin("plugin.serialization") version "1.9.20"
}
// ... other configurations
dependencies {
implementation("io.ktor:ktor-server-core-jvm")
implementation("io.ktor:ktor-server-cio-jvm")
implementation("io.ktor:ktor-server-content-negotiation-jvm")
implementation("io.ktor:ktor-serialization-kotlinx-json-jvm")
implementation("com.datastax.oss:java-driver-core:4.17.0")
implementation("ch.qos.logback:logback-classic:1.4.11")
// ... test dependencies
}
ScyllaDB Driver Configuration (CqlSessionManager.kt
)
package com.pipeline.ingestion.db
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import java.net.InetSocketAddress
import java.time.Duration
object CqlSessionManager {
// In a real project, these would come from a config file (e.g., HOCON).
private val contactPoints = listOf(InetSocketAddress("scylla-node1", 9042))
private val localDatacenter = "dc1"
private val keyspace = "ingestion_pipeline"
val session: CqlSession by lazy {
val configLoader = DriverConfigLoader.programmaticBuilder()
.with(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5))
.with(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 4)
.with(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 2)
.with(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_QUORUM")
.build()
CqlSession.builder()
.addContactPoints(contactPoints)
.withLocalDatacenter(localDatacenter)
.withKeyspace(keyspace)
.withConfigLoader(configLoader)
.build()
}
}
The configuration focuses on production readiness. A LOCAL_QUORUM
consistency ensures strong consistency within the local datacenter, a sensible default for writes. Connection pool sizing is critical for performance tuning.
Ktor Ingestion Logic (EventIngestionService.kt
)
package com.pipeline.ingestion.service
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.PreparedStatement
import kotlinx.coroutines.future.await
import java.nio.ByteBuffer
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID
class EventIngestionService(private val session: CqlSession) {
private val insertStatement: PreparedStatement = session.prepare(
"""
INSERT INTO event_queue (shard_id, time_bucket, event_timestamp, event_id, status, payload, metadata)
VALUES (?, ?, ?, ?, 'RECEIVED', ?, ?)
"""
)
// This defines the number of virtual shards for event distribution.
private val totalShards = 16
suspend fun processEvent(payload: ByteArray, metadata: Map<String, String>) {
val eventId = UUID.randomUUID()
val now = Instant.now()
// A simple sharding strategy. A real system might use a more stable hash.
val shardId = eventId.hashCode() % totalShards
// Bucket time to the nearest hour to prevent partition explosion.
val timeBucket = now.truncatedTo(ChronoUnit.HOURS)
val boundStatement = insertStatement.bind(
shardId,
timeBucket,
now,
eventId,
ByteBuffer.wrap(payload),
metadata
)
try {
// The driver's async methods return CompletionStage, which integrates perfectly with Kotlin coroutines.
session.executeAsync(boundStatement).await()
} catch (e: Exception) {
// Add proper logging and error handling.
// Maybe retry logic or dead-letter queueing.
println("Failed to write event ${eventId}: ${e.message}")
throw e // Rethrow to let the caller handle the HTTP response.
}
}
}
This Ktor service is lean. It receives a request, determines the shard and time bucket, and executes an asynchronous INSERT
. There’s no LWT here; writes should be fast and unconditional. The complexity moves to the consumer side.
Now for the Python FastAPI worker service. This service does not expose HTTP endpoints for processing. Instead, it runs background tasks that poll ScyllaDB, claim events using LWTs, and execute the enrichment logic.
requirements.txt
(FastAPI Project Dependencies)
fastapi
uvicorn
cassandra-driver
Python ScyllaDB Driver Configuration (db_connector.py
)
import asyncio
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BatchStatement, ConsistencyLevel
# In production, use a config management system.
SCYLLA_HOSTS = ['scylla-node1']
SCYLLA_KEYSPACE = 'ingestion_pipeline'
class ScyllaConnector:
_session = None
@classmethod
def get_session(cls):
if cls._session is None:
# The Python driver setup mirrors the Java one for consistency.
cluster = Cluster(
SCYLLA_HOSTS,
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='dc1')),
protocol_version=4
)
cls._session = cluster.connect(SCYLLA_KEYSPACE)
return cls._session
@classmethod
def close(cls):
if cls._session:
cls._session.cluster.shutdown()
FastAPI Worker Logic (worker.py
)
import asyncio
import uuid
import time
from datetime import datetime, timedelta, timezone
from db_connector import ScyllaConnector
class EnrichmentWorker:
def __init__(self, shard_id: int, worker_id: uuid.UUID):
self.shard_id = shard_id
self.worker_id = worker_id
self.session = ScyllaConnector.get_session()
self.running = False
# Prepare statements for efficiency. This is a critical optimization.
self.find_events_stmt = self.session.prepare(
"SELECT event_timestamp, event_id FROM event_queue WHERE shard_id = ? AND time_bucket = ?"
)
self.claim_event_stmt = self.session.prepare(
"""
UPDATE event_queue
SET status = 'PROCESSING', processor_id = ?, lease_until = ?
WHERE shard_id = ? AND time_bucket = ? AND event_timestamp = ? AND event_id = ?
IF status = 'RECEIVED'
"""
)
self.complete_event_stmt = self.session.prepare(
"""
UPDATE event_queue SET status = 'COMPLETED'
WHERE shard_id = ? AND time_bucket = ? AND event_timestamp = ? AND event_id = ?
"""
)
# Statement to fetch payload for a claimed event.
self.get_payload_stmt = self.session.prepare(
"SELECT payload FROM event_queue WHERE shard_id = ? AND time_bucket = ? AND event_timestamp = ? AND event_id = ?"
)
async def poll_and_process(self):
"""The main loop for a worker."""
self.running = True
print(f"Worker {self.worker_id} starting for shard {self.shard_id}")
while self.running:
try:
# We check the current and previous time buckets to handle events near the hour boundary.
now = datetime.now(timezone.utc)
current_bucket = now.replace(minute=0, second=0, microsecond=0)
prev_bucket = current_bucket - timedelta(hours=1)
for bucket in [prev_bucket, current_bucket]:
await self._process_bucket(bucket)
except Exception as e:
print(f"Error in worker {self.worker_id}: {e}")
# The poll interval is a tunable parameter.
await asyncio.sleep(5)
async def _process_bucket(self, time_bucket: datetime):
# Fetch a batch of potential events.
rows = self.session.execute(self.find_events_stmt, [self.shard_id, time_bucket])
for event_header in rows:
# Attempt to claim the event using an LWT. This is the critical atomic step.
lease_duration_seconds = 300 # 5 minutes
lease_until = datetime.now(timezone.utc) + timedelta(seconds=lease_duration_seconds)
result = self.session.execute(self.claim_event_stmt, [
self.worker_id,
lease_until,
self.shard_id,
time_bucket,
event_header.event_timestamp,
event_header.event_id
])
# The result of an LWT is a row with an `[applied]` column.
if result.one()[0] is True:
print(f"Worker {self.worker_id} claimed event {event_header.event_id}")
try:
await self._execute_enrichment(time_bucket, event_header)
except Exception as e:
print(f"Failed to process event {event_header.event_id}: {e}")
# Here you'd implement retry logic or mark the event as FAILED.
# For now, the lease will simply expire and another worker will pick it up.
break # Move to the next polling cycle to get fresh data.
async def _execute_enrichment(self, time_bucket: datetime, event_header):
# Fetch the full payload since we have the lock.
payload_row = self.session.execute(self.get_payload_stmt, [
self.shard_id, time_bucket, event_header.event_timestamp, event_header.event_id
]).one()
if not payload_row:
print(f"Could not find payload for claimed event {event_header.event_id}. Something is wrong.")
return
# Simulate the actual work. This is where the ML models would be called.
print(f"Worker {self.worker_id} processing payload of size {len(payload_row.payload)}...")
await asyncio.sleep(2) # Simulate I/O bound work
# Mark the event as completed.
self.session.execute(self.complete_event_stmt, [
self.shard_id, time_bucket, event_header.event_timestamp, event_header.event_id
])
print(f"Worker {self.worker_id} completed event {event_header.event_id}")
async def main():
# In a real system, you'd have a supervisor managing multiple worker tasks.
# The number of workers and their shard assignments could be configured dynamically.
worker = EnrichmentWorker(shard_id=5, worker_id=uuid.uuid4())
await worker.poll_and_process()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Shutting down workers.")
finally:
ScyllaConnector.close()
The Python worker is substantially more complex. It’s stateful, maintaining a connection and prepared statements. The core loop polls for work within specific partitions (shard_id
, time_bucket
), then iterates through potential events attempting to claim one via the claim_event_stmt
LWT. Only if the IF status = 'RECEIVED'
condition is met will the database state be changed and the worker granted the right to process the event. The lease_until
field is our safety net; if a worker crashes mid-process, the lease will expire, and another worker can eventually reclaim the stale event (this recovery logic is not fully implemented in the snippet but would check lease_until < now()
).
This architecture can be visualized as follows:
sequenceDiagram participant Client participant KtorIngestion as Ktor Ingestion Service participant ScyllaDB participant FastAPIWorker as FastAPI Worker (Shard 5) participant FastAPIWorker2 as FastAPI Worker (Shard 6) Client->>+KtorIngestion: POST /event (payload) KtorIngestion->>KtorIngestion: Hash payload -> shard_id=5 KtorIngestion->>+ScyllaDB: INSERT INTO event_queue (shard_id=5, status='RECEIVED', ...) ScyllaDB-->>-KtorIngestion: Write OK KtorIngestion-->>-Client: 202 Accepted loop Poll for work FastAPIWorker->>+ScyllaDB: SELECT ... FROM event_queue WHERE shard_id=5 ScyllaDB-->>-FastAPIWorker: [event_id_123] FastAPIWorker->>+ScyllaDB: UPDATE ... SET status='PROCESSING' ... WHERE event_id=123 IF status='RECEIVED' ScyllaDB-->>-FastAPIWorker: [applied]=true Note over FastAPIWorker: Lock acquired! FastAPIWorker->>+ScyllaDB: SELECT payload WHERE event_id=123 ScyllaDB-->>-FastAPIWorker: (payload data) FastAPIWorker->>FastAPIWorker: Run enrichment logic() FastAPIWorker->>+ScyllaDB: UPDATE ... SET status='COMPLETED' WHERE event_id=123 ScyllaDB-->>-FastAPIWorker: Write OK end Note over FastAPIWorker2: Worker for shard 6 is polling independently and does not see event_id_123.
The system works, but it’s not without its sharp edges. LWTs are significantly more expensive than standard writes, as they require multiple round-trips between coordinator and replica nodes to establish consensus. Our design mitigates this by using LWTs only for the infrequent “claim” operation, not the high-volume initial ingestion. The polling mechanism is another potential issue; it introduces latency and can lead to wasted CPU cycles if the queue is often empty. A busy-wait poll with a short sleep is a trade-off between responsiveness and resource usage. Furthermore, the logic to handle zombie workers and expired leases adds application-level complexity that a dedicated message broker would typically manage internally.
This brokerless pattern represents a specific architectural trade-off. It leverages an existing, powerful data store to perform a secondary duty of state management, avoiding the operational cost of another distributed system. The price paid is in application complexity and the careful performance tuning required to prevent the coordination logic from overwhelming the database. The choice to use LWTs for locking, combined with a sharded-queue table design, proves to be a viable, if demanding, solution for integrating heterogeneous services in a high-throughput environment when conventional tools are off the table. The next iteration would likely explore ScyllaDB’s Change Data Capture (CDC) feature, which could transform the polling consumer into a more efficient event-streaming consumer, eliminating polling latency and further closing the gap with traditional message brokers.