Implementing a CDC-Based Dual-Write Pipeline from Oracle to Apache Iceberg and Weaviate via Azure Functions


The project started with a familiar constraint: modernize a critical feature without touching the monolithic core. The system in question was a large-scale e-commerce platform, its beating heart a battle-hardened Oracle database that processed thousands of transactions per minute. The feature was product search. For years, it had relied on LIKE '%...%' queries against a VARCHAR2 description column—a solution that was becoming untenably slow and functionally obsolete. The business wanted semantic search capabilities, and the analytics team was clamoring for an open, queryable data source that didn’t involve running heavy analytical queries against the production OLTP instance.

The mandate was clear: any solution had to be completely external, treating the Oracle database as an immutable source of truth that we could observe but not modify. No new tables, no triggers, no changes to the monolith’s application code. This immediately ruled out simpler integration patterns and pushed us toward a more event-driven, decoupled architecture.

Our initial concept was a classic Strangler Fig pattern focused on the search feature. We would build a new, standalone search service and a corresponding front-end component. The challenge was data propagation. How do we get committed changes from Oracle’s PRODUCTS table into our new systems in near-real-time? The answer was log-based Change Data Capture (CDC). By tapping into Oracle’s redo logs, we could get a stream of all INSERT, UPDATE, and DELETE operations without adding queries or triggers to the transactional path, keeping the overhead on the source database small (mainly the supplemental logging and log mining that CDC relies on).

This CDC stream would become the lifeblood of our new architecture, feeding two distinct destinations. For the analytics team, we needed to populate a data lakehouse. Apache Iceberg was the obvious choice here; its transactional guarantees, open format, and schema evolution capabilities were precisely what they needed to build a reliable BI platform. For the new search feature, we required a vector database to power semantic search. We selected Weaviate for its maturity, ease of use, and integrated vectorization modules.

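The glue holding this all together needed to be lightweight, scalable, and event-driven. A long-running VM-based consumer felt like overkill. Azure Functions, with their usage-based scaling and native triggers for event streams like Kafka or Event Hubs, presented a cost-effective and operationally simple compute layer. One thing to check up front is the hosting plan: to our knowledge the Kafka trigger is supported on the Premium and Dedicated plans rather than on the pure Consumption plan.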

Finally, the new search user interface had to be responsive and provide a modern user experience. We decided on a React front-end, with MobX managing the state. MobX’s straightforward, observable-based reactivity model would allow us to build a UI that felt alive, efficiently handling search queries and rendering results without the boilerplate often associated with other state management libraries.

This post-mortem details the implementation of this pipeline, focusing on the pragmatic engineering decisions, the code that powered it, and the inevitable pitfalls we encountered along the way.

The Heart of the Pipeline: An Azure Function for Dual Writes

The core of the system is a single Azure Function triggered by a Kafka topic that receives CDC events from our Oracle source. We used a managed Debezium connector, which produces a well-structured JSON payload for every database change. A typical UPDATE event for our PRODUCTS table looks like this:

{
  "payload": {
    "before": {
      "PRODUCT_ID": 1001,
      "PRODUCT_NAME": "Old Product Name",
      "DESCRIPTION": "An old, outdated description.",
      "PRICE": 99.99,
      "STOCK_LEVEL": 150,
      "LAST_UPDATED": "2023-10-26T10:00:00Z"
    },
    "after": {
      "PRODUCT_ID": 1001,
      "PRODUCT_NAME": "Heavy-Duty All-Weather Widget",
      "DESCRIPTION": "The new Heavy-Duty All-Weather Widget is engineered for maximum durability and performance in extreme conditions. Its composite alloy construction resists corrosion and wear.",
      "PRICE": 109.99,
      "STOCK_LEVEL": 145,
      "LAST_UPDATED": "2023-10-27T12:30:00Z"
    },
    "op": "u"
  }
}

The op field is "c" for a create, "u" for an update, and "d" for a delete. Our function needs to parse this event, then perform two distinct, potentially failing operations: an upsert into an Apache Iceberg table and an upsert into Weaviate. Without a transaction coordinator spanning both systems, atomicity across them is not achievable. The best we can do is build for idempotency and robust error handling so that the two sinks eventually converge.

Here is the top-level structure of our Azure Function, written in Java. We opted for Java due to the maturity of the Apache Iceberg libraries.

// package, imports...
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.KafkaTrigger;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CdcProcessorFunction {

    private static final Logger logger = LoggerFactory.getLogger(CdcProcessorFunction.class);

    // Service dependencies injected via constructor. By default the Java worker creates
    // function classes through a no-arg constructor, so this assumes a dependency-injection
    // hook is registered for the function app.
    private final WeaviateWriterService weaviateService;
    private final IcebergWriterService icebergService;
    private final CdcEventParser eventParser;

    public CdcProcessorFunction(WeaviateWriterService weaviateService, IcebergWriterService icebergService, CdcEventParser eventParser) {
        this.weaviateService = weaviateService;
        this.icebergService = icebergService;
        this.eventParser = eventParser;
    }

    @FunctionName("OracleCdcProcessor")
    public void run(
            @KafkaTrigger(
                name = "kafkaTrigger",
                brokerList = "%KafkaBrokerList%",
                topic = "oracle.server.products_topic",
                consumerGroup = "$Default",
                cardinality = Cardinality.MANY, // deliver events as a batch (String[])
                dataType = "string"
            ) String[] kafkaEvents,
            final ExecutionContext context) {

        logger.info("Processing batch of {} CDC events.", kafkaEvents.length);
        
        List<Product> productsToUpsert = new ArrayList<>();
        List<Long> productIdsToDelete = new ArrayList<>();

        for (String eventJson : kafkaEvents) {
            try {
                // The parser abstracts away the raw CDC event structure
                ParsedCdcEvent parsedEvent = eventParser.parse(eventJson);
                
                if (parsedEvent.isDelete()) {
                    productIdsToDelete.add(parsedEvent.getProductId());
                } else {
                    productsToUpsert.add(parsedEvent.getProduct());
                }
            } catch (Exception e) {
                // A poison pill message. Log it and move on.
                // In a production system, this would go to a dead-letter queue.
                logger.error("Failed to parse CDC event. Skipping. Event: {}", eventJson, e);
            }
        }
        
        // A critical design choice: process writes in batches for efficiency.
        // If either of these fails, the entire function invocation fails,
        // and the Kafka trigger will retry the whole batch. This is an acceptable trade-off.
        try {
            if (!productsToUpsert.isEmpty()) {
                logger.info("Upserting {} products to sinks.", productsToUpsert.size());
                weaviateService.batchUpsertProducts(productsToUpsert);
                icebergService.batchUpsertProducts(productsToUpsert);
            }
            if (!productIdsToDelete.isEmpty()) {
                logger.info("Deleting {} products from sinks.", productIdsToDelete.size());
                weaviateService.batchDeleteProducts(productIdsToDelete);
                icebergService.batchDeleteProducts(productIdsToDelete);
            }
        } catch (Exception e) {
            // This is a transient failure (e.g., network issue, sink unavailable).
            // Throwing the exception will cause the function host to retry the batch.
            logger.error("Failed to process batch. Triggering retry.", e);
            throw new RuntimeException("Failed to write to data sinks", e);
        }

        logger.info("Successfully processed event batch.");
    }
}

A common mistake is to write events to the sinks one by one inside the loop, catching exceptions for each write. This leads to partial processing of a batch, which complicates retries and state management. By collecting the batch first and then attempting the writes, we get a unit of work at the function level that either completes or fails as a whole. If the icebergService.batchUpsertProducts call fails, the invocation fails, and when the batch is processed again the weaviateService upsert (which succeeded the first time) is re-run. Re-delivery should not be taken for granted: it depends on the retry policy configured for the function and on how the trigger commits offsets after a failed invocation, so this behavior has to be verified for the actual deployment. Either way, idempotency in the sink services is non-negotiable.
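One consequence of splitting a batch into an upsert list and a delete list is that the relative order of events for the same product is lost: a delete followed by a re-create would be applied as upsert-then-delete. A minimal sketch of one way to guard against this is to reduce the batch to the last event per key before splitting it. ChangeEvent and BatchCompactor are hypothetical names for illustration (ChangeEvent is a simplified stand-in for ParsedCdcEvent), and the sketch assumes events for one PRODUCT_ID arrive in commit order, which holds when the topic is keyed by primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the article's ParsedCdcEvent. */
final class ChangeEvent {
    final long productId;
    final char op; // 'c' = create, 'u' = update, 'd' = delete

    ChangeEvent(long productId, char op) {
        this.productId = productId;
        this.op = op;
    }

    boolean isDelete() {
        return op == 'd';
    }
}

final class BatchCompactor {
    private BatchCompactor() {}

    /**
     * Keeps only the last event seen for each product ID.
     * Assumes the input list is in commit order for any given key.
     */
    static List<ChangeEvent> lastEventPerKey(List<ChangeEvent> orderedEvents) {
        Map<Long, ChangeEvent> latest = new LinkedHashMap<>();
        for (ChangeEvent event : orderedEvents) {
            latest.put(event.productId, event); // a later event replaces an earlier one
        }
        return new ArrayList<>(latest.values());
    }
}
```

After compaction each product appears at most once, so it lands in either the upsert list or the delete list, never both, and the order in which the two lists are applied no longer matters.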

Implementing the Weaviate Sink

Writing to Weaviate is relatively straightforward. We use its Java client, ensuring our logic can handle batching and correctly translates a DELETE operation from the CDC stream. The key to idempotency is deriving the object’s UUID in Weaviate deterministically from the Oracle primary key (PRODUCT_ID), so that replaying an event overwrites the same object instead of creating a duplicate.

// package, imports...
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.v1.batch.api.ObjectsBatcher;
import io.weaviate.client.v1.data.model.WeaviateObject;
import io.weaviate.client.v1.filters.Operator;
import io.weaviate.client.v1.filters.WhereFilter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: class and method names follow the v4 Java client as we used it;
// check them against the client version in your build.
public class WeaviateWriterService {

    private final WeaviateClient client;
    private final String className = "Product";
    private static final Logger logger = LoggerFactory.getLogger(WeaviateWriterService.class);

    public WeaviateWriterService(WeaviateClient client) {
        this.client = client;
    }

    public void batchUpsertProducts(List<Product> products) {
        if (products.isEmpty()) return;

        try (ObjectsBatcher batcher = client.batch().objectsBatcher()) {
            for (Product product : products) {
                // The crucial part for idempotency: generate a consistent UUID from the primary key.
                // An explicit charset keeps the result independent of the platform default.
                UUID deterministicUuid = UUID.nameUUIDFromBytes(
                    String.valueOf(product.getProductId()).getBytes(StandardCharsets.UTF_8));

                Map<String, Object> properties = new HashMap<>();
                properties.put("productId", product.getProductId());
                properties.put("name", product.getProductName());
                properties.put("description", product.getDescription());
                properties.put("price", product.getPrice());

                // The text2vec-openai module in Weaviate will automatically vectorize the description.
                batcher.withObject(
                    WeaviateObject.builder()
                        .className(this.className)
                        .id(deterministicUuid.toString())
                        .properties(properties)
                        .build()
                );
            }
            var response = batcher.run(); // This flushes the batch
            // Depending on the client version, per-object failures may only show up inside
            // the individual results, so those are worth inspecting as well.
            if (response.hasErrors()) {
                logger.error("Weaviate batch upsert failed with errors: {}", response.getError());
                throw new RuntimeException("Weaviate batch upsert contained errors.");
            }
        }
    }

    public void batchDeleteProducts(List<Long> productIds) {
        if (productIds.isEmpty()) return;

        // Weaviate's batch delete is based on a 'where' filter, not a list of IDs.
        // We construct one Equal clause per product and combine them with an 'Or' operator.
        WhereFilter[] clauses = productIds.stream()
            .map(id -> WhereFilter.builder()
                .path(new String[]{"productId"})
                .operator(Operator.Equal)
                // valueInt takes an Integer in the client version we assume here;
                // toIntExact fails loudly instead of silently truncating a large key.
                .valueInt(Math.toIntExact(id))
                .build())
            .toArray(WhereFilter[]::new);

        WhereFilter where = WhereFilter.builder()
            .operator(Operator.Or)
            .operands(clauses)
            .build();

        var result = client.batch().objectsBatchDeleter()
            .withClassName(this.className)
            .withWhere(where)
            .run();

        if (result.hasErrors()) {
            logger.error("Weaviate batch delete failed with errors: {}", result.getError());
            throw new RuntimeException("Weaviate batch delete failed.");
        }
        logger.info("Successfully deleted {} objects from Weaviate.", result.getResult().getResults().getSuccessful());
    }
}

The pitfall here is the DELETE operation. A delete event has no after image; the row is identified only by its before image, which carries at least the primary key. Our code maps this to a batch delete in Weaviate. A common mistake is to forget this path, leading to orphaned data in the vector index.
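Because the object ID is a pure function of PRODUCT_ID, both the upsert path and the delete path can recompute it from the key alone, with no lookup table in between. The following is a small sketch of that derivation in isolation; ProductUuids is a hypothetical helper name, and the encoding (the decimal string of the key, UTF-8) mirrors what the writer service does.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class ProductUuids {
    private ProductUuids() {}

    /** Derives a stable, name-based (version 3) UUID from the Oracle primary key. */
    static UUID forProductId(long productId) {
        byte[] name = String.valueOf(productId).getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(name);
    }
}
```

Calling ProductUuids.forProductId(1001L) on any machine and at any time yields the same UUID, which is what makes a replayed event overwrite the existing object. The encoding must never change once data has been written, otherwise old objects become unreachable by key.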

Tackling the Apache Iceberg Sink

Writing to Apache Iceberg from a serverless function is more complex. It involves interacting with a catalog (to find table metadata), the underlying file system (like ADLS), and handling Iceberg’s optimistic concurrency model. We chose the Iceberg REST catalog for its stateless nature, which is a perfect match for Azure Functions.

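The core challenge is performing an efficient UPSERT. MERGE INTO is a SQL feature of the compute engines (Spark, Flink) rather than of the core Java API, so from a function the equivalent has to be expressed as a row-level change: remove the previous versions of the affected keys and add the new rows, ideally in a single transactional commit.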

// package, imports...
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.ZoneOffset;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: This example uses the Iceberg Core API. In a real-world scenario,
// using a compute engine like Spark or Flink via its API would simplify this,
// but for a lightweight function, the core API is viable.
// Assumptions: an unpartitioned format-version-2 table, and a core/parquet module version
// in which these writer builders exist under these names (they have shifted between releases).

public class IcebergWriterService {

    private final Catalog icebergCatalog;
    private final TableIdentifier tableIdentifier = TableIdentifier.of("mydb", "products");
    private static final Logger logger = LoggerFactory.getLogger(IcebergWriterService.class);

    public IcebergWriterService(Catalog catalog) {
        this.icebergCatalog = catalog;
    }

    public void batchUpsertProducts(List<Product> products) {
        if (products.isEmpty()) return;
        Table table = icebergCatalog.loadTable(tableIdentifier);

        // Convert our POJOs to Iceberg GenericRecords.
        // This is verbose; helper libraries or reflection could simplify it.
        List<Record> records = products.stream()
            .map(p -> {
                GenericRecord record = GenericRecord.create(table.schema());
                record.setField("product_id", p.getProductId());
                record.setField("product_name", p.getProductName());
                record.setField("description", p.getDescription());
                record.setField("price", p.getPrice());
                record.setField("stock_level", p.getStockLevel());
                // Generic records carry timestamptz values as OffsetDateTime
                // (assumes last_updated is a timestamptz column).
                record.setField("last_updated", p.getLastUpdated().toInstant().atOffset(ZoneOffset.UTC));
                return record;
            })
            .collect(Collectors.toList());
        List<Long> ids = products.stream().map(Product::getProductId).distinct().collect(Collectors.toList());

        // A plain newDelete().deleteFromRowFilter(...) only drops whole data files and rejects
        // files in which just some rows match, so it cannot remove individual products.
        // Instead we commit one row delta: equality deletes remove the earlier versions of
        // these keys, and the new data file adds the current versions, atomically.
        try {
            table.newRowDelta()
                .addDeletes(writeEqualityDeletes(table, ids))
                .addRows(writeDataFile(table, records))
                .commit();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write Iceberg files", e);
        }
        logger.info("Iceberg UPSERT committed for {} products.", products.size());
    }

    public void batchDeleteProducts(List<Long> productIds) {
        if (productIds.isEmpty()) return;
        Table table = icebergCatalog.loadTable(tableIdentifier);

        // Row-level deletes are committed transactionally as an equality-delete file.
        try {
            table.newRowDelta()
                .addDeletes(writeEqualityDeletes(table, productIds))
                .commit();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write Iceberg delete file", e);
        }
        logger.info("Iceberg DELETE committed for {} products.", productIds.size());
    }

    // Writes the records to a new Parquet data file under the table location.
    private DataFile writeDataFile(Table table, List<Record> records) throws IOException {
        OutputFile out = newFile(table);
        DataWriter<Record> writer = Parquet.writeData(out)
            .forTable(table)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .overwrite()
            .build();
        try (writer) {
            for (Record record : records) {
                writer.write(record);
            }
        }
        return writer.toDataFile(); // only valid after the writer is closed
    }

    // Writes one equality-delete row per product ID, keyed on product_id.
    private DeleteFile writeEqualityDeletes(Table table, List<Long> productIds) throws IOException {
        Schema keySchema = table.schema().select("product_id");
        int keyFieldId = table.schema().findField("product_id").fieldId();
        EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(newFile(table))
            .forTable(table)
            .rowSchema(keySchema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .overwrite()
            .equalityFieldIds(keyFieldId)
            .buildEqualityWriter();
        try (writer) {
            for (Long id : productIds) {
                Record key = GenericRecord.create(keySchema);
                key.setField("product_id", id);
                writer.write(key);
            }
        }
        return writer.toDeleteFile();
    }

    private OutputFile newFile(Table table) {
        String location = table.locationProvider().newDataLocation(UUID.randomUUID() + ".parquet");
        return table.io().newOutputFile(location);
    }
}

The key problem we faced with Iceberg was concurrency. If two instances of our Azure Function triggered at nearly the same time, they would both load the table metadata at version V1. The first function to commit would advance the table to V2. When the second function tried to commit its changes based on V1, Iceberg’s optimistic concurrency check could reject the commit with a CommitFailedException. The solution was to wrap the whole load-and-commit sequence in our service in a retry loop with exponential backoff. On failure, the function catches the exception, reloads the table to get the latest metadata (V2), and re-attempts the transaction.
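The retry loop described above can be sketched independently of Iceberg. In this illustration CommitRetry and CommitConflictException are hypothetical names; the exception stands in for Iceberg's CommitFailedException so that the sketch compiles on its own, and the supplier is assumed to reload the table and rebuild the commit on every attempt. The delay cap and the jitter are arbitrary choices, not values from our production code.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

final class CommitRetry {
    private CommitRetry() {}

    /** Hypothetical stand-in for Iceberg's CommitFailedException. */
    static final class CommitConflictException extends RuntimeException {
        CommitConflictException(String message) {
            super(message);
        }
    }

    /**
     * Runs the attempt until it succeeds or maxAttempts is reached, doubling the
     * delay after each conflict and adding random jitter to spread out contenders.
     */
    static <T> T withBackoff(Supplier<T> attempt, int maxAttempts, long baseDelayMillis) {
        if (maxAttempts < 1 || baseDelayMillis < 0) {
            throw new IllegalArgumentException("maxAttempts must be >= 1 and baseDelayMillis >= 0");
        }
        long delay = baseDelayMillis;
        for (int attemptNo = 1; ; attemptNo++) {
            try {
                return attempt.get();
            } catch (CommitConflictException conflict) {
                if (attemptNo >= maxAttempts) {
                    throw conflict; // give up and let the invocation fail
                }
                long jitter = ThreadLocalRandom.current().nextLong(delay + 1);
                sleep(delay + jitter);
                delay = Math.min(delay * 2, 10_000L); // cap the backoff at 10 seconds
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Only the conflict exception is retried; any other failure propagates immediately. The total retry time also has to fit inside the function's execution timeout, which limits how many attempts are sensible.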

The Reactive Front-End with MobX

On the front-end, the new search component queries an API gateway that routes requests to our Weaviate instance. The state of this component—the search query, the loading status, the results, and any errors—is managed by a MobX store.

import { makeAutoObservable, runInAction } from "mobx";

// A service to abstract the API call
class ApiService {
  async performSemanticSearch(query) {
    // In a real app, this would be an authenticated fetch call
    // to our backend service that queries Weaviate.
    const response = await fetch(`/api/search?q=${encodeURIComponent(query)}`);
    if (!response.ok) {
      throw new Error("Search request failed");
    }
    return response.json();
  }
}

export class SearchStore {
  // --- Observables ---
  searchResults = [];
  searchState = "idle"; // "idle", "pending", "done", "error"
  currentQuery = "";
  error = null;

  // --- Dependencies ---
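  apiService = new ApiService();

  constructor() {
    // MobX 6 requires this call to make properties observable.
    // The service is a plain dependency, so it is excluded from observability.
    makeAutoObservable(this, { apiService: false });
  }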

  // --- Actions ---
  async executeSearch(query) {
    if (!query || query.trim() === "") {
      this.resetSearch();
      return;
    }

    this.currentQuery = query;
    this.searchState = "pending";
    this.error = null;

    try {
      const results = await this.apiService.performSemanticSearch(query);
      // `runInAction` is used to modify state after an await call
      runInAction(() => {
        this.searchResults = results.data;
        this.searchState = "done";
      });
    } catch (err) {
      runInAction(() => {
        console.error("Failed to execute search:", err);
        this.error = err.message;
        this.searchState = "error";
      });
    }
  }

  resetSearch() {
    this.searchResults = [];
    this.searchState = "idle";
    this.currentQuery = "";
    this.error = null;
  }
}

The corresponding React component is clean and declarative, thanks to the observer higher-order component. It simply reacts to changes in the SearchStore.

import React from "react";
import { observer } from "mobx-react-lite";
import { SearchStore } from "./SearchStore";

// Assume the store is instantiated and passed via context or props
const searchStore = new SearchStore();

const ProductSearchComponent = observer(() => {
  const handleSearchChange = (e) => {
    // Debouncing this input is a common and necessary optimization
    searchStore.executeSearch(e.target.value);
  };

  return (
    <div>
      <input
        type="search"
        placeholder="Describe the product you're looking for..."
        onChange={handleSearchChange}
      />
      
      {searchStore.searchState === "pending" && <div>Loading...</div>}
      
      {searchStore.searchState === "error" && (
        <div style={{ color: "red" }}>Error: {searchStore.error}</div>
      )}
      
      {searchStore.searchState === "done" && (
        <ul>
          {searchStore.searchResults.map((product) => (
            <li key={product.productId}>
              <strong>{product.name}</strong> - ${product.price}
              <p>{product.description}</p>
            </li>
          ))}
        </ul>
      )}
    </div>
  );
});

export default ProductSearchComponent;

The MobX implementation is straightforward, but the pitfall in a reactive system like this is race conditions. If a user types “widget” and then quickly types “gadget”, two executeSearch calls are fired. If the “widget” request returns after the “gadget” request, the UI will incorrectly display results for “widget”. Our current code is susceptible to this. A robust solution involves cancellation logic or generation counting within the store to ignore responses from stale requests.

Architectural Overview

The final architecture accomplishes our goals of decoupling and modernization without invasive changes to the source system.

graph TD
    A[Oracle DB] -- Redo Logs --> B[Debezium Connector];
    B -- CDC Events --> C[Kafka Topic: products_topic];
    C --> D{Azure Function};
    subgraph Azure
        D
    end
    subgraph Dual Write
        D -- Batch Upsert/Delete --> E[Weaviate];
        D -- Batch Upsert/Delete --> F[Apache Iceberg on ADLS];
    end
    subgraph New Search Feature
        G[User Browser] <--> H[React App w/ MobX];
        H -- Search Query --> I[API Gateway];
        I -- GraphQL Query --> E;
    end
    subgraph Analytics
        J[BI Tools / Spark Jobs] -- SQL Queries --> F;
    end
    
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style F fill:#9f9,stroke:#333,stroke-width:2px
    style E fill:#9cf,stroke:#333,stroke-width:2px

This entire pipeline introduces a new set of operational complexities. The CDC connector, the Kafka cluster, and the Azure Function itself all represent potential points of failure. The end-to-end latency, while low, is not zero, meaning there’s a small window of inconsistency between the Oracle DB and our new data sinks. For a search feature and analytics, this is an acceptable trade-off. However, for a system requiring transactional consistency, this pattern would be unsuitable. The implementation of a dead-letter queue and an automated mechanism for replaying failed CDC event batches is not just a “nice-to-have”; it is a mandatory next step before this architecture can be considered fully production-ready. Furthermore, the cost of vectorization must be carefully monitored: every upsert can trigger a call to the embedding model through Weaviate’s text2vec-openai module, even when only the price or stock level changed. This can be optimized by re-vectorizing only if the relevant text fields have actually changed.
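The change check mentioned above could look roughly like the following sketch, which compares the before and after images of a CDC event. VectorizationGate and ProductText are hypothetical names, the choice of name and description as the embedded fields is an assumption about the Weaviate schema, and the approach presumes the connector is configured to emit full before images.

```java
import java.util.Objects;

final class VectorizationGate {
    private VectorizationGate() {}

    /** Hypothetical minimal view of the text fields that feed the embedding. */
    static final class ProductText {
        final String name;
        final String description;

        ProductText(String name, String description) {
            this.name = name;
            this.description = description;
        }
    }

    /**
     * Returns true when the embedded text may have changed.
     * before is null for inserts; after is null for deletes.
     */
    static boolean needsRevectorization(ProductText before, ProductText after) {
        if (after == null) {
            return false; // delete: nothing left to embed
        }
        if (before == null) {
            return true; // insert: no previous vector exists
        }
        return !Objects.equals(before.name, after.name)
            || !Objects.equals(before.description, after.description);
    }
}
```

How the caller acts on the result is left open here. One option is to send a property-only update for events where the check returns false, provided the vector database offers a way to update properties without recomputing the vector.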

