The core OLTP system, built on SQL Server, was the untouchable monolith, reliably handling millions of product inventory transactions daily. The problem was search. Standard LIKE '%...%'
queries initiated by the sales team were crippling the primary database during peak hours, and the built-in full-text search capabilities, while better, failed to understand user intent. A search for “lightweight hiking shell” would miss a “waterproof windbreaker for trekking” even if it was the superior product. We needed semantic search, but a direct modification of the legacy application or database schema was off the table. Any solution had to be completely external and non-intrusive.
Our initial concept was a straightforward nightly ETL batch job. We would dump the relevant product tables, process them in a separate environment, generate vector embeddings, and load them into a vector database. This was quickly dismissed. The business operates in a fast-moving consumer goods sector; inventory and pricing change constantly. Data staleness of even an hour was unacceptable, as it could lead to sales staff promising out-of-stock items. A batch failure would mean the search index was a full day out of date. The only viable path was real-time replication.
This led to the architecture that we ultimately implemented: a Change Data Capture (CDC) pipeline. We decided on Debezium for SQL Server to capture row-level changes with minimal impact on the source database. These changes would be published as events to an Apache Kafka topic, providing a durable, scalable buffer. A Python-based consumer service would process these events, using NumPy for numerical feature preparation before generating embeddings, and finally, upserting the corresponding vector records into a Weaviate instance. This effectively creates a highly specialized, eventually consistent read replica of our SQL Server data, optimized for one specific task: semantic search.
The following is a build log of that system, detailing the configuration, code, and the practical pitfalls encountered during its construction.
Phase 1: Enabling the Data Flow from the Source
The foundation of this architecture is reliably capturing every INSERT
, UPDATE
, and DELETE
from the SQL Server Products
table. In a real-world project, enabling CDC is a task for a DBA and requires careful consideration of transaction log growth. The initial temptation to use database triggers must be resisted; they execute within the source transaction’s scope and are a notorious source of deadlocks and performance degradation in high-throughput systems.
First, CDC must be enabled on the database itself, and then on the specific tables we need to track.
-- This must be run by a member of the sysadmin fixed server role.
-- Connect to the target SQL Server instance and run these commands.
-- 1. Enable CDC for the database 'InventoryDB'
USE InventoryDB;
GO
EXEC sys.sp_cdc_enable_db;
GO
-- 2. Enable CDC for the 'dbo.Products' table
-- We are capturing all columns. For production, it's better to specify a column list.
-- The role_name=NULL makes the captured data accessible to all users with SELECT permissions on the capture instance tables.
-- In a locked-down environment, you would specify a gating role name.
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Products',
@role_name = NULL,
@supports_net_changes = 1;
GO
-- Verification (Optional): Check that the capture jobs are created and running in SQL Server Agent
SELECT name, is_enabled FROM msdb.dbo.sysjobs WHERE category_id = 16;
-- You should see jobs like 'cdc.InventoryDB_capture' and 'cdc.InventoryDB_cleanup'
With CDC enabled, the next step is deploying the infrastructure to host the Debezium connector. We opted for a Docker-based deployment for portability and ease of management.
Here is the docker-compose.yml
file that orchestrates the necessary components: Zookeeper, Kafka, and Kafka Connect (which hosts the Debezium connector).
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.1
container_name: kafka-connect
depends_on:
- kafka
- zookeeper
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
# Debezium requires the Microsoft JDBC driver for SQL Server.
# The debezium/connect image does not include it by default.
# You must mount it into the container's plugin path.
volumes:
- ./debezium-plugins:/kafka/connect/debezium-connector-sqlserver
A common mistake is forgetting the JDBC driver. Debezium’s connect
image is generic. You must download the Microsoft JDBC Driver for SQL Server (mssql-jdbc-*.jar
) and place it in a local directory (e.g., ./debezium-plugins
) that is then mounted into the container as shown in the volumes
section.
With the infrastructure running (docker-compose up -d
), we register the SQL Server connector via Kafka Connect’s REST API. The configuration JSON is critical.
// sqlserver-connector-config.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "your_sql_server_host",
"database.port": "1433",
"database.user": "debezium_user",
"database.password": "your_password",
"database.dbname": "InventoryDB",
"database.server.name": "sql-server-01",
"table.include.list": "dbo.Products",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"snapshot.mode": "initial",
"include.schema.changes": "false",
"tombstones.on.delete": "true",
"decimal.handling.mode": "double"
}
}
To register it, use a simple curl
command:curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-connector-config.json
Key configurations to note:
-
database.server.name
: This is a logical name that becomes the prefix for all Kafka topics created by this connector. Here, events for theProducts
table will go to thesql-server-01.dbo.Products
topic. -
snapshot.mode
:initial
tells Debezium to perform a consistent snapshot of the entiredbo.Products
table first, and then transition to streaming live changes. This is essential for populating the Weaviate index for the first time. -
tombstones.on.delete
:true
is vital. When a row is deleted in SQL Server, Debezium will publish a “tombstone” message (a message with a null value) to the Kafka topic. Our consumer must be ableto interpret this to issue a corresponding delete in Weaviate.
The overall data flow from source to message bus can be visualized as follows:
graph TD subgraph "SQL Server Instance" A[OLTP Transactions] --> B(Products Table); B -- Log Reader --> C(Transaction Log); end subgraph "Kafka Connect Cluster" D[Debezium SQL Server Connector] -- Reads --> C; end subgraph "Kafka Cluster" E(Kafka Topic: sql-server-01.dbo.Products); end D -- Publishes Events --> E;
Phase 2: The Resilient Python Consumer
The consumer is the heart of the system. It must be robust enough to handle network failures, malformed messages, and backpressure from downstream services (like the embedding API or Weaviate itself). We chose Python with the confluent-kafka-python
library for its reliability.
Here is the complete, runnable consumer service. It’s structured to be configurable via environment variables and includes basic logging and error handling.
# consumer_service.py
import os
import json
import logging
import sys
import time
from uuid import UUID
import numpy as np
import weaviate
from confluent_kafka import Consumer, KafkaError, KafkaException
# --- Configuration ---
# A pragmatic approach for configuration is to use environment variables.
KAFKA_BROKER = os.environ.get("KAFKA_BROKER", "localhost:29092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "sql-server-01.dbo.Products")
KAFKA_GROUP_ID = os.environ.get("KAFKA_GROUP_ID", "weaviate_indexer_group")
WEAVIATE_URL = os.environ.get("WEAVIATE_URL", "http://localhost:8080")
WEAVIATE_CLASS_NAME = "Product"
# --- Logging Setup ---
# Production-grade logging should be structured (e.g., JSON) and sent to a central collector.
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
stream=sys.stdout)
# --- Weaviate Client Setup ---
# This client should be long-lived. Error handling for connection is critical.
try:
weaviate_client = weaviate.Client(WEAVIATE_URL)
logging.info(f"Successfully connected to Weaviate at {WEAVIATE_URL}")
except Exception as e:
logging.critical(f"Failed to connect to Weaviate: {e}")
sys.exit(1)
# --- Placeholder for Embedding Model ---
# In a real system, this would be a client for a service like OpenAI, Cohere,
# or a self-hosted SentenceTransformer model.
# The key is to handle API errors, rate limits, and retries.
def get_embedding(text: str) -> list[float]:
"""Generates a vector embedding for the given text."""
# This is a mock implementation. Replace with your actual embedding logic.
# For demonstration, we create a deterministic vector based on text length.
# A real model would provide semantically meaningful vectors.
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2') # This should be loaded once, not per call
return model.encode(text).tolist()
# --- Data Transformation with NumPy ---
def prepare_features(payload: dict) -> tuple[str, np.ndarray]:
"""
Extracts and transforms features from the Debezium payload using NumPy.
Returns a text string for embedding and a NumPy array for storage.
"""
# A common pitfall is assuming all fields are always present.
# Use .get() with defaults for robustness.
product_name = payload.get('Name', '')
description = payload.get('Description', '')
# Create the text blob for semantic embedding
text_for_embedding = f"Product: {product_name}. Description: {description}"
# Use NumPy for numerical feature engineering.
# Example: Combine price and weight with a logarithmic scale for price,
# as price distributions are often skewed.
price = float(payload.get('Price', 0.0))
weight_kg = float(payload.get('WeightKG', 0.0))
stock_count = int(payload.get('StockCount', 0))
# np.log1p is more numerically stable than np.log for values near 0.
log_price = np.log1p(price)
# This creates a small numerical feature vector that can be stored alongside
# the main text embedding in Weaviate for hybrid search.
numerical_features = np.array([log_price, weight_kg, stock_count], dtype=np.float32)
return text_for_embedding, numerical_features
# --- Weaviate Interaction Logic ---
def get_weaviate_uuid(primary_key: int) -> str:
"""
Deterministically generates a Weaviate-compatible UUID from a SQL Server integer PK.
This is CRITICAL for ensuring updates and deletes target the correct object.
Weaviate requires a standard UUID format.
"""
# Using a fixed namespace allows this to be deterministic.
namespace = UUID("c3ed54c1-4357-4258-a459-2467b79426f0")
return str(UUID(bytes=namespace.bytes + primary_key.to_bytes(16, 'big')))
def handle_weaviate_upsert(record_id: int, payload: dict):
"""Handles creating or updating a record in Weaviate."""
try:
text_to_embed, numerical_features = prepare_features(payload)
vector = get_embedding(text_to_embed)
weaviate_uuid = get_weaviate_uuid(record_id)
data_object = {
"sql_id": record_id,
"name": payload.get('Name'),
"price": payload.get('Price'),
"stock_count": payload.get('StockCount'),
"numerical_features": numerical_features.tolist() # Send as list
}
# Weaviate's `replace` method handles create-or-update logic if the UUID exists.
# This is more robust than checking existence first.
weaviate_client.data_object.replace(
data_object=data_object,
class_name=WEAVIATE_CLASS_NAME,
uuid=weaviate_uuid,
vector=vector
)
logging.info(f"UPSERT successful for SQL ID {record_id} (UUID: {weaviate_uuid})")
except Exception as e:
logging.error(f"Failed to process UPSERT for record ID {record_id}: {e}")
# In production, this error should be sent to a dead-letter queue for later inspection.
def handle_weaviate_delete(record_id: int):
"""Handles deleting a record from Weaviate."""
try:
weaviate_uuid = get_weaviate_uuid(record_id)
weaviate_client.data_object.delete(
uuid=weaviate_uuid,
class_name=WEAVIATE_CLASS_NAME
)
logging.info(f"DELETE successful for SQL ID {record_id} (UUID: {weaviate_uuid})")
except weaviate.exceptions.UnexpectedStatusCodeException as e:
if e.status_code == 404:
logging.warning(f"Attempted to delete non-existent object for SQL ID {record_id}. Likely a race condition or replay. Ignoring.")
else:
logging.error(f"Failed to process DELETE for record ID {record_id}: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during DELETE for record ID {record_id}: {e}")
# --- Main Consumer Loop ---
def consume_events():
consumer_conf = {
'bootstrap.servers': KAFKA_BROKER,
'group.id': KAFKA_GROUP_ID,
'auto.offset.reset': 'earliest', # Start from the beginning on first run
'enable.auto.commit': False # We will commit offsets manually for atomicity
}
consumer = Consumer(consumer_conf)
try:
consumer.subscribe([KAFKA_TOPIC])
logging.info(f"Consumer subscribed to topic: {KAFKA_TOPIC}")
while True:
# Poll for new messages. Timeout is important to prevent a busy-wait loop.
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event, not an error.
continue
else:
raise KafkaException(msg.error())
# Message received. Process it.
key = json.loads(msg.key().decode('utf-8'))
primary_key = key['payload']['Id'] # Assuming 'Id' is the PK column name
if msg.value() is None:
# This is a tombstone message for a DELETE operation.
handle_weaviate_delete(primary_key)
else:
value = json.loads(msg.value().decode('utf-8'))
# Debezium message contains 'before' and 'after' states.
# For both INSERT ('c' for create) and UPDATE ('u'), we only care about the 'after' state.
payload = value['payload']['after']
handle_weaviate_upsert(primary_key, payload)
# Manually commit the offset after successful processing of the message.
# This provides at-least-once delivery semantics.
consumer.commit(asynchronous=False)
except KeyboardInterrupt:
logging.info("Consumer shutting down.")
finally:
consumer.close()
if __name__ == "__main__":
consume_events()
The NumPy integration here is not trivial. It addresses a real-world data science problem: raw numerical features often perform poorly in machine learning models (and by extension, in distance-based vector searches if they were part of the vector). Applying transformations like np.log1p
to skewed data like price or using weighted combinations of features is a standard preprocessing step that NumPy excels at. This prepares a clean numerical representation that can be stored in Weaviate for powerful hybrid searches that filter on these transformed values.
The deterministic UUID generation (get_weaviate_uuid
) is the single most important detail for correctness. Without it, UPDATE
operations from SQL Server would create new, duplicate objects in Weaviate instead of overwriting the existing ones, leading to a corrupt index.
Phase 3: The Weaviate Schema and Sink
Before the consumer can write data, the schema (or “Class” in Weaviate terminology) must be defined. This definition tells Weaviate what properties to expect, their data types, and how to handle vectorization. In our case, we are providing vectors externally, so we configure the vectorizer as none
.
This setup is a one-time operation, typically run via a script before deploying the consumer.
# setup_weaviate_schema.py
import weaviate
import logging
import sys
WEAVIATE_URL = "http://localhost:8080"
CLASS_NAME = "Product"
logging.basicConfig(level=logging.INFO)
client = weaviate.Client(WEAVIATE_URL)
# Delete existing class for idempotency during development
if client.schema.exists(CLASS_NAME):
logging.warning(f"Class '{CLASS_NAME}' already exists. Deleting it.")
client.schema.delete_class(CLASS_NAME)
product_class_schema = {
"class": CLASS_NAME,
"description": "Represents a product from the SQL Server InventoryDB",
# We provide our own vectors, so we disable the built-in vectorizer.
"vectorizer": "none",
"properties": [
{
"name": "sql_id",
"dataType": ["int"],
"description": "The original primary key from the SQL Server table",
},
{
"name": "name",
"dataType": ["text"],
"description": "The name of the product",
},
{
"name": "price",
"dataType": ["number"],
"description": "The price of the product",
},
{
"name": "stock_count",
"dataType": ["int"],
"description": "Available stock quantity",
},
{
"name": "numerical_features",
"dataType": ["number[]"], # Array of numbers
"description": "Pre-processed numerical features from NumPy",
}
]
}
try:
client.schema.create_class(product_class_schema)
logging.info(f"Successfully created class '{CLASS_NAME}' in Weaviate.")
except Exception as e:
logging.critical(f"Failed to create Weaviate schema: {e}")
sys.exit(1)
With this schema in place and the consumer running, the system achieves its final state. A change committed to the Products
table in SQL Server is captured by Debezium, sent through Kafka, processed and vectorized by our Python service, and indexed in Weaviate, typically with an end-to-end latency of under a few seconds. The OLTP database is completely isolated from the expensive semantic search queries, which are now served efficiently by Weaviate.
This architecture is not without its complexities and limitations. The pipeline currently has no automated handling for DDL changes (ALTER TABLE
) on the source table. A schema change in SQL Server would require a manual update of the consumer logic and Weaviate schema, followed by a coordinated restart. Furthermore, while the initial snapshot is effective for bootstrapping, a full re-indexing of a multi-terabyte table without downtime presents a separate engineering challenge, likely requiring a blue-green indexing strategy where a new index is built in parallel and swapped in atomically. Finally, the system guarantees eventual consistency, which is acceptable for search but unsuitable for use cases demanding transactional read-your-writes consistency. The operational overhead of the embedding model API, both in terms of cost and latency, also becomes a critical factor at scale, pushing for the future adoption of a locally-hosted, fine-tuned model.