The initial request seemed simple: when a high-value customer record is created or updated in our legacy Java monolith, a machine learning model retraining pipeline in Kubeflow must be triggered. The monolith, a sprawling Spring Boot application, relies heavily on JPA and Hibernate for its data persistence layer against a PostgreSQL database. The ML workloads are managed entirely within a Google Kubernetes Engine cluster via Kubeflow Pipelines.
Our first attempt was a naive, direct integration. The service layer method responsible for saving a Customer
entity was modified to include a REST client call to a Kubeflow pipeline endpoint after the entityManager.persist()
call.
// DO NOT DO THIS. This is the problematic initial approach.
@Transactional
public Customer createHighValueCustomer(CustomerData data) {
Customer customer = new Customer(data);
entityManager.persist(customer);
// The point of failure.
// What if the DB commit succeeds but this network call fails?
// Or what if this succeeds but the transaction rolls back?
kubeflowApiClient.triggerRetrainingPipeline(customer.getId());
return customer;
}
This design failed catastrophically during load testing. We were immediately confronted with the classic dual-write problem. If the database transaction committed successfully but the network call to the Kubeflow API failed, the ML pipeline was never triggered. The state of our systems diverged, leading to stale models. Even worse, if the network call succeeded but the surrounding database transaction later rolled back due to an unrelated constraint violation, we would trigger a pipeline for data that never actually made it into the system of record. This approach created an unacceptably tight coupling and guaranteed data inconsistency.
The clear path forward was decoupling through asynchronous messaging. Google Cloud Pub/Sub was the obvious choice, as our Kubeflow environment was already running on GCP. The revised concept was to publish a message to a Pub/Sub topic within the same service method. But this only shifted the dual-write problem from a REST API to a message broker. A failure to publish after a successful commit still resulted in a lost event.
This dead end forced a fundamental rethink. We needed a mechanism that could guarantee an event would be published if, and only if, the originating database transaction succeeded. The source of truth is the database’s transaction log (the Write-Ahead Log or WAL in PostgreSQL). This led us to Change Data Capture (CDC) and specifically, the Debezium project.
However, using CDC to tail the customers
table directly presented its own issues. It exposed raw database changes, which might be too low-level or “chatty” for downstream consumers. Business logic is often spread across multiple table updates within a single transaction, and we wanted to emit a single, cohesive business event. This is where we landed on our final architecture: the Transactional Outbox pattern, implemented with JPA/Hibernate and captured by a Debezium connector configured to watch our outbox table. Podman became our tool of choice for managing the complex local development environment required to validate this entire flow end-to-end.
Establishing the Transactional Outbox in JPA
The core principle of the Transactional Outbox pattern is to persist the event to be published in a dedicated table within the same database and same local transaction as the business state change. This atomic write operation ensures the event is captured if and only if the business data is successfully committed.
First, we defined the database schema for our outbox table.
-- DDL for the outbox table in PostgreSQL
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
Next, we created a corresponding JPA entity. Using a JsonBinaryType
from a library like hibernate-types
is essential for mapping the JSONB
payload efficiently without resorting to string manipulation.
// pom.xml dependency for hibernate-types
// <dependency>
// <groupId>com.vladmihalcea</groupId>
// <artifactId>hibernate-types-55</artifactId>
// <version>2.17.1</version>
// </dependency>
import com.vladmihalcea.hibernate.type.json.JsonBinaryType;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import javax.persistence.*;
import java.time.Instant;
import java.util.UUID;
@Entity
@Table(name = "outbox_events")
@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
public class OutboxEvent {
@Id
private UUID id;
@Column(name = "aggregate_type", nullable = false)
private String aggregateType;
@Column(name = "aggregate_id", nullable = false)
private String aggregateId;
@Column(name = "event_type", nullable = false)
private String eventType;
@Type(type = "jsonb")
@Column(columnDefinition = "jsonb", nullable = false)
private String payload; // JSON payload as a String
@Column(name = "created_at", nullable = false)
private Instant createdAt;
// Private constructor for JPA
private OutboxEvent() {}
public static OutboxEvent create(String aggregateType, String aggregateId, String eventType, String payload) {
OutboxEvent event = new OutboxEvent();
event.id = UUID.randomUUID();
event.aggregateType = aggregateType;
event.aggregateId = aggregateId;
event.eventType = eventType;
event.payload = payload;
event.createdAt = Instant.now();
return event;
}
// Getters...
}
With the entity in place, the crucial step was integrating its persistence into our business logic transactionally. We created a generic EventPublisher
service that decouples the event creation from the business service.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@Component
public class OutboxEventPublisher {
@PersistenceContext
private EntityManager entityManager;
private final ObjectMapper objectMapper = new ObjectMapper(); // Production: inject this
// This method is called from the business service within a transaction.
public void fire(BusinessEvent event) {
try {
String payload = objectMapper.writeValueAsString(event.getPayload());
OutboxEvent outboxEvent = OutboxEvent.create(
event.getAggregateType(),
event.getAggregateId(),
event.getEventType(),
payload
);
// This is persisted in the same transaction as the business entity.
entityManager.persist(outboxEvent);
} catch (JsonProcessingException e) {
// In a real-world project, this should be a more specific, unchecked exception.
throw new RuntimeException("Failed to serialize event payload", e);
}
}
}
// A simple DTO for business events
interface BusinessEvent {
String getAggregateType();
String getAggregateId();
String getEventType();
Object getPayload();
}
Our CustomerService
now uses this publisher, ensuring atomicity.
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class CustomerService {
@PersistenceContext
private EntityManager entityManager;
private final OutboxEventPublisher eventPublisher;
public CustomerService(OutboxEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@Transactional
public Customer createHighValueCustomer(CustomerData data) {
Customer customer = new Customer(data);
entityManager.persist(customer);
// Define the business event payload
CustomerCreatedPayload payload = new CustomerCreatedPayload(
customer.getId(),
customer.getName(),
customer.getTier(),
customer.getSignupDate()
);
// Fire the event. This will create and persist an OutboxEvent entity
// within this same database transaction.
eventPublisher.fire(new CustomerCreatedEvent(customer.getId().toString(), payload));
return customer;
}
}
Now, a single COMMIT
to the database atomically saves the new customer
row and the corresponding outbox_events
row. If anything fails, the entire transaction is rolled back, and no event is left behind. We have achieved transactional consistency. The next challenge was to get this event out of the database and into Pub/Sub.
Local Development Stack with Podman
Before deploying anything, we needed a robust local environment to test this CDC pipeline. This involved running PostgreSQL, Zookeeper, Kafka, and Debezium Connect. Managing these with individual docker run
commands is tedious. Podman, with its concept of “pods” (groups of containers sharing a network namespace), provided a much cleaner setup.
Here is the shell script we used to bootstrap the entire local stack. It creates a single pod and then runs each container within it, ensuring they can communicate via localhost
.
#!/bin/bash
set -e
POD_NAME="ml-pipeline-pod"
POSTGRES_DB="monolith_db"
POSTGRES_USER="user"
POSTGRES_PASSWORD="password"
# Clean up previous runs
podman pod exists ${POD_NAME} && podman pod rm -f ${POD_NAME}
# 1. Create a pod to house all our services
echo "Creating pod '${POD_NAME}'..."
podman pod create --name ${POD_NAME} -p 5432:5432 -p 9092:9092 -p 8083:8083
# 2. Start PostgreSQL with WAL level set to 'logical' for Debezium
echo "Starting PostgreSQL..."
podman run -d --pod ${POD_NAME} --name postgres \
-e POSTGRES_DB=${POSTGRES_DB} \
-e POSTGRES_USER=${POSTGRES_USER} \
-e POSTGRES_PASSWORD=${POSTGRES_PASSWORD} \
-v pg_data:/var/lib/postgresql/data \
docker.io/debezium/postgres:14 \
-c 'wal_level=logical'
# 3. Start Zookeeper
echo "Starting Zookeeper..."
podman run -d --pod ${POD_NAME} --name zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-e ZOOKEEPER_TICK_TIME=2000 \
docker.io/confluentinc/cp-zookeeper:7.2.1
# 4. Start Kafka
echo "Starting Kafka..."
podman run -d --pod ${POD_NAME} --name kafka \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
docker.io/confluentinc/cp-kafka:7.2.1
# Wait for Kafka to be ready
echo "Waiting for Kafka to start..."
sleep 15
# 5. Start Debezium Connect
echo "Starting Debezium Connect..."
podman run -d --pod ${POD_NAME} --name connect \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e BOOTSTRAP_SERVERS=localhost:9092 \
docker.io/debezium/connect:1.9
echo "Local environment is up and running."
echo "Connect REST API available at http://localhost:8083"
This setup provides a fully functional, isolated environment. We can connect our local Spring Boot application to localhost:5432
and start testing the outbox persistence. The Debezium Connect service exposes a REST API on localhost:8083
for configuring the connectors.
Configuring Debezium and the Pub/Sub Sink
The magic happens in the Debezium connector configuration. We need to tell it to monitor our outbox_events
table and transform the raw CDC event into a clean business event before sending it to Kafka.
Here is the JSON payload to POST
to http://localhost:8083/connectors
to create the PostgreSQL source connector:
{
"name": "outbox-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "monolith_db",
"database.server.name": "monolith-server",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "monolith.events.${routedByValue}",
"transforms.outbox.table.field.event.payload": "payload"
}
}
The critical parts are the transforms
. Debezium’s EventRouter
transform is designed specifically for the outbox pattern.
-
table.include.list
: We explicitly tell Debezium to only watchpublic.outbox_events
. -
transforms.outbox.type
: This specifies theEventRouter
Single Message Transform (SMT). -
route.by.field
: We use theaggregate_type
column (e.g., “Customer”) from our outbox table to determine the destination topic. -
route.topic.replacement
: This dynamically creates Kafka topic names likemonolith.events.Customer
. -
table.field.event.payload
: This is the most important setting. It tells the transform to extract the actual event content from thepayload
column of the outbox table and use it as the Kafka message’s value. This unwraps our business event, discarding the CDC envelope.
With this connector running, any new row in outbox_events
is almost instantly converted into a clean event on a Kafka topic. The final step is to bridge this to Google Cloud Pub/Sub. We use the official Kafka Connect Pub/Sub Sink Connector. First, we need to add the connector plugin to our Debezium Connect container (in a production setup, you’d build a custom image). For local testing, we can podman exec
into the container and install it.
Then, we register the sink connector configuration:
{
"name": "gcp-pubsub-sink-connector",
"config": {
"connector.class": "com.google.cloud.pubsub.kafka.sink.CloudPubSubSinkConnector",
"tasks.max": "1",
"topics": "monolith.events.Customer",
"cps.project": "your-gcp-project-id",
"cps.topic": "customer-events-for-ml",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"gcp.credentials.file.path": "/path/inside/connect/container/to/keyfile.json"
}
}
This configuration subscribes to the monolith.events.Customer
topic in our local Kafka and forwards every message to the customer-events-for-ml
topic in Google Cloud Pub/Sub.
The end-to-end data flow is now complete:
graph TD A[Monolith Service] -- 1. Persist --> B((PostgreSQL DB)); subgraph "Atomic Transaction" B1[customers table] B2[outbox_events table] end B -- 2. WAL --> C[Debezium Connector]; C -- 3. Unwraps Event --> D((Local Kafka)); D -- 4. Forwards --> E[Pub/Sub Sink Connector]; E -- 5. Publishes --> F((GCP Pub/Sub Topic));
Consuming Events in a Kubeflow Pipeline
The final piece is a Kubeflow component that can be triggered by these Pub/Sub messages. In a real-world MLOps platform, this would likely be a more sophisticated event-driven system (e.g., using Knative Eventing or Argo Events). For clarity, we’ll build a component that acts as a long-running subscriber, though in practice this might be a trigger that instantiates a new pipeline run.
Here is a Python-based Kubeflow component that listens to a Pub/Sub subscription.
from kfp.dsl import component, Output, Artifact
from google.cloud import pubsub_v1
import json
import logging
import time
# Configure logging for clarity in Kubeflow logs
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@component(
base_image="python:3.9-slim",
packages_to_install=["google-cloud-pubsub"]
)
def pubsub_event_trigger(
gcp_project_id: str,
subscription_id: str,
triggered_event_payload: Output[Artifact]
):
"""
Listens to a GCP Pub/Sub subscription and writes the first received
message payload to an output artifact. In a real pipeline, this would
trigger downstream tasks.
"""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(gcp_project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
logging.info(f"Received message: {message.data}")
try:
# The event payload that originated from our outbox table
payload_str = message.data.decode("utf-8")
event_data = json.loads(payload_str)
logging.info(f"Successfully deserialized event for customer ID: {event_data.get('customerId')}")
# Write the payload to the output artifact file for downstream components
with open(triggered_event_payload.path, 'w') as f:
f.write(payload_str)
# Acknowledge the message so it's not redelivered.
message.ack()
# In a simple trigger component, we might exit after one message.
# For a long-running listener, you would handle this differently.
# This will stop the subscriber from pulling more messages.
future.cancel()
except json.JSONDecodeError as e:
logging.error(f"Failed to decode message data as JSON: {message.data}", exc_info=True)
# Do not acknowledge the message, let it be redelivered or go to a dead-letter queue.
message.nack()
except Exception as e:
logging.error("An unexpected error occurred in the callback", exc_info=True)
message.nack()
# The subscriber is non-blocking. future.result() will block here until
# the future is cancelled in the callback.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
logging.info(f"Listening for messages on {subscription_path}...")
try:
# Block until the future is cancelled, with a timeout.
streaming_pull_future.result(timeout=300)
except TimeoutError:
logging.warning("Timeout reached, no message received. Exiting.")
streaming_pull_future.cancel()
except Exception as e:
logging.error(f"Subscriber error: {e}")
streaming_pull_future.cancel()
logging.info("Subscriber shut down.")
# Example of using this component in a KFP pipeline
# from kfp import dsl
#
# @dsl.pipeline(name="event-driven-ml-pipeline")
# def customer_event_pipeline(
# project_id: str = "your-gcp-project-id",
# subscription: str = "your-subscription-id"
# ):
# trigger_task = pubsub_event_trigger(
# gcp_project_id=project_id,
# subscription_id=subscription
# )
#
# # A downstream component that uses the event payload
# # data_processing_task = process_customer_data(
# # event_payload=trigger_task.outputs["triggered_event_payload"]
# # )
This architecture provides a fully decoupled, reliable, and observable bridge between our legacy JPA monolith and our modern MLOps platform. By leveraging the database’s transaction log as the ultimate source of truth via the outbox pattern, we have eliminated the dual-write problem and can guarantee that every critical business event is eventually processed by our ML pipelines.
The current implementation, however, is not without its own set of production considerations. The outbox table itself will grow indefinitely if not pruned; a separate, idempotent cleanup job is required to remove events that have been successfully published and processed. Furthermore, the Debezium and Kafka Connect services, while run locally with Podman, must be deployed in a highly available configuration in production, likely on Kubernetes, to avoid creating a new single point of failure. Lastly, schema management for the event payloads in the outbox table becomes a critical concern, requiring a robust versioning strategy to prevent breaking downstream consumers like the Kubeflow pipeline as the monolith’s data contracts evolve.