The operational burden of our self-hosted ActiveMQ cluster had become untenable. It powered a critical, Python-based Computer Vision (CV) pipeline responsible for processing user-uploaded images, but every scaling event was a manual, high-stakes procedure. The cluster was a black box to our observability stack, and its failover behavior was, to put it mildly, unpredictable. A full-stop migration was off the table; the business required zero downtime and, more importantly, zero data loss for this revenue-critical service. The chosen path was a gradual migration to Google Cloud Pub/Sub using the Strangler Fig pattern. This necessitated the creation of a fault-tolerant, state-aware message bridge—the linchpin of the entire operation.
The core of the problem wasn’t just forwarding messages. It was about creating a transitional state where two systems, one legacy and one cloud-native, could coexist, with the ability to shift the processing load gracefully and reversibly. The bridge had to guarantee that a message consumed from ActiveMQ would be durably persisted in Pub/Sub before it was acknowledged on the source queue. Anything less would introduce the risk of message loss during a bridge failure.
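Concretely, the ordering invariant can be stated in a few lines. This is a deliberately synchronous sketch with illustrative names; the production bridge shown later does the same thing asynchronously via callbacks.
def bridge_one_message(frame, publisher, amq_conn):
    # 1. Hand the message to Pub/Sub.
    future = publisher.publish(frame.body)
    # 2. Block until Pub/Sub durably accepts it (raises on failure).
    future.result(timeout=30)
    # 3. Only now acknowledge on the source. A crash anywhere above leaves
    #    the message unacknowledged in ActiveMQ, so it will be redelivered.
    amq_conn.ack(frame.headers['message-id'], frame.headers['subscription'])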
Our architecture for this migration evolved through several phases, best illustrated as a sequence of states.
graph TD
    subgraph Phase1["Phase 1: Shadowing"]
        A1(Legacy Producer) --> AMQ("ActiveMQ Topic: cv-jobs")
        AMQ --> C1(Legacy CV Consumer)
        AMQ --> B(Bridge Service)
        B --> PS("GCP Pub/Sub Topic: cv-jobs-v2")
        PS --> C2(New CV Consumer - Dry Run)
        style B fill:#f9f,stroke:#333,stroke-width:2px
    end
    subgraph Phase2["Phase 2: Primary Cutover"]
        A2(Legacy Producer) --> AMQ2("ActiveMQ Topic: cv-jobs")
        AMQ2 --> B2(Bridge Service)
        B2 -- "Forwards & Acknowledges" --> PS2("GCP Pub/Sub Topic: cv-jobs-v2")
        PS2 --> C3(New CV Consumer - Live)
        AMQ2 -.-> C4(Legacy CV Consumer - Standby)
        style B2 fill:#f9f,stroke:#333,stroke-width:2px
        style C4 fill:#ccc
    end
    subgraph Phase3["Phase 3: Decommission"]
        A3(New Producer) --> PS3("GCP Pub/Sub Topic: cv-jobs-v2")
        PS3 --> C5(New CV Consumer - Live)
        subgraph Removed["To Be Removed"]
            AMQ3(ActiveMQ)
            B3(Bridge Service)
            C6(Legacy CV Consumer)
        end
        style AMQ3 fill:#ccc
        style B3 fill:#ccc
        style C6 fill:#ccc
    end
    Phase1 --> Phase2 --> Phase3
The bridge itself was a Python application. The choice to stick with Python was pragmatic; our team’s expertise was there, and the performance characteristics of message bridging are overwhelmingly I/O-bound, making the GIL a non-issue. The critical components were the ActiveMQ listener, the Pub/Sub publisher, and the core orchestration logic that tied them together with robust error handling and configuration management.
Configuration and Application Entrypoint
In any real-world project, hardcoding connection details is a direct path to operational pain. We used Pydantic for settings management, allowing configuration via environment variables, which is standard practice for containerized deployments.
# src/config.py
import os
from functools import lru_cache
from typing import List, Optional
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class ActiveMQSettings(BaseSettings):
"""Configuration for the ActiveMQ connection."""
hosts: List[str] = ['localhost:61613']
username: Optional[str] = None
password: Optional[str] = None
subscribe_destination: str = '/topic/cv-jobs'
# Use client-individual acknowledgment for fine-grained control
ack_mode: str = 'client-individual'
    model_config = SettingsConfigDict(env_prefix='ACTIVEMQ_')
class PubSubSettings(BaseSettings):
"""Configuration for the Google Cloud Pub/Sub connection."""
project_id: str
topic_id: str
# Publisher-side flow control to prevent overwhelming memory
max_bytes: int = 10 * 1024 * 1024 # 10 MB
max_messages: int = 1000
    model_config = SettingsConfigDict(env_prefix='PUBSUB_')
class BridgeSettings(BaseSettings):
"""Main application settings."""
log_level: str = 'INFO'
# A unique identifier for this bridge instance, useful for logging
instance_id: str = f"bridge-instance-{os.getpid()}"
    # default_factory defers instantiation until BridgeSettings() is built,
    # so importing this module never triggers env-var validation by itself
    activemq: ActiveMQSettings = Field(default_factory=ActiveMQSettings)
    pubsub: PubSubSettings = Field(default_factory=PubSubSettings)
    # Allows nested settings to be overridden from env vars via the delimiter,
    # e.g., PUBSUB__PROJECT_ID (each nested class's env_prefix also maps the
    # simpler PUBSUB_PROJECT_ID form directly)
    model_config = SettingsConfigDict(env_nested_delimiter='__')
@lru_cache
def get_settings() -> BridgeSettings:
    """Load settings once and cache them (a module-level singleton)."""
    return BridgeSettings()
This setup provides a clean, validated configuration object that can be imported throughout the application. It fails fast if required environment variables (like PUBSUB_PROJECT_ID) are missing.
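For example, here is a minimal sketch (with hypothetical values) of driving the settings purely from the environment. Note that pydantic-settings parses complex field types such as List[str] from JSON.
import os
os.environ['PUBSUB_PROJECT_ID'] = 'my-gcp-project'
os.environ['PUBSUB_TOPIC_ID'] = 'cv-jobs-v2'
# Complex types like List[str] are parsed from JSON strings:
os.environ['ACTIVEMQ_HOSTS'] = '["amq-1:61613", "amq-2:61613"]'
from src.config import get_settings
settings = get_settings()
print(settings.pubsub.project_id)   # my-gcp-project
print(settings.activemq.hosts)      # ['amq-1:61613', 'amq-2:61613']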
The ActiveMQ Consumer Logic
Connecting to ActiveMQ from Python is typically done via the STOMP protocol. We chose the stomp.py library for its maturity. A common pitfall is not handling connection drops and heart-beating correctly. A production-grade listener needs to be resilient and capable of reconnecting automatically.
We encapsulated the listener logic in its own class, designed to be run in a dedicated thread. This isolates the blocking nature of listening for messages.
# src/activemq_consumer.py
import time
import uuid
import logging
import stomp
from stomp.exception import StompException
from typing import Callable
from threading import Event
from .config import ActiveMQSettings
logger = logging.getLogger(__name__)
class ActiveMQListener(stomp.ConnectionListener):
"""
STOMP listener that handles message reception, errors, and connection state.
"""
    def __init__(self, conn: stomp.Connection, message_handler: Callable, disconnect_event: Event):
        self.conn = conn
        self._message_handler = message_handler
        # Set on errors/disconnects; the consumer loop watches it to reconnect.
        self._disconnect_event = disconnect_event
    def on_error(self, frame):
        logger.error(f"Received an error frame: {frame.body}")
        # Signal the consumer loop to tear down this connection and reconnect
        self._disconnect_event.set()
def on_message(self, frame):
# The core logic: pass the message to the handler.
# The handler is responsible for ack/nack.
try:
self._message_handler(frame.headers, frame.body)
except Exception:
logger.exception(
"Unhandled exception in message handler for message_id=%s. Message will be nacked.",
frame.headers.get('message-id')
)
            # A common mistake is to swallow the failure without a nack; the
            # message would then sit unacknowledged until the connection
            # closed, delaying redelivery.
            self.conn.nack(frame.headers['message-id'], frame.headers['subscription'])
    def on_disconnected(self):
        logger.warning("Disconnected from ActiveMQ. Triggering reconnection.")
        self._disconnect_event.set()
class ActiveMQConsumer:
"""
Manages the connection lifecycle and consumption from an ActiveMQ destination.
"""
def __init__(self, settings: ActiveMQSettings, message_handler: Callable):
self._settings = settings
self._message_handler = message_handler
        self._conn: stomp.Connection = None
        self._stop_event = Event()        # set only by stop(): graceful shutdown
        self._disconnect_event = Event()  # set by the listener: reconnect needed
def _connect(self) -> stomp.Connection:
"""Establishes and returns a STOMP connection."""
hosts = [(host.split(':')[0], int(host.split(':')[1])) for host in self._settings.hosts]
conn = stomp.Connection(host_and_ports=hosts, heartbeats=(4000, 4000))
        conn.set_listener('', ActiveMQListener(conn, self._message_handler, self._disconnect_event))
logger.info(f"Connecting to ActiveMQ hosts: {hosts}")
conn.connect(
self._settings.username,
self._settings.password,
wait=True
)
return conn
    def run(self):
        """
        Main loop. Connects and stays connected, handling disconnects with a backoff strategy.
        """
        while not self._stop_event.is_set():
            try:
                self._disconnect_event.clear()
                self._conn = self._connect()
                subscription_id = f"bridge-sub-{uuid.uuid4()}"
                self._conn.subscribe(
                    destination=self._settings.subscribe_destination,
                    id=subscription_id,
                    ack=self._settings.ack_mode
                )
                logger.info(
                    f"Successfully subscribed to {self._settings.subscribe_destination} with id {subscription_id}"
                )
                # Wait until a disconnect, an error frame, or a shutdown request
                while (self._conn.is_connected()
                        and not self._disconnect_event.is_set()
                        and not self._stop_event.is_set()):
                    time.sleep(1)
            except StompException:
                logger.exception("STOMP connection failed during setup. Retrying...")
            except Exception:
                logger.exception("An unexpected error occurred in the consumer run loop. Retrying...")
            finally:
                if self._conn and self._conn.is_connected():
                    self._conn.disconnect()
                # A separate disconnect event keeps a listener-triggered reconnect
                # from being mistaken for a graceful stop() request, which would
                # otherwise be swallowed by clearing a shared event here.
                if not self._stop_event.is_set():
                    logger.info("Reconnecting in 5 seconds...")
                    time.sleep(5)
def get_connection(self) -> stomp.Connection:
if not self._conn or not self._conn.is_connected():
raise ConnectionError("ActiveMQ connection is not available.")
return self._conn
def stop(self):
"""Gracefully stops the consumer."""
logger.info("Stop signal received. Shutting down ActiveMQ consumer.")
self._stop_event.set()
if self._conn and self._conn.is_connected():
self._conn.disconnect()
The key design choice here is ack_mode='client-individual'. This gives our application full control over when a message is considered “processed”. In the bridge’s context, “processed” means “safely published to Pub/Sub”.
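To make that contract concrete in isolation, here is a stripped-down, standalone sketch (the destination, names, and unauthenticated local broker are hypothetical): with ack='client-individual', each message stays pending on the broker until acknowledged by its own message-id.
import time
import stomp
class AckDemoListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn
    def on_message(self, frame):
        print(f"received: {frame.body}")
        # Settles this message, and only this message; anything left
        # unacknowledged is redelivered after the session ends.
        self.conn.ack(frame.headers['message-id'], frame.headers['subscription'])
conn = stomp.Connection([('localhost', 61613)])
conn.set_listener('', AckDemoListener(conn))
conn.connect(wait=True)
conn.subscribe('/topic/cv-jobs', id='ack-demo', ack='client-individual')
time.sleep(60)  # keep the demo alive long enough to receive messages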
The Google Cloud Pub/Sub Publisher Logic
The publisher side is simpler thanks to the well-designed Google Cloud client libraries, which handle batching, retries, and authentication transparently. However, it’s critical to understand how to handle publishing failures and structure the code for asynchronous results.
# src/pubsub_publisher.py
import json
import logging
from concurrent.futures import Future
from typing import Dict, Any, Optional
from google.cloud import pubsub_v1
from google.api_core.exceptions import GoogleAPICallError
from .config import PubSubSettings
logger = logging.getLogger(__name__)
class PubSubPublisher:
"""A wrapper around the GCP Pub/Sub publisher client."""
def __init__(self, settings: PubSubSettings):
self._settings = settings
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=settings.max_bytes,
max_messages=settings.max_messages,
)
self._publisher = pubsub_v1.PublisherClient(batch_settings)
self._topic_path = self._publisher.topic_path(settings.project_id, settings.topic_id)
logger.info(f"Pub/Sub publisher initialized for topic: {self._topic_path}")
def publish(self, body: bytes, attributes: Dict[str, str]) -> Future:
"""
Publishes a message to the configured topic.
Returns a concurrent.futures.Future that can be used to track the result.
"""
try:
# The client library handles encoding, but we ensure attributes are strings.
processed_attributes = {k: str(v) for k, v in attributes.items() if v is not None}
future = self._publisher.publish(
self._topic_path,
data=body,
**processed_attributes
)
return future
except GoogleAPICallError:
logger.exception("Failed to publish message to Pub/Sub due to API error.")
raise
def shutdown(self):
"""Shuts down the publisher, ensuring all pending messages are sent."""
logger.info("Shutting down Pub/Sub publisher.")
self._publisher.stop()
The publish method returns a Future. This is crucial. We don’t want to block our ActiveMQ consumer thread waiting for Pub/Sub to confirm receipt. Instead, we’ll use the future’s callback mechanism to trigger the ActiveMQ acknowledgement after the publish succeeds.
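As a quick illustration of that callback mechanism, here is how the wrapper might be exercised in isolation (the project, topic, and payload values are hypothetical):
from src.config import PubSubSettings
from src.pubsub_publisher import PubSubPublisher
publisher = PubSubPublisher(PubSubSettings(project_id='my-gcp-project', topic_id='cv-jobs-v2'))
future = publisher.publish(
    body=b'{"image_url": "gs://cv-uploads/img-001.png"}',
    attributes={'correlation_id': 'abc-123'},
)
def on_done(f):
    try:
        # result() returns the server-assigned message ID, or raises if
        # the publish ultimately failed after the client's retries.
        print(f"published as {f.result()}")
    except Exception as exc:
        print(f"publish failed: {exc}")
future.add_done_callback(on_done)
publisher.shutdown()  # flushes any pending batch before exit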
Orchestrating the Bridge
Now we tie everything together. The main application class initializes the consumer and publisher and defines the _message_handler callback that orchestrates the flow from ActiveMQ to Pub/Sub.
# src/bridge.py
import logging
import threading
import signal
import time
import uuid
from concurrent.futures import Future
from typing import Dict, Any
from .config import BridgeSettings
from .activemq_consumer import ActiveMQConsumer
from .pubsub_publisher import PubSubPublisher
logger = logging.getLogger(__name__)
class MessageBridge:
def __init__(self, settings: BridgeSettings):
self._settings = settings
self._publisher = PubSubPublisher(settings.pubsub)
# The message handler needs access to the consumer's connection to ack/nack
self._consumer = ActiveMQConsumer(settings.activemq, self._message_handler)
self._stop_event = threading.Event()
def _message_handler(self, headers: Dict[str, Any], body: str):
"""
This is the heart of the bridge. It receives a message from ActiveMQ,
forwards it to Pub/Sub, and only acks the original message upon
successful publishing.
"""
message_id = headers.get('message-id')
logger.debug(f"Received message {message_id} from ActiveMQ.")
# A real-world project must solve for idempotency. We use the ActiveMQ
# message-id as a unique key. If it's not present, we must generate one.
# This ID must be stored by the final consumer to prevent duplicate processing.
correlation_id = headers.get('correlation-id') or message_id or f"gen-{uuid.uuid4()}"
attributes_to_forward = {
'original_message_id': message_id,
'original_destination': headers.get('destination'),
'original_timestamp': headers.get('timestamp'),
'correlation_id': correlation_id
}
# The body from stomp.py can be bytes or str, ensure it's bytes for Pub/Sub
body_bytes = body.encode('utf-8') if isinstance(body, str) else body
try:
future = self._publisher.publish(body=body_bytes, attributes=attributes_to_forward)
# Add a callback to the future. This will be executed in a
# different thread by the Pub/Sub client library.
future.add_done_callback(
lambda f: self._on_publish_complete(f, headers)
)
except Exception:
logger.exception(f"Failed to schedule publish for message {message_id}. Nacking.")
self._nack_activemq_message(headers)
def _on_publish_complete(self, future: Future, amq_headers: Dict[str, Any]):
"""
Callback executed when the Pub/Sub publish future is resolved.
"""
message_id = amq_headers.get('message-id')
try:
# future.result() will raise an exception if the publish failed.
result = future.result()
logger.info(f"Successfully published message {message_id} to Pub/Sub with result: {result}")
self._ack_activemq_message(amq_headers)
except Exception:
logger.exception(f"Publishing to Pub/Sub failed for message {message_id}. Nacking.")
self._nack_activemq_message(amq_headers)
def _ack_activemq_message(self, headers: Dict[str, Any]):
try:
conn = self._consumer.get_connection()
conn.ack(headers['message-id'], headers['subscription'])
logger.debug(f"ACKed ActiveMQ message {headers['message-id']}")
except (ConnectionError, KeyError):
logger.exception("Failed to ACK ActiveMQ message. It may be redelivered.")
# This is a critical failure state. The message is in Pub/Sub but not
# ACKed in ActiveMQ. This is why the final consumer *must* be idempotent.
def _nack_activemq_message(self, headers: Dict[str, Any]):
try:
conn = self._consumer.get_connection()
conn.nack(headers['message-id'], headers['subscription'])
logger.warning(f"NACKed ActiveMQ message {headers['message-id']}")
except (ConnectionError, KeyError):
logger.exception("Failed to NACK ActiveMQ message.")
def run(self):
"""Starts the bridge and waits for a shutdown signal."""
logger.info(f"Starting message bridge instance: {self._settings.instance_id}")
consumer_thread = threading.Thread(target=self._consumer.run, daemon=True)
consumer_thread.start()
# Handle signals for graceful shutdown
signal.signal(signal.SIGINT, self._handle_shutdown)
signal.signal(signal.SIGTERM, self._handle_shutdown)
logger.info("Bridge is running. Press Ctrl+C to exit.")
self._stop_event.wait()
# Graceful shutdown
logger.info("Shutdown initiated.")
self._consumer.stop()
consumer_thread.join(timeout=10)
self._publisher.shutdown()
logger.info("Bridge shutdown complete.")
def _handle_shutdown(self, signum, frame):
if not self._stop_event.is_set():
self._stop_event.set()
The flow is clear: _message_handler is triggered by the consumer thread. It calls _publisher.publish, which immediately returns a future, and attaches _on_publish_complete as a callback. The consumer thread is now free to process the next message from ActiveMQ. When the Pub/Sub client confirms the publish (or it fails), a worker thread from its internal pool executes _on_publish_complete, which then reaches back to the consumer’s connection to perform the final ACK or NACK.
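For completeness, here is a minimal entrypoint that wires the settings and the bridge together. The original entrypoint module isn’t shown in this post, so the module name and log format below are assumptions.
# src/main.py (hypothetical module name)
import logging
from src.config import get_settings
from src.bridge import MessageBridge
def main():
    settings = get_settings()
    logging.basicConfig(
        level=settings.log_level,
        format='%(asctime)s %(levelname)s %(name)s %(message)s',
    )
    MessageBridge(settings).run()
if __name__ == '__main__':
    main()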
Testing Strategy
In a real-world project, untestable code is a liability. We must be able to test the core bridging logic without connecting to live services.
# tests/test_bridge.py
import unittest
from unittest.mock import Mock, MagicMock, patch
from concurrent.futures import Future
from src.bridge import MessageBridge
from src.config import BridgeSettings, PubSubSettings
class TestMessageBridge(unittest.TestCase):
@patch('src.bridge.PubSubPublisher')
@patch('src.bridge.ActiveMQConsumer')
def setUp(self, MockActiveMQConsumer, MockPubSubPublisher):
        # Build settings explicitly so the test needs no environment variables
        self.settings = BridgeSettings(
            pubsub=PubSubSettings(project_id='test-project', topic_id='test-topic')
        )
self.mock_consumer_instance = MockActiveMQConsumer.return_value
self.mock_publisher_instance = MockPubSubPublisher.return_value
# Mock the connection object on the consumer instance
self.mock_amq_conn = MagicMock()
self.mock_consumer_instance.get_connection.return_value = self.mock_amq_conn
self.bridge = MessageBridge(self.settings)
def test_message_handler_success_flow(self):
"""
Verify that a successful publish results in an ACK.
"""
# 1. Setup
mock_headers = {'message-id': '123', 'subscription': 'sub1'}
mock_body = '{"image_url": "..."}'
# Mock the publisher to return a successful future
success_future = Future()
success_future.set_result("pubsub-message-id-456")
self.mock_publisher_instance.publish.return_value = success_future
# 2. Execute
self.bridge._message_handler(mock_headers, mock_body)
# 3. Verify
# Check that publish was called correctly
self.mock_publisher_instance.publish.assert_called_once()
call_args = self.mock_publisher_instance.publish.call_args[1]
self.assertEqual(call_args['body'], mock_body.encode('utf-8'))
self.assertEqual(call_args['attributes']['original_message_id'], '123')
# Check that ACK was called on the connection
self.mock_amq_conn.ack.assert_called_once_with('123', 'sub1')
self.mock_amq_conn.nack.assert_not_called()
def test_message_handler_publish_failure(self):
"""
Verify that a failed publish results in a NACK.
"""
# 1. Setup
mock_headers = {'message-id': '789', 'subscription': 'sub2'}
mock_body = '{}'
# Mock the publisher to return a failed future
failed_future = Future()
failed_future.set_exception(RuntimeError("Pub/Sub unavailable"))
self.mock_publisher_instance.publish.return_value = failed_future
# 2. Execute
self.bridge._message_handler(mock_headers, mock_body)
# 3. Verify
self.mock_publisher_instance.publish.assert_called_once()
self.mock_amq_conn.nack.assert_called_once_with('789', 'sub2')
self.mock_amq_conn.ack.assert_not_called()
These unit tests validate the core decision logic: success leads to ACK, failure leads to NACK. Because the mocked futures are already resolved when add_done_callback is attached, the callbacks run synchronously on the test thread, so the assertions need no waiting or synchronization. This provides confidence that the bridge behaves as expected under different conditions before it ever touches a real message queue.
This bridge became the backbone of our migration for three months. It allowed us to move with confidence, knowing that our “at-least-once” delivery guarantee was upheld by the careful orchestration of acknowledgements. The primary remaining risk was the bridge’s own availability. While a single instance was remarkably stable, the production deployment ran three pods in a Kubernetes ReplicaSet consuming the same destination with competing-consumer semantics, so ActiveMQ distributed messages across the pods and gave us high availability. (A plain topic subscription would deliver every message to every pod; queue-backed consumption, e.g. via ActiveMQ virtual topics, is what provides the load balancing.) The idempotency key (correlation_id) handled the rare case of a message being processed by one pod that then crashed before ACKing, causing the message to be redelivered to another pod. The new cloud-native service simply saw the duplicate ID and discarded the job.
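That consumer-side dedup check is simple to sketch. The snippet below assumes a hypothetical Redis-backed seen-set (the real service’s storage choice isn’t shown here); message is a Pub/Sub subscriber callback message, and process_image stands in for the actual CV work.
import redis
r = redis.Redis()
def handle_job(message):
    correlation_id = message.attributes.get('correlation_id')
    # SET ... NX succeeds only on the first delivery of this ID;
    # the 24h expiry bounds the size of the seen-set.
    first_delivery = r.set(f"cv-jobs:seen:{correlation_id}", 1, nx=True, ex=86400)
    if not first_delivery:
        message.ack()  # duplicate delivery: acknowledge and discard
        return
    process_image(message.data)  # the actual CV work (not shown)
    message.ack()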
The bridge is, by design, a temporary component. Its ultimate success is its own decommissioning. However, the architectural patterns—asynchronous acknowledgement, idempotent message design, and configuration-driven clients—are permanent assets. The lingering challenge is that this pattern, while effective, introduces latency, as a message must make an extra network hop. For our CV pipeline, where processing took seconds, this was negligible. For a low-latency trading system, this architecture would be entirely unsuitable, highlighting the boundaries of its applicability.