Building a Type-Safe Data Hydration Layer in Haskell for SSR Using SQL Server CDC and a NoSQL Cache


The project started with a familiar, frustrating constraint: a mission-critical, monolithic SQL Server database that we could not meaningfully alter. This database underpinned a legacy inventory management system, complete with years of accumulated stored procedures and complex relational logic. The new requirement was to build a public-facing product catalog portal. For SEO and performance reasons, Server-Side Rendering (SSR) was a non-negotiable part of the stack. A direct-to-database approach for the SSR server was immediately ruled out. The query patterns for rendering a rich product page were complex, involving multiple joins, and the latency was unacceptable, ranging from 150ms to over 800ms under moderate load. This would have crippled the SSR server’s throughput and destroyed the user experience.

Our initial concept was to introduce a caching layer. This is standard practice, but the real challenge was cache invalidation and consistency. The inventory data—stock levels, pricing, promotions—changed frequently and unpredictably. Stale data on the public site was not an option. A simple time-to-live (TTL) cache was too coarse and would either serve stale data or generate cache miss stampedes on popular items. We needed a way to proactively and granularly update a read-optimized replica of the data required for rendering, in near-real-time.

This led us to investigate SQL Server’s built-in Change Data Capture (CDC) feature. It felt like the right tool for the job—a low-impact, transactionally consistent way to get a stream of all INSERT, UPDATE, and DELETE operations from our core tables. The plan solidified: we would build a standalone service that consumes this CDC stream, transforms the raw change data into a denormalized view suitable for rendering, and pushes that view into a high-speed NoSQL store. The SSR server would then query this NoSQL store exclusively, ensuring millisecond response times.

The technology selection for the CDC processing service was critical. The service needed to be robust, handle concurrent data processing pipelines, and, most importantly, be type-safe. We were transforming data from a relational schema into a JSON document format. Any mismatch in data types or missing fields during this transformation could lead to runtime errors that would poison the cache and bring down the public site. This is where Haskell entered the picture. Its strong static typing, powerful concurrency libraries (async), and excellent data manipulation capabilities (Aeson for JSON) made it a perfect candidate for this data plumbing and transformation role. For the NoSQL cache, we chose Redis. Its speed is legendary, and its Hash data structure is a natural fit for storing our denormalized product objects.

The final architecture looked like this:

graph TD
    subgraph "Legacy System"
        SQL_SERVER[SQL Server Database] --1. Writes--> PRODUCT_TABLE[dbo.Products]
        PRODUCT_TABLE --2. CDC Process--> CDC_TABLE[cdc.dbo_Products_CT]
    end

    subgraph "Haskell CDC Service"
        CDC_POLLER[CDC Poller Worker] --3. Queries for changes--> CDC_TABLE
        CDC_POLLER --4. Processes changes--> TRANSFORM[Data Transformation Logic]
        TRANSFORM --5. Fetches related data--> SQL_SERVER
        TRANSFORM --6. Writes materialized view--> REDIS[Redis Cache]
    end

    subgraph "Presentation Layer"
        SSR_SERVER[SSR Application Server] --7. Requests data--> API[Haskell Hydration API]
    end

    subgraph "Haskell API Service"
        API --8. Reads from cache--> REDIS
    end

    SSR_SERVER --9. Renders HTML--> USER[User Browser]

Step 1: Enabling and Understanding SQL Server CDC

Before writing a single line of Haskell, the foundation had to be laid in the database. A common mistake is to underestimate the DBA work required. CDC is not enabled by default. It requires enabling it first at the database level, and then on each table we need to track.

-- First, ensure the SQL Server Agent is running.
-- Then, enable CDC for the entire database.
USE InventoryDB;
GO
EXEC sys.sp_cdc_enable_db;
GO

-- Next, enable CDC on the specific tables of interest.
-- We'll track Products and their related Prices.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'Products',
    @role_name     = NULL, -- Using NULL allows public access to the change data
    @supports_net_changes = 1;
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name   = N'ProductPrices',
    @role_name     = NULL,
    @supports_net_changes = 1;
GO

Once enabled, SQL Server creates several system tables. The most important one for our service is the change table, named <capture_instance>_CT. For our dbo.Products table, this would be cdc.dbo_Products_CT. Querying this table reveals the change log:

-- Example query to see recent changes to the Products table
SELECT
    __$start_lsn,
    __$operation, -- 1=delete, 2=insert, 3=update (before), 4=update (after)
    ProductID,
    ProductName,
    StockQuantity
FROM cdc.fn_cdc_get_all_changes_dbo_Products(
    sys.fn_cdc_get_min_lsn('dbo_Products'), -- from LSN
    sys.fn_cdc_get_max_lsn(),             -- to LSN
    N'all'
);

The key to a reliable CDC poller is managing the Log Sequence Numbers (LSNs). Our Haskell service needs to remember the last LSN it processed and ask for all changes since that LSN in the next poll. This prevents missing data or processing it twice.

Step 2: Structuring the Haskell Application

We used stack to manage our project. The package.yaml outlines our core dependencies.

# package.yaml
name:                cdc-processor
version:             0.1.0.0
dependencies:
- base >= 4.7 && < 5
- odbc # For connecting to SQL Server
- hedis # The Redis client
- aeson # For JSON encoding/decoding
- text
- bytestring
- scientific
- time
- vector
- async # For running concurrent workers
- servant-server # For the API layer
- wai
- warp
- containers

Our application’s core logic is split into several modules: Database, Cache, Types, and Worker. The Types module is crucial; it provides the type-safe bridge between the database schema, our application’s domain logic, and the JSON structure in Redis.

-- src/Types.hs
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Types where

import Data.Aeson
import Data.Int (Int32, Int64)
import Data.Text (Text)
import GHC.Generics (Generic)

-- Represents a raw change row from a CDC table
data ProductChange = ProductChange
  { pcLsn       :: !ByteString -- LSN is a binary type
  , pcOperation :: !Int
  , pcProductId :: !Int32
  , pcName      :: !(Maybe Text)
  , pcQuantity  :: !(Maybe Int32)
  } deriving (Show)

-- This is the denormalized, materialized view we'll store in Redis.
-- It's the "contract" for our SSR server.
data ProductView = ProductView
  { pvProductId   :: !Int32
  , pvProductName :: !Text
  , pvStock       :: !Int32
  , pvPrice       :: !Scientific
  , pvLastUpdated :: !UTCTime
  } deriving (Show, Generic)

-- Aeson instances for converting our view to/from JSON
instance ToJSON ProductView where
    toJSON = genericToJSON defaultOptions { fieldLabelModifier = camelTo2 '_' . drop 2 }

instance FromJSON ProductView where
    parseJSON = genericParseJSON defaultOptions { fieldLabelModifier = camelTo2 '_' . drop 2 }

The ProductChange type directly models the raw data we get from the CDC function. The ProductView type represents the final, clean document our SSR server will consume. Using Haskell’s type system here ensures at compile time that we can’t accidentally try to build a ProductView with missing data.

Step 3: Implementing the CDC Polling Worker

The worker is the heart of the system. It runs in a continuous loop, managed by the async library. Its state is simple: the last LSN it successfully processed. In a production system, this LSN should be persisted to a file or a dedicated table to ensure the service can resume where it left off after a crash.

-- src/Worker.hs
module Worker where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)
import Control.Monad (forever)
import qualified Data.ByteString as B
import Database.ODBC.SQLServer (Connection)
import System.IO (hPutStrLn, stderr)

import qualified Database as DB
import qualified Cache as C

-- The main loop for the CDC worker process.
runWorker :: Connection -> C.RedisConnection -> IO ()
runWorker sqlConn redisConn = do
    putStrLn "Starting CDC worker..."
    -- Initialize the LSN. In a real app, this would be read from a persistent store.
    let initialLsn = B.empty 
    loop sqlConn redisConn initialLsn
  where
    loop conn rconn lastLsn = forever $ do
        -- 1. Get the current maximum LSN from the database.
        currentMaxLsn <- DB.getCurrentMaxLsn conn
        
        -- A pitfall here is polling an idle database. If no changes occurred,
        -- the currentMaxLsn will be the same as lastLsn. We don't need to query.
        if currentMaxLsn > lastLsn
            then do
                putStrLn $ "Polling for changes from LSN: " ++ show lastLsn ++ " to " ++ show currentMaxLsn
                -- 2. Fetch all changes between the last LSN and the current max.
                changes <- DB.fetchProductChanges conn lastLsn currentMaxLsn
                
                -- 3. Process each change. This is the critical part.
                -- Using `mapConcurrently` from `async` would be an option for heavy loads.
                processResults <- mapM (processChange conn rconn) changes

                -- 4. Find the new latest LSN we've processed successfully.
                -- This logic must be robust. We only advance the LSN if processing was successful.
                let newLastLsn = findMaxLsn changes lastLsn
                
                putStrLn $ "Processed " ++ show (length changes) ++ " changes. New LSN is " ++ show newLastLsn
                loop conn rconn newLastLsn
            else do
                -- No new changes, wait before polling again.
                threadDelay (5 * 1000000) -- 5 seconds
                loop conn rconn lastLsn

The processChange function is where the business logic resides. An UPDATE operation in CDC creates two rows: one for the ‘before’ state (operation = 3) and one for the ‘after’ state (operation = 4). We are only interested in the final state, so we typically filter for operations 2 (insert) and 4 (update after).

-- Part of Worker.hs

import qualified Types as T

-- Process a single change event from the CDC table.
processChange :: DB.Connection -> C.RedisConnection -> T.ProductChange -> IO ()
processChange sqlConn redisConn change = do
    -- We only care about the final state of a row.
    when (T.pcOperation change `elem` [2, 4]) $ do
        putStrLn $ "Processing change for ProductID: " ++ show (T.pcProductId change)
        
        -- The CDC log only contains columns from the changed table.
        -- To build our full ProductView, we need to query the original tables
        -- to get related data (like price, which is in another table).
        -- This is a key architectural decision: enrich the data here.
        maybeFullProduct <- DB.fetchFullProductDetails sqlConn (T.pcProductId change)

        case maybeFullProduct of
            Just productView -> do
                -- We successfully built the view, now write it to Redis.
                C.setProductView redisConn productView
                putStrLn $ "Successfully updated cache for ProductID: " ++ show (T.pcProductId change)
            Nothing ->
                -- This can happen if the product was deleted between our CDC read and this query.
                -- It's an edge case we must handle. We might want to delete from cache.
                hPutStrLn stderr $ "Could not fetch full details for ProductID: " ++ show (T.pcProductId change)

Step 4: Database and Cache Interaction Logic

The Database and Cache modules contain the messy details of I/O. We use the odbc library for SQL Server. Its API is low-level but effective.

-- src/Database.hs
{-# LANGUAGE OverloadedStrings #-}

module Database where

import Database.ODBC.SQLServer
import Data.Text (Text)
import Data.Time (getCurrentTime)
import qualified Types as T

-- Fetches changes within a given LSN range.
fetchProductChanges :: Connection -> LSN -> LSN -> IO [T.ProductChange]
fetchProductChanges conn fromLsn toLsn =
    query conn "SELECT __$start_lsn, __$operation, ProductID, ProductName, StockQuantity \
              \ FROM cdc.fn_cdc_get_all_changes_dbo_Products(?, ?, N'all')"
              (fromLsn, toLsn)

-- Fetches all necessary data to build a complete ProductView.
-- This might involve joins in a real-world scenario.
fetchFullProductDetails :: Connection -> Int32 -> IO (Maybe T.ProductView)
fetchFullProductDetails conn pid = do
    -- In a real project, this would be a more complex query joining
    -- Products, ProductPrices, Promotions, etc.
    rows <- query conn "SELECT p.ProductName, p.StockQuantity, pr.Price \
                      \ FROM dbo.Products p JOIN dbo.ProductPrices pr ON p.ProductID = pr.ProductID \
                      \ WHERE p.ProductID = ?" (Only pid)

    -- The result of a query is a list, so we handle the case where no product is found.
    case rows of
        [(name, stock, price)] -> do
            now <- getCurrentTime
            return $ Just T.ProductView
                { pvProductId   = pid
                , pvProductName = name
                , pvStock       = stock
                , pvPrice       = price
                , pvLastUpdated = now
                }
        _ -> return Nothing

The Redis interaction is handled by hedis. We serialize our ProductView record to a JSON bytestring and store it. The key schema is important for discoverability.

-- src/Cache.hs
{-# LANGUAGE OverloadedStrings #-}

module Cache where

import Database.Redis
import Data.Aeson (encode)
import qualified Data.ByteString.Char8 as B
import qualified Types as T

type RedisConnection = Connection

-- Write a ProductView to the Redis cache.
setProductView :: RedisConnection -> T.ProductView -> IO ()
setProductView conn view = do
    let key = "product:view:" <> B.pack (show $ T.pvProductId view)
    let val = encode view -- Aeson encodes directly to a lazy ByteString
    
    -- Using `runRedis` to execute the command.
    -- The `Set` command will overwrite any existing value.
    -- We could also set an expiry if needed, but our logic relies on proactive updates.
    result <- runRedis conn $ set key (toStrict val)
    case result of
        Right Ok -> return ()
        Left err -> hPutStrLn stderr $ "Failed to write to Redis: " ++ show err

A key detail is error handling. What if Redis is temporarily unavailable? The hedis library will return a Left value. In a production system, this should trigger a retry mechanism with exponential backoff, and if it continues to fail, an alert should be raised. For now, we just log to stderr.

Step 5: The Hydration API for the SSR Server

Finally, we need a simple, fast API for the SSR server to consume. servant is a powerful choice for its type-level API definition, but for this simple use case, scotty or a barebones wai application is sufficient and lighter.

-- src/Api.hs
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Api where

import Web.Scotty
import Network.Wai.Handler.Warp (run)
import Data.Aeson (encode)
import Text.Read (readMaybe)
import qualified Cache as C

-- The API provides a single endpoint to fetch a materialized product view.
runApiServer :: C.RedisConnection -> Int -> IO ()
runApiServer redisConn port = do
    putStrLn $ "Starting API server on port " ++ show port
    scotty port $ do
        get "/products/:id" $ do
            (idStr :: String) <- param "id"
            case readMaybe idStr of
                Just (pid :: Int) -> do
                    -- Fetch the raw ByteString from Redis.
                    maybeVal <- liftIO $ C.getProductView redisConn pid
                    case maybeVal of
                        Just val -> do
                            setHeader "Content-Type" "application/json"
                            raw val -- Return the raw JSON bytestring
                        Nothing -> status 404
                Nothing -> status 400

-- This function would live in Cache.hs
getProductView :: RedisConnection -> Int -> IO (Maybe B.ByteString)
getProductView conn pid = do
    let key = "product:view:" <> B.pack (show pid)
    runRedis conn $ get key

This API endpoint is brutally simple and fast. It does nothing but a key lookup in Redis. All the complex logic of joining, transforming, and calculating has already been performed asynchronously by our Haskell CDC worker. This architectural decoupling is the primary benefit, allowing the front-facing API to remain responsive and stateless, even if the backend SQL Server is under heavy transactional load.

Lingering Issues and Future Iterations

This implementation provides a robust baseline, but it’s not without its limitations in a large-scale production environment. The CDC polling mechanism, while efficient, introduces a small delay equal to the polling interval. For true real-time needs, one might explore SQL Server’s Service Broker or a message queue integration.

The current processChange function is also susceptible to race conditions. If a product is deleted immediately after its data is changed, our fetchFullProductDetails query might return nothing, leaving stale data in the cache. A more robust solution would be to handle the DELETE operation (__$operation = 1) explicitly and issue a DEL command to Redis for that product key.

Furthermore, the single-node Haskell service is a single point of failure. The LSN persistence is critical for recovery, but for high availability, one would need to run multiple instances of the service. This introduces the complexity of leader election to ensure only one instance is actively polling and writing to the cache, preventing duplicate processing and write contention. The transformation logic itself could also become a bottleneck; for very high-volume changes, offloading the enrichment and transformation steps to a distributed stream processing framework like Apache Flink might be a necessary evolution.


  TOC