The operational burden of managing schema and configuration-driven data changes across hundreds of isolated SQL Server tenant databases had become untenable. Each deployment was a high-risk, manually intensive process involving checklists, late-night maintenance windows, and a significant chance of human error. Our initial reliance on standard database migration tools hit a wall: they lacked the fine-grained control needed for phased rollouts. Running a simple migrate up command across all tenants simultaneously was a non-starter. We needed a system that supported canary releases (applying a change to 10% of tenants, observing the impact, and then proceeding), all driven from a single source of truth: a Git repository.
This led to the design of an event-driven system to orchestrate these rollouts asynchronously. The core concept was to decouple the intent of a change (a Git commit) from its execution against the databases. A synchronous API call from a CI pipeline to a deployment service would be fragile, prone to timeouts, and offer no observability into the process. The only sane path forward was an asynchronous, message-based architecture.
The technology selection was driven by production realities. We needed battle-tested components that our team could manage.
- Go: For the backend services. Its performance, straightforward concurrency model, and static typing make it an excellent choice for building reliable, long-running infrastructure daemons.
- Git: The canonical source of truth. All schema changes, feature flag configurations, and data migrations would be defined as version-controlled files.
- Nacos: To serve as the dynamic configuration center. While Git holds the master record, Nacos acts as the live, broadcast medium. A CI/CD pipeline pushes the validated configuration from Git to Nacos, which then notifies listening services of the change instantly. We chose it for its simplicity in setting up watchers compared to some alternatives.
- Kafka: The asynchronous backbone. When a configuration change is detected, a detailed event is published to a Kafka topic. This provides durability, backpressure handling, and the ability to replay events. It also decouples the Nacos watcher from the actual database deployment logic.
- SQL Server: The incumbent database system we had to integrate with, not replace. The challenge was to impose a modern DevOps workflow on a traditional RDBMS.
The architecture materializes as a multi-stage pipeline, visualized below.
graph TD
    A[Developer: git push] --> B{CI/CD Pipeline};
    B -- 1. Validate & Package --> C[Nacos Config Center];
    C -- 2. Push Config Change --> D(Go Nacos Watcher);
    D -- 3. Parse & Create Event --> E(Kafka Topic: `db.rollout.events`);
    E -- 4. Consume Event --> F(Go Deployment Consumer);
    F -- 5. Fetch Tenant List --> G[SQL Server Master DB];
    F -- 6. Apply Change Iteratively --> H[SQL Server Tenant DBs];
    F -- 7. Log Status --> I[Execution Log Table];
    subgraph "Source of Truth"
        A
    end
    subgraph "Control Plane"
        B
        C
    end
    subgraph "Event Backbone"
        D
        E
    end
    subgraph "Execution Plane"
        F
        G
        H
        I
    end
The process begins when a developer commits a new migration file. The CI pipeline is responsible for linting the SQL and the configuration manifest, then pushing the manifest's content to a specific dataId in Nacos. This is the trigger for the entire automated workflow.
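The publish step can be sketched in Go. This is a minimal illustration that assumes the CI job talks to the Nacos v1 Open API endpoint for publishing a configuration (a form-encoded POST to /nacos/v1/cs/configs with dataId, group, and content). The helper name newPublishRequest and the host nacos.internal are hypothetical; the original pipeline may use the Nacos SDK or another mechanism instead.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
)

// newPublishRequest builds the HTTP request a CI step could send to publish
// the manifest content under a given dataId and group.
// Hypothetical helper: it is not part of the services described in this article.
func newPublishRequest(baseURL, dataID, group, content string) (*http.Request, error) {
	form := url.Values{}
	form.Set("dataId", dataID)
	form.Set("group", group)
	form.Set("content", content)

	req, err := http.NewRequest(http.MethodPost,
		baseURL+"/nacos/v1/cs/configs", strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	return req, nil
}

func main() {
	manifest := "version: \"1.0.2\"\n" // in CI this would be the linted file content
	req, err := newPublishRequest("http://nacos.internal:8848",
		"db-migration-manifest", "DEFAULT_GROUP", manifest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(req.Method, req.URL.String())
	// A real CI step would now call http.DefaultClient.Do(req)
	// and fail the job on any non-200 response.
}
```

Keeping request construction separate from sending makes the step easy to unit test without a running Nacos server.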
Defining the Change Manifest
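The first step was standardizing the change definition. We settled on a YAML manifest stored in Git, whose content the CI pipeline publishes to Nacos as-is, so the watcher receives the same YAML it would find in the repository. A common mistake is to dump raw SQL into a config value. That quickly becomes unmanageable, because the consumer has no way to know which tenants should receive the change or in what order the scripts must run. We needed metadata to control the rollout.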
migrations/v1.0.2/add_user_prefs_table.yml:
version: "1.0.2"
description: "Adds user preferences table and enables dark mode feature flag."
author: "[email protected]"
changeType: "SCHEMA_AND_DATA" # One of SCHEMA, DATA, SCHEMA_AND_DATA, or FEATURE_FLAG
rolloutStrategy:
  type: "canary"
  percentage: 10 # Roll out to 10% of tenants first
  # For full rollout, type would be "all"
  # For specific tenants, type would be "list" with a tenants array
changesets:
  - id: "2023102701_create_user_preferences"
    sqlUp: |
      IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='user_preferences' and xtype='U')
      CREATE TABLE user_preferences (
        user_id INT PRIMARY KEY,
        theme VARCHAR(20) DEFAULT 'light' NOT NULL,
        notifications_enabled BIT DEFAULT 1 NOT NULL,
        updated_at DATETIME2 DEFAULT GETUTCDATE()
      );
    sqlDown: |
      DROP TABLE IF EXISTS user_preferences;
  - id: "2023102702_insert_dark_mode_flag"
    sqlUp: |
      -- This is just an example of a data change
      -- In a real scenario, this might be a feature flag table update
      INSERT INTO feature_flags (flag_name, is_enabled) VALUES ('dark_mode_feature', 1);
    sqlDown: |
      DELETE FROM feature_flags WHERE flag_name = 'dark_mode_feature';
This structure gives the consumer what it needs to decide which tenants to target and which scripts to run. The rolloutStrategy block is key to enabling phased deployments, and the changesets array allows for multiple, ordered SQL scripts within a single versioned change.
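Because the manifest drives everything downstream, it is worth rejecting malformed ones before they reach Nacos. The following is a minimal sketch of such a check, not the linting the original pipeline performs. The Manifest and Changeset types here are trimmed-down, hypothetical mirrors of the YAML fields, and ValidateManifest is an illustrative name.

```go
package main

import (
	"errors"
	"fmt"
)

// Changeset and Manifest mirror only the manifest fields needed for validation.
// They are illustrative types, not the structs used by the services.
type Changeset struct {
	ID    string
	SQLUp string
}

type Manifest struct {
	Version      string
	StrategyType string // "canary", "all", or "list"
	Percentage   int
	Changesets   []Changeset
}

// ValidateManifest rejects manifests that the consumer could not act on safely.
func ValidateManifest(m Manifest) error {
	if m.Version == "" {
		return errors.New("version is required")
	}
	switch m.StrategyType {
	case "canary":
		if m.Percentage < 1 || m.Percentage > 100 {
			return fmt.Errorf("canary percentage must be between 1 and 100, got %d", m.Percentage)
		}
	case "all", "list":
		// No percentage needed for these strategies.
	default:
		return fmt.Errorf("unknown rollout strategy %q", m.StrategyType)
	}
	if len(m.Changesets) == 0 {
		return errors.New("at least one changeset is required")
	}
	seen := make(map[string]bool)
	for _, c := range m.Changesets {
		if c.ID == "" || c.SQLUp == "" {
			return fmt.Errorf("changeset %q needs both id and sqlUp", c.ID)
		}
		if seen[c.ID] {
			return fmt.Errorf("duplicate changeset id %q", c.ID)
		}
		seen[c.ID] = true
	}
	return nil
}

func main() {
	m := Manifest{
		Version:      "1.0.2",
		StrategyType: "canary",
		Percentage:   10,
		Changesets:   []Changeset{{ID: "2023102701_create_user_preferences", SQLUp: "SELECT 1"}},
	}
	if err := ValidateManifest(m); err != nil {
		fmt.Println("invalid manifest:", err)
		return
	}
	fmt.Println("manifest is valid")
}
```

Duplicate changeset IDs matter in particular: the consumer's idempotency check is keyed on the ID, so a repeated ID would cause the second script to be silently skipped.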
The Nacos Watcher Service
This Go service has one job: listen for updates to our migration configuration in Nacos and translate them into structured Kafka events. Its implementation requires careful handling of the Nacos client lifecycle and robust error management.
watcher/main.go:
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/nacos-group/nacos-sdk-go/v2/clients"
	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
	"github.com/nacos-group/nacos-sdk-go/v2/vo"
	"github.com/segmentio/kafka-go"
	"gopkg.in/yaml.v2"
)

const (
	nacosDataID = "db-migration-manifest"
	nacosGroup  = "DEFAULT_GROUP"
	kafkaTopic  = "db.rollout.events"
)

// ChangeManifest represents the YAML structure from Git.
type ChangeManifest struct {
	Version         string `yaml:"version"`
	Description     string `yaml:"description"`
	Author          string `yaml:"author"`
	ChangeType      string `yaml:"changeType"`
	RolloutStrategy struct {
		Type       string `yaml:"type"`
		Percentage int    `yaml:"percentage"`
	} `yaml:"rolloutStrategy"`
	Changesets []struct {
		ID      string `yaml:"id"`
		SqlUp   string `yaml:"sqlUp"`
		SqlDown string `yaml:"sqlDown"`
	} `yaml:"changesets"`
}

// RolloutEvent is the event structure published to Kafka.
type RolloutEvent struct {
	EventID     string         `json:"eventId"`
	Manifest    ChangeManifest `json:"manifest"`
	TriggeredAt time.Time      `json:"triggeredAt"`
	Source      string         `json:"source"`
}

func main() {
	nacosEndpoint := os.Getenv("NACOS_ENDPOINT")
	nacosPortStr := os.Getenv("NACOS_PORT")
	kafkaBroker := os.Getenv("KAFKA_BROKER")

	// Fall back to the default Nacos port (8848) when NACOS_PORT is unset.
	nacosPort := uint64(8848)
	if nacosPortStr != "" {
		p, err := strconv.ParseUint(nacosPortStr, 10, 16)
		if err != nil {
			log.Fatalf("Invalid NACOS_PORT %q: %v", nacosPortStr, err)
		}
		nacosPort = p
	}

	// The pitfall here is not configuring the Nacos client correctly.
	// Logging and caching directories are crucial for production stability.
	sc := []constant.ServerConfig{
		*constant.NewServerConfig(nacosEndpoint, nacosPort),
	}
	cc := constant.NewClientConfig(
		constant.WithNamespaceId(""), // A specific namespace is recommended for production
		constant.WithTimeoutMs(5000),
		constant.WithNotLoadCacheAtStart(true),
		constant.WithLogDir("/tmp/nacos/log"),
		constant.WithCacheDir("/tmp/nacos/cache"),
		constant.WithLogLevel("info"),
	)
	configClient, err := clients.NewConfigClient(
		vo.NacosClientParam{
			ClientConfig:  cc,
			ServerConfigs: sc,
		},
	)
	if err != nil {
		log.Fatalf("Failed to create Nacos client: %v", err)
	}

	kafkaWriter := &kafka.Writer{
		Addr:  kafka.TCP(kafkaBroker),
		Topic: kafkaTopic,
		// Hash routes messages by key, so events for the same version land on
		// the same partition. LeastBytes would ignore the key.
		Balancer: &kafka.Hash{},
		// Production config should include more settings for durability.
		RequiredAcks: kafka.RequireAll,
		MaxAttempts:  10,
		WriteTimeout: 10 * time.Second,
	}
	defer kafkaWriter.Close()
	// The onChange function is the core logic.
	onChange := func(namespace, group, dataId, data string) {
		log.Printf("Configuration changed for dataId: %s in group: %s", dataId, group)

		var manifest ChangeManifest
		// The data from Nacos is the raw content of the YAML file.
		if err := yaml.Unmarshal([]byte(data), &manifest); err != nil {
			log.Printf("ERROR: Failed to unmarshal manifest YAML: %v", err)
			// In a real system, this error would be sent to an alert system.
			return
		}

		event := RolloutEvent{
			EventID:     "evt_" + manifest.Version, // Simple ID for demo purposes
			Manifest:    manifest,
			TriggeredAt: time.Now().UTC(),
			Source:      "NacosWatcher",
		}
		eventBytes, err := json.Marshal(event)
		if err != nil {
			log.Printf("ERROR: Failed to marshal rollout event: %v", err)
			return
		}

		// Use the manifest version as the Kafka message key.
		// This ensures all events related to the same version go to the same partition,
		// preserving order if needed.
		err = kafkaWriter.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte(manifest.Version),
				Value: eventBytes,
			},
		)
		if err != nil {
			log.Printf("ERROR: Failed to write message to Kafka: %v", err)
		} else {
			log.Printf("Successfully published event for version %s to Kafka", manifest.Version)
		}
	}

	err = configClient.ListenConfig(vo.ConfigParam{
		DataId:   nacosDataID,
		Group:    nacosGroup,
		OnChange: onChange,
	})
	if err != nil {
		log.Fatalf("Failed to listen for Nacos config changes: %v", err)
	}

	log.Println("Nacos watcher started. Waiting for configuration changes...")
	// Block forever
	select {}
}
This service is intentionally simple. It avoids any business logic related to tenants or databases. Its sole responsibility is to act as a bridge, ensuring that a change in the control plane (Nacos) reliably generates an event on the data plane’s message bus (Kafka).
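One detail worth tightening in a production version is the event ID. The watcher builds it from the version alone, so a manifest that is edited without a version bump produces an event with the same ID as the previous one. A possible refinement, sketched here as an assumption rather than something the original service does, is to derive the ID from both the version and a hash of the raw manifest content. The function name eventID is hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// eventID derives a stable identifier from the manifest version and its raw content.
// Identical input always yields the same ID, so duplicate notifications can be
// detected downstream, while an edited manifest yields a different ID.
func eventID(version, rawManifest string) string {
	sum := sha256.Sum256([]byte(version + "\n" + rawManifest))
	// Twelve hex characters keep the ID readable in logs; use more if collisions are a concern.
	return "evt_" + version + "_" + hex.EncodeToString(sum[:6])
}

func main() {
	original := "version: \"1.0.2\"\ndescription: first draft\n"
	edited := "version: \"1.0.2\"\ndescription: second draft\n"

	fmt.Println(eventID("1.0.2", original))
	fmt.Println(eventID("1.0.2", original) == eventID("1.0.2", original)) // same content, same ID
	fmt.Println(eventID("1.0.2", original) == eventID("1.0.2", edited))   // edited content, different ID
}
```

The version stays visible in the ID, so log lines remain easy to correlate with a release.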
The Deployment Consumer
This is where the complex work happens. This Go service consumes the RolloutEvent from Kafka and executes the deployment strategy. It needs a connection to a master database to get the list of tenants and connections to the tenant databases themselves to apply the changes.
A critical design choice here is idempotency. Running the same migration script twice must not cause an error or data corruption. We achieve this by creating a __schema_migrations table in each tenant database. Before executing a changeset, the consumer checks whether that changeset's ID has already been logged in this table, and it records the ID in the same transaction that applies the script.
consumer/db/manager.go:
package db

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/denisenkom/go-mssqldb" // SQL Server driver
)

// DBManager handles all database interactions.
type DBManager struct {
	masterDB *sql.DB
}

// Tenant represents a single customer database.
type Tenant struct {
	ID               int
	Name             string
	ConnectionString string
}

func NewDBManager(masterConnStr string) (*DBManager, error) {
	// sql.Open only validates its arguments; the ping below makes the first real connection.
	db, err := sql.Open("sqlserver", masterConnStr)
	if err != nil {
		return nil, fmt.Errorf("failed to open master DB: %w", err)
	}
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(time.Hour)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		db.Close() // Release the pool if the master DB is unreachable.
		return nil, fmt.Errorf("failed to ping master DB: %w", err)
	}
	return &DBManager{masterDB: db}, nil
}
// GetTenantsForRollout retrieves the list of tenants based on the strategy.
func (m *DBManager) GetTenantsForRollout(strategyType string, percentage int) ([]Tenant, error) {
	// A common mistake is to do this calculation in the service.
	// Pushing it to the database is far more efficient.
	var (
		query string
		args  []interface{}
	)
	switch strategyType {
	case "all":
		query = "SELECT TenantID, Name, ConnectionString FROM Tenants WHERE IsActive = 1"
	case "canary":
		if percentage < 1 || percentage > 100 {
			return nil, fmt.Errorf("canary percentage must be between 1 and 100, got %d", percentage)
		}
		// TABLESAMPLE samples whole data pages rather than rows, so on a table of
		// only a few hundred tenants it tends to return either every row or none.
		// TOP (n) PERCENT over a random ordering works at row level and rounds up,
		// so even a small percentage selects at least one tenant. The sample is
		// random, so it differs from one call to the next.
		query = "SELECT TOP (@p1) PERCENT TenantID, Name, ConnectionString FROM Tenants WHERE IsActive = 1 ORDER BY NEWID()"
		args = append(args, percentage)
	default:
		return nil, fmt.Errorf("unsupported rollout strategy: %s", strategyType)
	}

	rows, err := m.masterDB.Query(query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to query tenants: %w", err)
	}
	defer rows.Close()

	var tenants []Tenant
	for rows.Next() {
		var t Tenant
		if err := rows.Scan(&t.ID, &t.Name, &t.ConnectionString); err != nil {
			return nil, fmt.Errorf("failed to scan tenant row: %w", err)
		}
		tenants = append(tenants, t)
	}
	// rows.Next returns false on both end-of-data and error, so check which it was.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed while iterating tenant rows: %w", err)
	}
	return tenants, nil
}
// ApplyChangeset executes a single changeset against a tenant DB idempotently.
func (m *DBManager) ApplyChangeset(ctx context.Context, tenant Tenant, changesetID, sqlUp string) error {
	tenantDB, err := sql.Open("sqlserver", tenant.ConnectionString)
	if err != nil {
		return fmt.Errorf("failed to connect to tenant %s: %w", tenant.Name, err)
	}
	defer tenantDB.Close()

	// Start a transaction. The entire process of checking, executing, and logging
	// must be atomic for a given changeset.
	tx, err := tenantDB.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction for tenant %s: %w", tenant.Name, err)
	}
	// Defer a rollback. If Commit is successful, this is a no-op.
	defer tx.Rollback()

	// 1. Ensure the migrations table exists.
	initSchemaSQL := `
	IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='__schema_migrations' and xtype='U')
	CREATE TABLE __schema_migrations (
		id VARCHAR(255) PRIMARY KEY,
		applied_at DATETIME2 NOT NULL DEFAULT GETUTCDATE()
	);`
	if _, err := tx.ExecContext(ctx, initSchemaSQL); err != nil {
		return fmt.Errorf("failed to ensure migrations table for tenant %s: %w", tenant.Name, err)
	}

	// 2. Check if this changeset has already been applied. This is the core of idempotency.
	var existingID string
	err = tx.QueryRowContext(ctx, "SELECT id FROM __schema_migrations WHERE id = @p1", changesetID).Scan(&existingID)
	if err != nil && err != sql.ErrNoRows {
		return fmt.Errorf("failed to check existing migration for tenant %s: %w", tenant.Name, err)
	}
	if err == nil {
		log.Printf("Changeset %s already applied to tenant %s. Skipping.", changesetID, tenant.Name)
		return nil // Not an error, just already done.
	}

	// 3. Execute the actual migration script.
	if _, err := tx.ExecContext(ctx, sqlUp); err != nil {
		// The error here will include line numbers if the SQL is invalid, which is crucial for debugging.
		return fmt.Errorf("failed to execute sqlUp for changeset %s on tenant %s: %w", changesetID, tenant.Name, err)
	}

	// 4. Log the successful application in the migrations table.
	logSQL := "INSERT INTO __schema_migrations (id) VALUES (@p1)"
	if _, err := tx.ExecContext(ctx, logSQL, changesetID); err != nil {
		return fmt.Errorf("failed to log migration %s for tenant %s: %w", changesetID, tenant.Name, err)
	}

	// 5. Commit the transaction.
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction for tenant %s: %w", tenant.Name, err)
	}
	log.Printf("Successfully applied changeset %s to tenant %s", changesetID, tenant.Name)
	return nil
}

func (m *DBManager) Close() {
	m.masterDB.Close()
}
consumer/main.go:
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time" // needed for the per-changeset timeout in processEvent

	"github.com/segmentio/kafka-go"

	"github.com/my-org/db-rollout-engine/consumer/db" // Reference to the package above
)

const (
	kafkaTopic   = "db.rollout.events"
	kafkaGroupID = "db-deployment-consumer-group"
)

// Event structures match the ones in the watcher.
type ChangeManifest struct {
	// ... (same struct as in watcher)
}

type RolloutEvent struct {
	// ... (same struct as in watcher)
}
func main() {
	kafkaBroker := os.Getenv("KAFKA_BROKER")
	masterDBConnStr := os.Getenv("MASTER_DB_CONN_STR")

	// Set up the Kafka reader. Setting GroupID makes it a consumer-group member,
	// so offsets are tracked per group and committed explicitly below.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{kafkaBroker},
		GroupID:  kafkaGroupID,
		Topic:    kafkaTopic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer r.Close()

	dbManager, err := db.NewDBManager(masterDBConnStr)
	if err != nil {
		log.Fatalf("Failed to initialize DB manager: %v", err)
	}
	defer dbManager.Close()

	log.Println("Deployment consumer started. Waiting for events...")

	// Graceful shutdown handling.
	ctx, cancel := context.WithCancel(context.Background())
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigchan
		log.Println("Shutdown signal received, cancelling context...")
		cancel()
	}()

	for {
		// Using FetchMessage with context allows for graceful shutdown.
		m, err := r.FetchMessage(ctx)
		if err != nil {
			// Context canceled is the expected error on shutdown.
			if ctx.Err() != nil {
				break
			}
			log.Printf("ERROR: could not fetch message: %v", err)
			continue
		}

		var event RolloutEvent
		if err := json.Unmarshal(m.Value, &event); err != nil {
			log.Printf("ERROR: could not unmarshal event value: %v. Moving to DLQ is advised.", err)
			// A real system would have DLQ (Dead Letter Queue) logic here.
			// Commit to avoid reprocessing a poison pill message.
			if err := r.CommitMessages(ctx, m); err != nil {
				log.Printf("ERROR: failed to commit poison pill message: %v", err)
			}
			continue
		}

		log.Printf("Processing event for version %s", event.Manifest.Version)
		// The core logic starts here.
		processEvent(dbManager, &event)

		// Commit the message offset to Kafka.
		if err := r.CommitMessages(ctx, m); err != nil {
			log.Printf("ERROR: failed to commit messages: %v", err)
		}
	}
	log.Println("Consumer shut down cleanly.")
}
func processEvent(dbm *db.DBManager, event *RolloutEvent) {
	strategy := event.Manifest.RolloutStrategy
	tenants, err := dbm.GetTenantsForRollout(strategy.Type, strategy.Percentage)
	if err != nil {
		log.Printf("ERROR: Failed to get tenants for version %s: %v", event.Manifest.Version, err)
		return
	}
	log.Printf("Found %d tenants for rollout strategy '%s' with percentage %d", len(tenants), strategy.Type, strategy.Percentage)

	// Process each tenant sequentially for simplicity.
	// A more advanced version could process them in parallel with a worker pool.
	// However, this can risk overwhelming the database server. A pragmatic start is sequential.
	for _, tenant := range tenants {
		log.Printf("Applying changes to tenant: %s (ID: %d)", tenant.Name, tenant.ID)
		for _, changeset := range event.Manifest.Changesets {
			// A timeout per changeset application is a good production practice.
			ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
			err := dbm.ApplyChangeset(ctx, tenant, changeset.ID, changeset.SqlUp)
			if err != nil {
				log.Printf("CRITICAL: Failed to apply changeset %s to tenant %s: %v", changeset.ID, tenant.Name, err)
				// Here, we decide to stop processing for this tenant but continue with others.
				// This is a business decision. An alternative is to stop the entire rollout.
				cancel()
				break
			}
			cancel()
		}
	}
	log.Printf("Finished processing event for version %s", event.Manifest.Version)
}
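The consumer service is the heart of the engine. The separation of concerns in db.DBManager keeps the main processing loop clean. The logic for handling failures is critical: a failure for one tenant should not stop the entire system, but it must be logged aggressively for on-call engineers to investigate. The message is committed to Kafka only after the entire tenant batch for that event is processed, which provides at-least-once delivery semantics: if the consumer crashes mid-batch, the event is redelivered and the idempotency check skips the changesets that were already applied. Note that the offset is also committed when processEvent has logged a failure, so failed tenants are not retried automatically.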
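At-least-once delivery interacts with canary selection: when an event is redelivered, tenant selection runs again, and any random sample will pick a different subset than the first attempt. One way to make selection repeatable, sketched below as an alternative to sampling in SQL and not as the approach the consumer takes, is to hash the tenant ID together with the version and keep the tenants whose bucket falls under the percentage. The function name inCanary is hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// inCanary reports whether a tenant falls into the canary bucket for a version.
// The result depends only on its inputs, so a redelivered event selects exactly
// the same tenants as the first attempt.
func inCanary(tenantID int, version string, percentage int) bool {
	h := fnv.New32a()
	// Mixing in the version gives each release a different canary group,
	// so the same tenants are not always the first to receive changes.
	fmt.Fprintf(h, "%s:%d", version, tenantID)
	return int(h.Sum32()%100) < percentage
}

func main() {
	selected := 0
	for id := 1; id <= 300; id++ {
		if inCanary(id, "1.0.2", 10) {
			selected++
		}
	}
	// The share is approximately, not exactly, 10% of the 300 tenants.
	fmt.Printf("selected %d of 300 tenants\n", selected)
}
```

A side benefit is that raising the percentage for the same version only adds tenants: every tenant selected at 10% is still selected at 50%.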
Limitations and Future Iterations
This system, while functional, is a starting point. Its most significant limitation is the lack of a user-facing control plane for observability. Engineers currently have to rely on structured logs and querying the __schema_migrations tables to understand the state of a rollout. A future iteration would involve building a simple UI that consumes an audit event stream from Kafka to visualize which tenants have which versions applied.
The rollback strategy is also primitive. It relies on the sqlDown scripts in the manifest, but there is no automated process to trigger them. A true rollback feature would require a new type of Kafka event (RollbackEvent) and significantly more complex logic in the consumer to handle applying the sqlDown scripts in the correct reverse order.
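The ordering part of that logic is small enough to sketch. The example below assumes a hypothetical helper, rollbackPlan, which takes the changesets in manifest order plus the set of IDs recorded in a tenant's __schema_migrations table, and returns the sqlDown steps to run. It is an illustration of the reverse-order idea only; the event type, transaction handling, and removal of rows from the migrations table are left out.

```go
package main

import "fmt"

// Changeset carries only the fields needed to plan a rollback.
type Changeset struct {
	ID      string
	SQLDown string
}

// rollbackPlan returns the changesets to undo, last applied first.
// Changesets that were never applied to this tenant are skipped, which matters
// when a rollout failed partway through the list.
func rollbackPlan(changesets []Changeset, applied map[string]bool) []Changeset {
	plan := make([]Changeset, 0, len(changesets))
	for i := len(changesets) - 1; i >= 0; i-- {
		if applied[changesets[i].ID] {
			plan = append(plan, changesets[i])
		}
	}
	return plan
}

func main() {
	manifest := []Changeset{
		{ID: "2023102701_create_user_preferences", SQLDown: "DROP TABLE IF EXISTS user_preferences;"},
		{ID: "2023102702_insert_dark_mode_flag", SQLDown: "DELETE FROM feature_flags WHERE flag_name = 'dark_mode_feature';"},
	}
	// Both changesets were applied to this tenant.
	applied := map[string]bool{
		"2023102701_create_user_preferences": true,
		"2023102702_insert_dark_mode_flag":   true,
	}
	for _, c := range rollbackPlan(manifest, applied) {
		fmt.Println(c.ID)
	}
}
```

Here the flag deletion is planned before the table drop, mirroring the order in which the changes were applied.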
Finally, the canary strategy is naive. Selecting a random percentage of tenants is a blunt instrument. A more sophisticated implementation would integrate with our monitoring systems. It would deploy to a small, fixed set of canary tenants, then monitor key application metrics (error rates, latency) for a predefined period. If the metrics remain healthy, the system would automatically promote the change by generating a new RolloutEvent with a type: "all" strategy. This closes the loop, moving from simple automation to intelligent, metric-driven deployments.
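The promotion decision itself could look something like the sketch below. Everything in it is an assumption made for illustration: the CanarySample and Thresholds types, the choice of error rate and p95 latency as gates, and the rule that an empty observation window blocks promotion. How the samples are collected from a monitoring system is out of scope.

```go
package main

import "fmt"

// CanarySample is a hypothetical snapshot of application metrics for the canary tenants.
type CanarySample struct {
	ErrorRate    float64 // fraction of failed requests, 0.0 to 1.0
	P95LatencyMs float64
}

// Thresholds are the limits a sample must stay within to count as healthy.
type Thresholds struct {
	MaxErrorRate    float64
	MaxP95LatencyMs float64
}

// RolloutStrategy mirrors the rolloutStrategy block of the manifest.
type RolloutStrategy struct {
	Type       string
	Percentage int
}

// healthy reports whether every sample in the observation window stayed within limits.
// An empty window counts as unhealthy: missing data is not evidence of health.
func healthy(samples []CanarySample, t Thresholds) bool {
	if len(samples) == 0 {
		return false
	}
	for _, s := range samples {
		if s.ErrorRate > t.MaxErrorRate || s.P95LatencyMs > t.MaxP95LatencyMs {
			return false
		}
	}
	return true
}

// nextStrategy returns the strategy for the follow-up event,
// or false when the rollout should halt for investigation.
func nextStrategy(samples []CanarySample, t Thresholds) (RolloutStrategy, bool) {
	if !healthy(samples, t) {
		return RolloutStrategy{}, false
	}
	return RolloutStrategy{Type: "all"}, true
}

func main() {
	limits := Thresholds{MaxErrorRate: 0.01, MaxP95LatencyMs: 400}
	window := []CanarySample{
		{ErrorRate: 0.002, P95LatencyMs: 210},
		{ErrorRate: 0.004, P95LatencyMs: 250},
	}
	if s, ok := nextStrategy(window, limits); ok {
		fmt.Printf("promote with strategy type %q\n", s.Type)
	} else {
		fmt.Println("halt rollout")
	}
}
```

Because the changesets are applied idempotently, the promoted event can safely target all tenants, including the ones that already received the change during the canary phase.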