Implementing a Declarative Data Synchronization Fabric Between HBase and DynamoDB via Crossplane


The technical directive was deceptively simple: establish a presence for our core data platform in a new AWS region. The reality was a legacy, high-throughput analytics system, battle-hardened and built around a self-managed HBase cluster. It processed hundreds of thousands of transactions per second, and its operational patterns were deeply ingrained in our tooling. The target state was DynamoDB, chosen for its serverless nature and lower operational burden in the cloud. A hard cutover was a non-starter; the system’s uptime SLA is measured in seconds per year, not hours.

Our initial plan was a straightforward, one-way data migration script. We’d stream data from HBase to DynamoDB, validate, and then flip the switch. This concept fell apart during the first architecture review. The business required a phased, client-by-client rollout, meaning for a period of weeks, or even months, both systems had to be live and consistent. Some clients would write to HBase, others to DynamoDB. This immediately transformed the problem from a simple migration into a bi-directional synchronization challenge.

The conventional path would involve a message bus like Kafka and CDC connectors. We considered a Debezium-style pipeline (Debezium itself ships no HBase connector, so the HBase side would have needed a custom replication sink), but our platform engineering team had been standardizing on a Kubernetes-native, declarative control plane for all infrastructure. Adding a separate, complex streaming platform felt like a step backward, introducing another stateful system to manage. The real goal was to allow application teams to provision a “synchronized data table” with a single kubectl apply command, abstracting away the underlying complexity of HBase and DynamoDB. This led us to Crossplane. The idea was radical: could we define a custom API on Kubernetes that provisions and manages both a legacy-style HBase table and a cloud-native DynamoDB table as a single declarative unit? The synchronization logic itself would be a stateless C# microservice, deployed alongside, that acted as the data plane. This architecture aligned with our GitOps philosophy.

Defining the Unified Resource with Crossplane

The first step was to teach our Kubernetes cluster how to speak “HBase” and “DynamoDB” through a unified abstraction. This is Crossplane’s core strength. We started by defining a CompositeResourceDefinition (XRD) that described our desired resource, which we named HybridTable. This XRD is the schema for our new custom Kubernetes API.

# xrds/hybridtable.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: CompositeResourceDefinition
metadata:
  name: hybridtables.dataplatform.internal
spec:
  group: dataplatform.internal
  names:
    kind: HybridTable
    plural: hybridtables
  claimNames:
    kind: HybridTableClaim
    plural: hybridtableclaims
  versions:
  - name: v1alpha1
    served: true
    referenceable: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              tableName:
                type: string
                description: "The base name for the table, used for both HBase and DynamoDB."
              hbaseColumnFamilies:
                type: array
                items:
                  type: string
                description: "A list of column families for the HBase table."
              dynamoDbReadCapacity:
                type: integer
                description: "Provisioned read capacity units for the DynamoDB table."
                default: 5
              dynamoDbWriteCapacity:
                type: integer
                description: "Provisioned write capacity units for the DynamoDB table."
                default: 5
            required:
              - tableName
              - hbaseColumnFamilies

This XRD establishes a new HybridTable resource in the dataplatform.internal API group. It takes simple parameters like a base table name and HBase column families. Next, we needed a Composition to translate this abstract definition into concrete resources: a DynamoDB Table managed by provider-aws and an HBase table. A pitfall here is that there’s no official Crossplane provider for on-premises HBase. In a real-world project, we’d wrap the HBase REST API or an HBase Kubernetes Operator with a custom Crossplane provider. For this implementation, we will simulate this by using a Kubernetes ConfigMap to represent the HBase table definition, which a separate controller would act upon. This demonstrates the principle without the overhead of building a full provider.

# compositions/hybridtable-aws.yaml
apiVersion: apiextensions.crossplane.io/v1
kind: Composition
metadata:
  name: hybridtable-aws-composition
  labels:
    provider: aws
spec:
  compositeTypeRef:
    apiVersion: dataplatform.internal/v1alpha1
    kind: HybridTable
  resources:
    - name: dynamodbtable
      base:
        apiVersion: dynamodb.aws.upbound.io/v1beta1
        kind: Table
        spec:
          forProvider:
            region: us-east-1
            attribute:
              - name: "ID"
                type: "S"
            hashKey: "ID"
            billingMode: "PROVISIONED"
            tags:
              ManagedBy: "Crossplane"
              Source: "HybridTableComposition"
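      patches:
        # Kubernetes object names must be lowercase, so lower-case the table name.
        - fromFieldPath: "spec.tableName"
          toFieldPath: "metadata.name"
          transforms:
            - type: string
              string:
                type: Convert
                convert: "ToLower"
        # The Upbound provider takes the DynamoDB table name from the
        # external-name annotation; forProvider has no name field.
        - fromFieldPath: "spec.tableName"
          toFieldPath: "metadata.annotations[crossplane.io/external-name]"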
        - fromFieldPath: "spec.dynamoDbReadCapacity"
          toFieldPath: "spec.forProvider.readCapacity"
        - fromFieldPath: "spec.dynamoDbWriteCapacity"
          toFieldPath: "spec.forProvider.writeCapacity"

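    - name: hbasetabledefinition
      base:
        # Crossplane v1 Compositions compose Crossplane resources rather than
        # arbitrary Kubernetes objects, so the ConfigMap is wrapped in a
        # provider-kubernetes Object. This assumes provider-kubernetes is
        # installed with a ProviderConfig named "default".
        apiVersion: kubernetes.crossplane.io/v1alpha2
        kind: Object
        spec:
          providerConfigRef:
            name: default
          forProvider:
            manifest:
              apiVersion: v1
              kind: ConfigMap
              metadata:
                namespace: hbase-system
                labels:
                  hbase-table-request: "true"
      patches:
        - fromFieldPath: "spec.tableName"
          toFieldPath: "spec.forProvider.manifest.metadata.name"
          transforms:
            - type: string
              string:
                type: Format
                fmt: "hbase-table-%s"
            # ConfigMap names must be lowercase.
            - type: string
              string:
                type: Convert
                convert: "ToLower"
        - fromFieldPath: "spec.tableName"
          toFieldPath: "spec.forProvider.manifest.data.tableName"
        - fromFieldPath: "spec.hbaseColumnFamilies"
          toFieldPath: "spec.forProvider.manifest.data.columnFamilies"
          transforms:
            # Join the array into a comma-separated string.
            - type: string
              string:
                type: Join
                join:
                  separator: ","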

With these definitions applied to the cluster, any developer can now provision a complete, synchronized table set with a simple manifest:

# claims/user-profiles-table.yaml
apiVersion: dataplatform.internal/v1alpha1
kind: HybridTableClaim
metadata:
  name: user-profiles
  namespace: app-team-alpha
spec:
  compositionRef:
    name: hybridtable-aws-composition
  tableName: UserProfiles
  hbaseColumnFamilies:
    - "personal"
    - "metrics"
  dynamoDbReadCapacity: 10
  dynamoDbWriteCapacity: 10

Applying this manifest triggers Crossplane’s reconciliation loop. It provisions a DynamoDB table named UserProfiles in AWS and creates a ConfigMap in the hbase-system namespace, signaling our (simulated) HBase operator to create the corresponding table. The infrastructure is now declarative and unified.

The C# Synchronization Core

The heart of the data plane is a C# worker service. It’s a BackgroundService designed to run continuously within a Kubernetes pod. Its sole job is to poll both databases for changes and replicate them to the other side. A common mistake is to build this as a monolithic loop. A more maintainable approach is to separate the concerns of data fetching, transformation, and writing for each direction.

We use the official AWS SDK for .NET and a community-maintained client for HBase. The project setup is minimal:

<!-- SyncService.csproj -->
<Project Sdk="Microsoft.NET.Sdk.Worker">
  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
    <DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="AWSSDK.DynamoDBv2" Version="3.7.303.1" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
    <!-- For HBase, one might use a client like HBase.NET or a custom Thrift client -->
    <!-- We will mock the client interface for clarity -->
    <PackageReference Include="Polly" Version="8.2.0" />
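    <!-- Serilog.Extensions.Hosting integrates Serilog with the generic host without pulling in ASP.NET Core -->
    <PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />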
    <PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
  </ItemGroup>
</Project>

The SyncWorker class forms the backbone. We inject clients and configuration, and the ExecuteAsync method orchestrates the two synchronization flows.

// Services/SyncWorker.cs
public class SyncWorker : BackgroundService
{
    private readonly ILogger<SyncWorker> _logger;
    private readonly IDynamoDbSyncService _dynamoService;
    private readonly IHBaseSyncService _hbaseService;
    private readonly TimeSpan _syncInterval = TimeSpan.FromSeconds(5);

    public SyncWorker(ILogger<SyncWorker> logger, IDynamoDbSyncService dynamoService, IHBaseSyncService hbaseService)
    {
        _logger = logger;
        _dynamoService = dynamoService;
        _hbaseService = hbaseService;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Synchronization worker starting.");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // In a real system, these would run in parallel tasks.
                // Running sequentially for simplicity of demonstration.
                _logger.LogInformation("Starting HBase -> DynamoDB sync cycle.");
                await _hbaseService.SyncHBaseToDynamoDbAsync(stoppingToken);

                _logger.LogInformation("Starting DynamoDB -> HBase sync cycle.");
                await _dynamoService.SyncDynamoDbToHBaseAsync(stoppingToken);
            }
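            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown requested mid-cycle; leave the loop without logging an error.
                break;
            }
            catch (Exception ex)
            {
                // Global catch-all to prevent the worker from crashing.
                _logger.LogError(ex, "An unhandled exception occurred during sync cycle.");
            }

            try
            {
                await Task.Delay(_syncInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break; // Shutdown requested during the wait.
            }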
        }

        _logger.LogInformation("Synchronization worker stopping.");
    }
}
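
The comment in ExecuteAsync notes that a real system would run both directions in parallel. A minimal sketch of that idea follows. SyncCycle and its two delegate parameters are hypothetical stand-ins for the two sync services, not part of the original worker, and the failure handling is an assumption about what a production loop would want.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SyncCycle
{
    // Runs both sync directions concurrently and returns how many of them failed.
    // hbaseToDynamo and dynamoToHBase are hypothetical stand-ins for
    // SyncHBaseToDynamoDbAsync and SyncDynamoDbToHBaseAsync.
    public static async Task<int> RunOnceAsync(
        Func<CancellationToken, Task> hbaseToDynamo,
        Func<CancellationToken, Task> dynamoToHBase,
        CancellationToken ct)
    {
        bool[] results = await Task.WhenAll(
            RunGuardedAsync(hbaseToDynamo, ct),
            RunGuardedAsync(dynamoToHBase, ct));
        return results.Count(succeeded => !succeeded);
    }

    // Isolates one direction so that a failure in it does not abort the other.
    private static async Task<bool> RunGuardedAsync(
        Func<CancellationToken, Task> flow, CancellationToken ct)
    {
        try
        {
            await flow(ct);
            return true;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            throw; // Propagate shutdown to the caller.
        }
        catch (Exception)
        {
            return false; // A real worker would log the exception here.
        }
    }
}
```

Guarding each direction separately means a DynamoDB outage does not stop HBase-bound writes from being attempted, and the returned failure count gives the worker something to expose as a metric.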

Implementing the HBase to DynamoDB Flow

The logic for syncing from HBase is predicated on being able to find new or updated rows. A naive full table scan is not feasible. In a production scenario, one might leverage HBase’s replication features. A pragmatic alternative, if schemas can be modified, is to use a timestamp-based approach. We can structure our HBase row keys or a specific column to include a timestamp and scan ranges based on the last successful sync time.
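
The timestamp-based approach boils down to keeping a watermark and scanning from slightly before it. The sketch below is one way to express that. ChangedRow, SyncWatermark and the overlap window are illustrative assumptions rather than part of the service shown later.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical change record: a row key plus the time it was last written (Unix ms).
public record ChangedRow(string RowKey, long ModifiedAtMs);

public sealed class SyncWatermark
{
    private readonly long _overlapMs;

    // Newest modification time that has been replicated successfully.
    public long LastSyncedMs { get; private set; }

    public SyncWatermark(long startMs, long overlapMs)
    {
        LastSyncedMs = startMs;
        _overlapMs = overlapMs;
    }

    // Lower bound for the next scan, rewound by the overlap window to
    // tolerate clock skew and writes that become visible late.
    public long ScanFromMs => Math.Max(0, LastSyncedMs - _overlapMs);

    // Call only after the rows were written to the destination. The watermark
    // moves to the newest row actually seen, never to "now".
    public void Advance(IEnumerable<ChangedRow> writtenRows)
    {
        foreach (var row in writtenRows)
        {
            LastSyncedMs = Math.Max(LastSyncedMs, row.ModifiedAtMs);
        }
    }
}
```

Rows inside the overlap window are read more than once, which is only safe if the destination write is idempotent, for example a conditional write keyed on the row's timestamp.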

Let’s focus on the data transformation. HBase deals in byte arrays, while DynamoDB has a typed schema. This is a critical point of failure if not handled robustly.

// Services/HBaseSyncService.cs
using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;
using Polly;

// Mock IHBaseClient for demonstration
public interface IHBaseClient { Task<IEnumerable<HBaseRow>> ScanForUpdatesAsync(long lastSyncTimestamp); }
public record HBaseRow(string RowKey, Dictionary<string, byte[]> Columns);

public class HBaseSyncService : IHBaseSyncService
{
    private readonly ILogger<HBaseSyncService> _logger;
    private readonly IAmazonDynamoDB _dynamoDbClient;
    private readonly IHBaseClient _hBaseClient;
    private long _lastSyncTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - 3600_000; // Start one hour ago

    // A resilience policy is crucial for transient network or service errors.
    private readonly IAsyncPolicy _dynamoDbWritePolicy;

    public HBaseSyncService(ILogger<HBaseSyncService> logger, IAmazonDynamoDB dynamoDbClient, IHBaseClient hBaseClient)
    {
        _logger = logger;
        _dynamoDbClient = dynamoDbClient;
        _hBaseClient = hBaseClient;

        _dynamoDbWritePolicy = Policy
            .Handle<ProvisionedThroughputExceededException>()
            .Or<RequestLimitExceededException>()
            .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                (exception, timeSpan, retryCount, context) =>
                {
                    _logger.LogWarning(exception, "Retry {RetryCount} for DynamoDB write after {TimeSpan}", retryCount, timeSpan);
                });
    }

    public async Task SyncHBaseToDynamoDbAsync(CancellationToken cancellationToken)
    {
        var newTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var updatedRows = await _hBaseClient.ScanForUpdatesAsync(_lastSyncTimestamp);

        var writeRequests = new List<WriteRequest>();
        foreach (var row in updatedRows)
        {
            var dynamoItem = MapHBaseToDynamo(row);
            if (dynamoItem != null)
            {
                writeRequests.Add(new WriteRequest { PutRequest = new PutRequest { Item = dynamoItem } });
            }
        }

        if (writeRequests.Any())
        {
            await BatchWriteToDynamoAsync("UserProfiles", writeRequests, cancellationToken);
        }
        
        _lastSyncTimestamp = newTimestamp;
        _logger.LogInformation("Completed HBase -> DynamoDB sync. Processed {Count} rows.", writeRequests.Count);
    }

    private async Task BatchWriteToDynamoAsync(string tableName, List<WriteRequest> writeRequests, CancellationToken ct)
    {
        // DynamoDB BatchWriteItem has a limit of 25 items per request.
        foreach (var chunk in writeRequests.Chunk(25))
        {
            var request = new BatchWriteItemRequest
            {
                RequestItems = new Dictionary<string, List<WriteRequest>>
                {
                    { tableName, chunk.ToList() }
                }
            };

            await _dynamoDbWritePolicy.ExecuteAsync(async () =>
            {
                var response = await _dynamoDbClient.BatchWriteItemAsync(request, ct);
                // In production, proper handling of UnprocessedItems is mandatory.
                if (response.UnprocessedItems.Any())
                {
                     _logger.LogWarning("DynamoDB reported unprocessed items, which requires a retry strategy.");
                }
            });
        }
    }

    private Dictionary<string, AttributeValue>? MapHBaseToDynamo(HBaseRow hbaseRow)
    {
        try
        {
            // The pitfall here is assuming data is clean. Always validate and handle exceptions.
            var item = new Dictionary<string, AttributeValue>
            {
                { "ID", new AttributeValue { S = hbaseRow.RowKey } }
            };

            // This mapping logic is highly business-specific.
            // Example: "personal:email" -> email (S)
            if (hbaseRow.Columns.TryGetValue("personal:email", out var emailBytes))
            {
                item["email"] = new AttributeValue { S = System.Text.Encoding.UTF8.GetString(emailBytes) };
            }
            
            // Example: "metrics:login_count" -> loginCount (N)
            if (hbaseRow.Columns.TryGetValue("metrics:login_count", out var countBytes))
            {
                // Robust parsing is key
                if (int.TryParse(System.Text.Encoding.UTF8.GetString(countBytes), out var loginCount))
                {
                    item["loginCount"] = new AttributeValue { N = loginCount.ToString() };
                }
            }
            
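            // Add sync timestamp for conflict resolution.
            // NOTE: last-write-wins is only meaningful if this is the time the row was written
            // in HBase (e.g. the cell timestamp). The mock HBaseRow carries no such field, so
            // UtcNow is a placeholder that makes every synced row look newer than the destination.
            item["_sync_timestamp"] = new AttributeValue { N = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString() };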

            return item;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to map HBase row with key {RowKey} to DynamoDB item.", hbaseRow.RowKey);
            return null;
        }
    }
}
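
BatchWriteToDynamoAsync above only logs unprocessed items, so they are lost once the sync timestamp advances. One way to close that gap is sketched below. BatchRetry and the sendBatch callback are hypothetical, and the sketch is kept independent of the AWS SDK so the retry logic can be read on its own. In the service, sendBatch would presumably wrap BatchWriteItemAsync and return the contents of UnprocessedItems for the table.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchRetry
{
    // Sends a batch and re-sends whatever the callback reports as unprocessed,
    // backing off exponentially between attempts. Returns the items that are
    // still unprocessed after maxAttempts, so the caller can decide not to
    // advance its sync timestamp.
    public static async Task<IReadOnlyList<T>> SendWithRetryAsync<T>(
        IReadOnlyList<T> batch,
        Func<IReadOnlyList<T>, Task<IReadOnlyList<T>>> sendBatch, // hypothetical wrapper around the batch write
        int maxAttempts = 5,
        int baseDelayMs = 100)
    {
        var pending = batch;
        for (var attempt = 1; attempt <= maxAttempts && pending.Count > 0; attempt++)
        {
            pending = await sendBatch(pending);
            if (pending.Count > 0 && attempt < maxAttempts)
            {
                // 100 ms, 200 ms, 400 ms, ... with the default base delay.
                await Task.Delay(baseDelayMs * (1 << (attempt - 1)));
            }
        }
        return pending;
    }
}
```

If the returned list is non-empty, the safest reaction is to leave the last sync timestamp untouched so the same rows are scanned again on the next cycle.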

The true complexity arises with bi-directional sync. What happens if a user’s profile is updated in HBase and DynamoDB simultaneously, before a sync cycle can complete? Left unresolved, this race leads to data divergence. The simplest strategy is “last write wins” (LWW). We implement this by adding a dedicated _sync_timestamp attribute to both data stores, recording when the source write happened. Before writing an update, the service must compare the incoming timestamp against the one on the destination record and discard the update if it is not newer.

This fundamentally changes the write logic from a blind PutItem to a conditional UpdateItem or a read-then-write pattern.

sequenceDiagram
    participant SyncSvc as Synchronization Service
    participant HBase
    participant DynamoDB

    SyncSvc->>HBase: Scan for updates since last sync
    HBase-->>SyncSvc: Return Row A (timestamp T1)
    
    SyncSvc->>DynamoDB: GetItem for Row A
    DynamoDB-->>SyncSvc: Return Item A (existing _sync_timestamp T0)

    alt T1 > T0
        SyncSvc->>DynamoDB: PutItem for Row A with new timestamp T1
        DynamoDB-->>SyncSvc: OK
    else
        SyncSvc->>SyncSvc: Discard update (stale)
        Note right of SyncSvc: Log discarded update
    end
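
The decision in the diagram can be written as a small pure function, which makes the tie-breaking rule explicit and easy to unit test. LastWriteWins and WriteDecision are illustrative names, not part of the service code.

```csharp
public enum WriteDecision { Apply, DiscardStale }

public static class LastWriteWins
{
    // existingTimestampMs is null when the destination has no record yet.
    // An incoming update is applied only if it is strictly newer; equal
    // timestamps are treated as stale so that a replayed update is a no-op.
    public static WriteDecision Decide(long incomingTimestampMs, long? existingTimestampMs) =>
        existingTimestampMs is null || incomingTimestampMs > existingTimestampMs.Value
            ? WriteDecision.Apply
            : WriteDecision.DiscardStale;
}
```

Treating equal timestamps as stale keeps replays harmless, but it also means two different writes with the same millisecond timestamp on each side would both be rejected by the other side; a deterministic tie-breaker (for example, preferring one store) would be needed if that case matters.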

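The C# implementation of this logic requires modifying our mapper and write service. Performing the comparison as a separate GetItem followed by a PutItem, as the diagram shows logically, would leave a window in which another writer could slip in between the two calls, so we push the comparison into DynamoDB itself. The BatchWriteItem API is no longer sufficient as it doesn’t support conditional writes. We must switch to individual PutItem calls with a ConditionExpression. This is a performance trade-off: we gain consistency at the cost of throughput.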

// Inside a modified write service for DynamoDB...
public async Task<bool> ConditionalWriteToDynamoAsync(
    string tableName, Dictionary<string, AttributeValue> item, CancellationToken ct = default)
{
    var itemId = item.TryGetValue("ID", out var idValue) ? idValue.S : "<unknown>";

    // Extract the timestamp from the incoming item. TryGetValue avoids a
    // KeyNotFoundException when the attribute is absent.
    if (!item.TryGetValue("_sync_timestamp", out var tsValue) ||
        !long.TryParse(tsValue.N, out var sourceTimestamp))
    {
        _logger.LogWarning("Item {ItemId} missing valid _sync_timestamp. Skipping.", itemId);
        return false;
    }

    var request = new PutItemRequest
    {
        TableName = tableName,
        Item = item,
        // Condition: write ONLY IF the item does not exist (new item)
        // OR the existing timestamp is less than ours.
        ConditionExpression = "attribute_not_exists(#id) OR #ts < :new_timestamp",
        // Attribute names that do not start with a letter (such as _sync_timestamp)
        // are referenced through placeholders to keep the expression parser happy.
        ExpressionAttributeNames = new Dictionary<string, string>
        {
            { "#id", "ID" },
            { "#ts", "_sync_timestamp" }
        },
        ExpressionAttributeValues = new Dictionary<string, AttributeValue>
        {
            { ":new_timestamp", new AttributeValue { N = sourceTimestamp.ToString() } }
        }
    };

    try
    {
        await _dynamoDbWritePolicy.ExecuteAsync(
            token => _dynamoDbClient.PutItemAsync(request, token), ct);
        return true;
    }
    catch (ConditionalCheckFailedException)
    {
        _logger.LogInformation("Conditional write failed for item {ItemId}. Destination has newer data.", itemId);
        return false;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error during conditional write for item {ItemId}.", itemId);
        throw; // Re-throw for worker to handle
    }
}

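This logic must be mirrored for the DynamoDB-to-HBase sync path, using HBase’s atomic checkAndPut operation (checkAndMutate in newer client versions). Because HBase compares raw bytes, the timestamp column must be stored in a fixed-width, big-endian encoding for a less-than comparison to be meaningful. This conflict resolution strategy is fundamental to the stability of the entire system.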
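
Since the HBase client is mocked throughout this article, the sketch below only models the semantics the HBase-side write needs: apply the put if the row is absent or older, atomically. FakeHBaseTable and PutIfNewer are hypothetical; a real implementation would delegate the comparison to the server-side check-and-put rather than to an in-process dictionary.

```csharp
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for an HBase table, used to illustrate
// compare-and-set semantics only.
public sealed class FakeHBaseTable
{
    private readonly ConcurrentDictionary<string, (long TimestampMs, string Value)> _rows = new();

    // Writes the row only if it is absent or holds a strictly older timestamp.
    // Returns true when the write was applied.
    public bool PutIfNewer(string rowKey, long timestampMs, string value)
    {
        while (true)
        {
            if (!_rows.TryGetValue(rowKey, out var current))
            {
                if (_rows.TryAdd(rowKey, (timestampMs, value))) return true;
                continue; // Another writer added the row first; re-check.
            }

            if (current.TimestampMs >= timestampMs) return false; // Destination is newer or equal.

            // TryUpdate succeeds only if the row still holds the value we compared against.
            if (_rows.TryUpdate(rowKey, (timestampMs, value), current)) return true;
        }
    }

    public bool TryGet(string rowKey, out string value)
    {
        var found = _rows.TryGetValue(rowKey, out var row);
        value = row.Value;
        return found;
    }
}
```

The retry loop matters: without it, a writer that loses a race would either overwrite newer data or silently drop its own update without re-comparing.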

Deployment and Observability

The final piece is packaging the C# service into a container and deploying it to Kubernetes. We need a Dockerfile and a Deployment manifest.

# Dockerfile
FROM mcr.microsoft.com/dotnet/runtime:8.0-alpine AS base
WORKDIR /app
EXPOSE 8080

FROM mcr.microsoft.com/dotnet/sdk:8.0-alpine AS build
WORKDIR /src
COPY ["SyncService.csproj", "."]
RUN dotnet restore "./SyncService.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "SyncService.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "SyncService.csproj" -c Release -o /app/publish /p:UseAppHost=false

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "SyncService.dll"]

The Kubernetes Deployment should include readiness and liveness probes and be configured with an IAM Role for Service Accounts (IRSA) to securely grant DynamoDB access to the pod. Proper logging and metrics are non-negotiable. Using Serilog with a JSON console sink allows for easy ingestion into a logging platform like Loki or Elasticsearch. For metrics, prometheus-net can expose an endpoint for Prometheus to scrape, tracking counters for synced records, errors, and histograms for sync latency.

The result is a highly decoupled, resilient system. The infrastructure layer is managed declaratively through Git. A developer wanting a new synchronized table only needs to commit a single YAML file. The C# data plane service, running in Kubernetes, keeps no durable state beyond an in-memory sync watermark, so pods can be replaced freely; scaling it horizontally additionally requires partitioning tables or key ranges between replicas so they do not repeat each other’s work. It handles the messy reality of data transformation, transient errors, and write conflicts.

This architecture is not without its limitations. The polling-based change detection mechanism introduces a replication lag of up to one polling interval plus the duration of a sync cycle. For workloads requiring near-real-time replication, this would need to be replaced with an event-driven approach, tapping into DynamoDB Streams on one side and HBase replication on the other. The “last write wins” strategy, while simple, is a lossy form of conflict resolution; if two different but valid updates are made concurrently, one will be silently discarded. For financial or transactional data, a more sophisticated CRDT approach or a manual conflict resolution queue would be required. Finally, the cost of DynamoDB I/O and cross-region data transfer for a high-volume sync must be carefully modeled and monitored to avoid budget overruns.

