Managing ScyllaDB schema changes across multiple environments was a recurring source of production incidents. Manual cqlsh
sessions, conflicting developer changes applied out of order, and divergence between staging and production schemas led to unpredictable application behavior. The core pain point was the lack of a reliable, auditable, and automated process for evolving the database schema in lockstep with our application code. A developer merging a feature that relied on a new table column could bring down services if the database migration hadn’t been applied correctly, or at all. The process was fundamentally broken, relying on institutional knowledge and checklists, which are notoriously fragile under pressure.
Our initial concept was to treat database schema as code, subjecting it to the same rigor as our application source: version control, peer review, and automated deployment. The goal was a hands-off system where a git merge
to the main branch would be the sole trigger for a production schema change. This naturally led us to a GitOps model. The source of truth for our infrastructure, application deployments, and now, our database schema, would reside entirely within Git repositories. This provides an immutable, auditable log of every change ever made.
The technology selection process was driven by our existing stack and the specific challenges of stateful system management.
- ScyllaDB: Chosen for its extreme performance and Cassandra compatibility. Its distributed nature, however, makes atomic, cluster-wide schema changes non-trivial: a schema update must be applied consistently across all nodes.
- GitOps with ArgoCD: We already used ArgoCD for our stateless Kubernetes deployments. Extending it to manage stateful migrations felt like a natural progression. Its reconciliation loop could ensure that the state of our database schema matched the desired state defined in Git.
- Consul: This was the critical missing piece. A simple CI job executing schema changes is prone to race conditions. If two pipelines for different schemas run concurrently, or if a Kubernetes pod running the migration restarts, we could end up with a corrupt or inconsistent schema. We needed a robust distributed locking mechanism. Using a table within ScyllaDB itself for locking felt like a circular dependency. Consul’s Key/Value store with its session-based locking feature provided a perfect, external coordination mechanism. It also served as our service discovery backend, allowing our migration tool to dynamically find the ScyllaDB cluster nodes.
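To make the session-based locking primitive concrete, here is a minimal sketch of the acquire/release cycle using the python-consul client that the migrator below also uses. The host and key names are illustrative, not the production values.
import consul

# Minimal sketch of Consul's session-based KV lock (names are illustrative).
c = consul.Consul(host="consul.service.consul", port=8500)

# A session with a TTL: if the holder dies and stops renewing it, Consul
# invalidates the session and the lock is freed automatically.
session_id = c.session.create(name="demo-migration-lock", behavior="delete", ttl=60)

# Atomic acquire: only one session can hold the key; returns False if it is taken.
acquired = c.kv.put("scylla-migrations/demo/lock", "locked", acquire=session_id)
print("lock acquired:", acquired)

# ... do the work that must be exclusive ...

# Destroying the session releases the lock (behavior="delete" removes the key).
c.session.destroy(session_id)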
The architecture centers around a custom migration runner, orchestrated by a CI/CD pipeline and synchronized by ArgoCD, using Consul as a distributed mutex. The entire flow is declarative and idempotent.
sequenceDiagram
    participant Dev
    participant GitRepo as Git Repo (Schemas)
    participant CI as CI Pipeline (GitLab)
    participant Registry as Container Registry
    participant ArgoCD
    participant K8s as Kubernetes Cluster
    participant Migrator as Migrator Job Pod
    participant Consul
    participant ScyllaDB

    Dev->>+GitRepo: git push (V003_add_index.cql)
    GitRepo->>CI: Trigger webhook
    CI->>CI: 1. Build Migrator Docker image
    CI->>+Registry: 2. Push image (app-migrator:sha-xyz)
    Registry-->>-CI: Push successful
    CI->>GitRepo: 3. Update K8s manifest with new image tag
    GitRepo-->>-CI: Commit successful
    CI-->>Dev: Pipeline finished

    ArgoCD->>GitRepo: Poll for changes
    ArgoCD->>ArgoCD: Detects image tag change in manifest
    ArgoCD->>+K8s: Sync: Apply updated Job manifest
    K8s->>K8s: Create Migrator Job Pod
    K8s-->>-ArgoCD: Sync successful

    Migrator->>+Consul: 1. Create session
    Consul-->>-Migrator: Session ID
    Migrator->>+Consul: 2. Acquire lock (scylla/migrations/lock) with session
    Consul-->>-Migrator: Lock acquired
    Migrator->>+ScyllaDB: 3. Connect to cluster
    Migrator->>ScyllaDB: 4. Read applied migrations from schema_version table
    ScyllaDB-->>Migrator: List of applied versions
    Migrator->>Migrator: 5. Compare with local files, find V003_... is new
    Migrator->>+ScyllaDB: 6. Execute V003_add_index.cql
    ScyllaDB-->>-Migrator: Execution successful
    Migrator->>+ScyllaDB: 7. Insert 'V003_add_index.cql' into schema_version table
    ScyllaDB-->>-Migrator: Insert successful
    Migrator->>+Consul: 8. Release lock (destroy session)
    Consul-->>-Migrator: Lock released
    K8s->>K8s: Pod completes successfully
Git Repository Structure and Schema Definition
The foundation of this system is a dedicated Git repository for database schemas. The structure must be rigid and predictable for the automation to parse it correctly. We adopted a structure similar to tools like Flyway or Liquibase.
# tree scylla-migrations
scylla-migrations/
├── .gitlab-ci.yml
├── Dockerfile
├── migrator/
│   ├── __main__.py
│   ├── requirements.txt
│   └── scripts/
│       └── my_app_keyspace/
│           ├── V001__create_users_table.cql
│           ├── V002__add_email_to_users.cql
│           └── V003__create_users_by_email_mv.cql
└── deploy/
    └── k8s-job.yaml
Each keyspace gets its own directory. Migration files are named with a zero-padded version prefix VNNN, a double underscore, a description, and a .cql extension. The migrator script executes these files in lexicographical order, which only matches numeric order because of the zero-padding. This is a critical convention; violating it breaks the migration logic.
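A quick illustration of why the zero-padded prefix matters for that ordering:
# Lexicographical vs numeric ordering of migration file names.
print(sorted(["V2__b.cql", "V10__a.cql"]))     # ['V10__a.cql', 'V2__b.cql']    -- wrong order
print(sorted(["V002__b.cql", "V010__a.cql"]))  # ['V002__b.cql', 'V010__a.cql'] -- correct order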
A typical migration file is a simple, idempotent CQL script.
V001__create_users_table.cql:
CREATE TABLE IF NOT EXISTS my_app_keyspace.users (
    user_id uuid,
    username text,
    created_at timestamp,
    PRIMARY KEY (user_id)
);
V002__add_email_to_users.cql:
ALTER TABLE my_app_keyspace.users ADD email text;
The use of IF NOT EXISTS
provides a basic level of idempotency, though our migration tracking table is the primary guard against re-running scripts.
The Core Migration Runner
This is a Python application responsible for the entire migration logic. It’s designed to run inside a Kubernetes Job, connect to Consul and ScyllaDB, perform the migration, and then terminate.
migrator/__main__.py:
import os
import sys
import logging
import time
import uuid
from pathlib import Path
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement
from cassandra.policies import DCAwareRoundRobinPolicy
import consul
# --- Basic Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# ScyllaDB Configuration from Environment Variables
SCYLLA_HOSTS = os.getenv("SCYLLA_HOSTS", "localhost").split(',')
SCYLLA_PORT = int(os.getenv("SCYLLA_PORT", 9042))
SCYLLA_USER = os.getenv("SCYLLA_USERNAME")
SCYLLA_PASSWORD = os.getenv("SCYLLA_PASSWORD")
SCYLLA_DC = os.getenv("SCYLLA_DATACENTER")
KEYSPACE = os.getenv("KEYSPACE_NAME")
MIGRATIONS_DIR = Path(os.getenv("MIGRATIONS_DIR", "/app/scripts"))
# Consul Configuration
CONSUL_HOST = os.getenv("CONSUL_HOST", "consul")
CONSUL_PORT = int(os.getenv("CONSUL_PORT", 8500))
CONSUL_LOCK_KEY = f"scylla-migrations/{KEYSPACE}/lock"
CONSUL_SESSION_TTL = 60  # Seconds. If the pod dies and stops renewing, Consul invalidates the session and frees the lock
SCHEMA_VERSION_TABLE = "schema_version"
class MigrationError(Exception):
    pass


class ScyllaMigrator:
    def __init__(self):
        self.consul_client = consul.Consul(host=CONSUL_HOST, port=CONSUL_PORT)
        self.session_id = None
        self.scylla_session = None
        self.cluster = None

    def _create_consul_session(self):
        """Creates a new session in Consul for holding the lock."""
        session_name = f"scylla-migrator-{KEYSPACE}-{uuid.uuid4()}"
        try:
            self.session_id = self.consul_client.session.create(
                name=session_name,
                behavior='delete',  # Delete the lock key automatically if the session is invalidated
                ttl=CONSUL_SESSION_TTL
            )
            logging.info(f"Consul session '{self.session_id}' created.")
        except Exception as e:
            logging.error(f"Failed to create Consul session: {e}")
            raise

    def acquire_lock(self):
        """Acquires a distributed lock in Consul."""
        self._create_consul_session()
        logging.info(f"Attempting to acquire lock: {CONSUL_LOCK_KEY}")
        # kv.put(acquire=...) is non-blocking: if another session already holds the
        # lock it returns False immediately. We fail fast and let the Kubernetes Job's
        # backoffLimit handle retries. A polling wait could be added for more complex scenarios.
        if not self.consul_client.kv.put(CONSUL_LOCK_KEY, 'locked', acquire=self.session_id):
            raise MigrationError("Failed to acquire Consul lock. Another migration may be in progress.")
        logging.info("Lock acquired successfully.")

    def release_lock(self):
        """Releases the distributed lock by destroying the session."""
        if self.session_id:
            try:
                self.consul_client.session.destroy(self.session_id)
                logging.info(f"Released lock by destroying session '{self.session_id}'.")
                self.session_id = None
            except Exception as e:
                # Log but don't fail the whole process; the session TTL will eventually clean it up.
                logging.warning(f"Failed to cleanly destroy Consul session '{self.session_id}': {e}")
    def _connect_scylla(self):
        """Establishes a connection to the ScyllaDB cluster."""
        auth_provider = None
        if SCYLLA_USER and SCYLLA_PASSWORD:
            auth_provider = PlainTextAuthProvider(username=SCYLLA_USER, password=SCYLLA_PASSWORD)

        # In a production setup, DCAwareRoundRobinPolicy is crucial.
        policy = DCAwareRoundRobinPolicy(local_dc=SCYLLA_DC) if SCYLLA_DC else None

        try:
            self.cluster = Cluster(
                contact_points=SCYLLA_HOSTS,
                port=SCYLLA_PORT,
                auth_provider=auth_provider,
                load_balancing_policy=policy
            )
            self.scylla_session = self.cluster.connect()
            # Ensure we are operating on the correct keyspace.
            self.scylla_session.execute(f"USE {KEYSPACE}")
            logging.info(f"Successfully connected to ScyllaDB cluster and using keyspace '{KEYSPACE}'.")
        except Exception as e:
            logging.error(f"Failed to connect to ScyllaDB: {e}")
            raise

    def _ensure_schema_version_table(self):
        """Creates the migration tracking table if it doesn't exist."""
        query = f"""
        CREATE TABLE IF NOT EXISTS {SCHEMA_VERSION_TABLE} (
            version text,
            description text,
            script_name text,
            installed_on timestamp,
            execution_time_ms int,
            success boolean,
            PRIMARY KEY (version)
        )
        """
        self.scylla_session.execute(query)
        logging.info(f"Ensured schema version table '{SCHEMA_VERSION_TABLE}' exists.")

    def _get_applied_migrations(self):
        """Fetches the list of already applied migration versions."""
        query = f"SELECT version FROM {SCHEMA_VERSION_TABLE}"
        rows = self.scylla_session.execute(query)
        applied = {row.version for row in rows}
        logging.info(f"Found {len(applied)} applied migrations.")
        return applied

    def _log_migration(self, version, description, script, execution_time, success):
        """Logs the result of a migration script execution to the tracking table."""
        query = f"""
        INSERT INTO {SCHEMA_VERSION_TABLE} (version, description, script_name, installed_on, execution_time_ms, success)
        VALUES (%s, %s, %s, toTimestamp(now()), %s, %s)
        """
        self.scylla_session.execute(query, (version, description, script, int(execution_time * 1000), success))
    def run_migrations(self):
        """The main migration logic loop."""
        self._connect_scylla()
        self._ensure_schema_version_table()
        applied_versions = self._get_applied_migrations()

        keyspace_dir = MIGRATIONS_DIR / KEYSPACE
        if not keyspace_dir.is_dir():
            logging.warning(f"Migrations directory for keyspace '{KEYSPACE}' not found. Nothing to do.")
            return

        migration_files = sorted(p for p in keyspace_dir.glob("V*.cql") if p.is_file())
        new_migrations_found = False

        for script_path in migration_files:
            try:
                # Filename format: VNNN__description.cql
                version = script_path.name.split("__")[0]
                description = script_path.name.split("__")[1].replace(".cql", "").replace("_", " ")
            except IndexError:
                logging.warning(f"Skipping malformed file name: {script_path.name}")
                continue

            if version in applied_versions:
                continue

            new_migrations_found = True
            logging.info(f"Applying new migration: {script_path.name}")

            with open(script_path, 'r') as f:
                cql_script = f.read()

            # ScyllaDB/Cassandra drivers do not support multiple statements in one execute call.
            # We must split them. A common mistake is to assume a single execute works.
            # Note: this naive split breaks if a ';' appears inside a string literal, so
            # migration scripts must avoid that.
            statements = [s.strip() for s in cql_script.split(';') if s.strip()]

            start_time = time.time()
            try:
                for statement in statements:
                    logging.info(f"Executing: {statement[:100]}...")  # Log truncated statement
                    self.scylla_session.execute(SimpleStatement(statement))
                execution_time = time.time() - start_time
                self._log_migration(version, description, script_path.name, execution_time, True)
                logging.info(f"Successfully applied {script_path.name} in {execution_time:.2f} seconds.")
            except Exception as e:
                execution_time = time.time() - start_time
                self._log_migration(version, description, script_path.name, execution_time, False)
                logging.error(f"FAILED to apply migration {script_path.name}: {e}")
                # This is a critical failure. We stop immediately to prevent partial schema updates.
                raise MigrationError(f"Migration {script_path.name} failed.") from e

        if not new_migrations_found:
            logging.info("Schema is already up-to-date.")

    def execute(self):
        """Main execution wrapper with lock handling and cleanup."""
        try:
            self.acquire_lock()
            self.run_migrations()
        except Exception as e:
            logging.critical(f"A critical error occurred during the migration process: {e}")
            sys.exit(1)  # Ensure the Kubernetes Job is marked as failed
        finally:
            self.release_lock()
            if self.cluster:
                self.cluster.shutdown()
            logging.info("Migration process finished.")


if __name__ == "__main__":
    migrator = ScyllaMigrator()
    migrator.execute()
This script is robust. It uses environment variables for configuration, implements a Consul-based distributed lock with session TTL to prevent stale locks, creates its own tracking table, and logs every action. The failure of any single CQL statement within a migration file halts the entire process, preventing partial schema updates.
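One caveat worth calling out: the Consul session has a TTL, so a migration that runs longer than that TTL needs the session renewed periodically, or the lock will be silently released mid-run. A minimal sketch of how a renewal loop could be threaded alongside run_migrations(), assuming the same python-consul client (the helper name is ours, not part of the script above):
import threading
import consul

def keep_session_alive(client: consul.Consul, session_id: str, stop: threading.Event, interval: int = 15):
    """Renew the Consul session periodically until told to stop."""
    while not stop.wait(interval):
        try:
            client.session.renew(session_id)
        except Exception as exc:
            # If renewal keeps failing the session (and lock) will expire,
            # so the migration should be treated as at risk and aborted.
            print(f"session renewal failed: {exc}")

# Possible usage inside ScyllaMigrator.execute(), after acquire_lock():
#   stop = threading.Event()
#   t = threading.Thread(target=keep_session_alive,
#                        args=(self.consul_client, self.session_id, stop),
#                        daemon=True)
#   t.start()
#   try:
#       self.run_migrations()
#   finally:
#       stop.set()
#       t.join()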
Containerization and CI/CD Pipeline
The migrator script is packaged into a lightweight Docker container.
Dockerfile:
FROM python:3.9-slim
WORKDIR /app
COPY migrator/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY migrator/ .
# The entrypoint is the script itself.
# Configuration will be passed via environment variables in the K8s Job.
CMD ["python", "-u", "./__main__.py"]
The CI pipeline’s job is to build this image, tag it with the Git commit SHA for traceability, push it to our registry, and then update the Kubernetes Job manifest in Git to use this new image tag. This final step is what triggers ArgoCD.
.gitlab-ci.yml:
stages:
  - build-and-deploy

variables:
  # Using commit SHA for unique, traceable image tags
  IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA

build-migrator:
  stage: build-and-deploy
  image: docker:20.10.16
  services:
    - docker:20.10.16-dind
  script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    - docker build -t $IMAGE_TAG .
    - docker push $IMAGE_TAG
    # This section updates the K8s manifest in the same repo.
    # In a more complex setup, this might be a separate git repo.
    # Using 'sed' is simple but tools like 'kustomize' or 'yq' are better for production.
    - "sed -i 's|image: .*|image: '$IMAGE_TAG'|g' deploy/k8s-job.yaml"
    # Commit the change back to the repo to be picked up by ArgoCD
    - git config --global user.email "ci-[email protected]"
    - git config --global user.name "CI Pipeline"
    - git add deploy/k8s-job.yaml
    - 'git commit -m "Update migrator image to $CI_COMMIT_SHORT_SHA [skip ci]"'
    - git push origin HEAD:$CI_COMMIT_REF_NAME
A key detail here is [skip ci]
in the commit message to prevent a recursive loop of pipeline triggers.
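As the comment in the pipeline hints, the sed line is the fragile part. A sketch of the same step with mikefarah's yq (v4), assuming the binary is available in the job image, edits the manifest structurally instead of pattern-matching on text:
# Possible replacement for the sed-based image bump (yq v4 assumed in the image).
script:
  - yq -i '.spec.template.spec.containers[0].image = strenv(IMAGE_TAG)' deploy/k8s-job.yaml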
Kubernetes and ArgoCD Integration
The migration is executed as a Kubernetes Job
. This is ideal because it’s a run-to-completion workload. If it fails, Kubernetes can be configured to retry it, but our lock mechanism ensures only one instance runs at a time.
deploy/k8s-job.yaml:
apiVersion: batch/v1
kind: Job
metadata:
  name: scylla-migrator-my-app
  namespace: database-ops
spec:
  template:
    spec:
      containers:
        - name: migrator
          # This image tag will be dynamically updated by our CI pipeline
          image: my-registry/scylla-migrator:initial-tag
          env:
            - name: KEYSPACE_NAME
              value: "my_app_keyspace"
            - name: MIGRATIONS_DIR
              value: "/app/scripts"
            - name: SCYLLA_HOSTS
              # Using Consul's DNS for service discovery
              value: "scylla.service.consul"
            - name: SCYLLA_DATACENTER
              value: "dc1"
            - name: CONSUL_HOST
              value: "consul.service.consul"
            - name: SCYLLA_USERNAME
              valueFrom:
                secretKeyRef:
                  name: scylla-credentials
                  key: username
            - name: SCYLLA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: scylla-credentials
                  key: password
      restartPolicy: Never # We want the job to fail on error, not restart indefinitely
  backoffLimit: 2 # Retry twice on failure before marking the Job as failed
Finally, an ArgoCD Application manifest points to our scylla-migrations
repository. ArgoCD monitors this repository. When the CI pipeline pushes a change to k8s-job.yaml
, ArgoCD detects the diff and applies it to the cluster, triggering a new Job run.
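The Application itself is a small manifest; a sketch of roughly what it looks like follows, with the repo URL and names being illustrative. Because Job specs are immutable once created, a setup like this typically also needs the Replace sync option (or ArgoCD hook annotations) so the Job is recreated rather than patched in place.
# Sketch of the ArgoCD Application watching the migrations repo (values illustrative).
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: scylla-migrations
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://gitlab.example.com/platform/scylla-migrations.git
    targetRevision: main
    path: deploy
  destination:
    server: https://kubernetes.default.svc
    namespace: database-ops
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
      # Jobs are immutable; Replace lets ArgoCD delete and recreate the Job
      # when the image tag changes.
      - Replace=true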
This closes the loop, creating a fully automated, auditable system. A developer simply creates a PR with a new .cql
file. Once reviewed and merged, the pipeline takes over, ensuring the change is applied safely and consistently across all environments, with the Consul lock preventing any chance of a race condition. The history in the schema_version
table, combined with the Git log, provides a complete and unambiguous record of the database’s evolution.
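For a quick audit outside of Git, the tracking table can be read directly; a minimal sketch with the same driver, using the contact point and keyspace configured above:
# Sketch: inspect the migration history recorded by the runner.
from cassandra.cluster import Cluster

cluster = Cluster(["scylla.service.consul"])
session = cluster.connect("my_app_keyspace")

rows = session.execute(
    "SELECT version, script_name, installed_on, execution_time_ms, success FROM schema_version"
)
for row in sorted(rows, key=lambda r: r.version):
    status = "OK" if row.success else "FAILED"
    print(f"{row.version}  {row.script_name}  {row.installed_on}  {status}")

cluster.shutdown()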
The most significant limitation of this approach is its handling of destructive changes or complex data migrations that require a rollback strategy. This pipeline is forward-only. Implementing automated rollbacks for database schemas is an order of magnitude more complex, as it often involves data transformation logic that cannot be generalized. A V002_rollback.cql
script would need to be written and tested with extreme care. Locking granularity is the other consideration. The lock key is scoped per keyspace (scylla-migrations/{keyspace}/lock
), so teams that own separate keyspaces can already migrate in parallel; within a keyspace, however, all migrations are strictly serialized, and for a keyspace shared by many teams that serialization could become a bottleneck. A future iteration might need a finer-grained coordination scheme for that case.