The immediate problem wasn’t subtle: cascading failures. A fleet of our Python Flask services relied on a third-party payment gateway. Under normal load, everything worked. But during peak traffic, this gateway would intermittently slow down or error out. The result was predictable. Our Flask worker threads would block on HTTP requests, tying up Gunicorn workers. Soon, the entire service would become unresponsive, not just for payment-related requests, but for everything. A single downstream dependency was capable of taking down our entire application.
An initial attempt involved a simple in-memory circuit breaker implemented as a Flask decorator. It worked in testing, but in production, it was almost useless. With a dozen horizontally scaled Flask instances, each instance had its own isolated view of the downstream service’s health. One instance might trip its local breaker, but the other eleven would continue to hammer the failing gateway, perpetuating the problem. We needed a shared, coordinated understanding of the gateway’s state across all instances. This pointed directly to the need for a distributed circuit breaker, and for that, we needed a reliable distributed coordination service.
Our choice landed on Apache Zookeeper. While some might consider it an older technology, its guarantees around consistency, session management, and its “watch” mechanism are perfectly suited for managing the state of a distributed pattern like this. It’s a battle-tested tool, and in a real-world project, predictability often trumps novelty.
The architecture we settled on is straightforward:
- Flask Services: Each instance of our service would run a circuit breaker decorator.
- Zookeeper: Acts as the centralized, strongly-consistent state store for all breakers. The state (CLOSED, OPEN, HALF_OPEN), failure counts, and timers would live here.
- Tailwind CSS Dashboard: A simple, internal Flask-served web page for real-time visualization of the breaker states, allowing operations teams to see the health of our external dependencies at a glance. Tailwind was chosen for its ability to let us build a clean, functional UI without writing a single line of custom CSS.
graph TD
    subgraph "Flask Service Fleet"
        F1[Flask Instance 1]
        F2[Flask Instance 2]
        F3[Flask Instance N]
    end
    subgraph "Zookeeper Cluster"
        ZK["ZNode State<br/>/breakers/payment_gateway"]
    end
    subgraph "Downstream Service"
        DS[Flaky Payment Gateway]
    end
    subgraph "Monitoring"
        User[Operator] --> Dashboard[Tailwind CSS Dashboard]
        Dashboard -- polls API --> F_API[Flask API Endpoint]
        F_API -- reads state --> ZK
    end
    F1 -- read/write --> ZK
    F2 -- read/write --> ZK
    F3 -- read/write --> ZK
    F1 -- calls --> DS
    F2 -- calls --> DS
    F3 -- calls --> DS
Foundation: A Resilient Zookeeper Client
Before even thinking about the breaker logic, the first step is establishing a robust connection to Zookeeper. A common mistake is to treat the client connection as infallible. Network partitions happen. Zookeeper sessions can expire. The application must handle these events gracefully. We use the `kazoo` library, and a production-grade implementation requires managing the client’s lifecycle and state listeners.
Here is the core client management module. It’s not just a simple `KazooClient` instantiation; it’s wrapped in a class that handles startup, shutdown, and logging of state changes.

`app/zookeeper_client.py`:
import logging
import atexit
from kazoo.client import KazooClient, KazooState
# Configure a dedicated logger for ZK interactions.
# In a real app, this would be part of a larger logging config.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ZookeeperManager:
"""
Manages the lifecycle of the KazooClient.
Ensures a single, state-aware client instance for the application.
"""
_client = None
@classmethod
def get_client(cls):
if cls._client is None or not cls._client.connected:
raise RuntimeError("Zookeeper client not initialized or connected. Call init_client() first.")
return cls._client
@classmethod
def init_client(cls, hosts: str, timeout: float = 15.0):
"""
Initializes the Zookeeper client and sets up state listeners.
This should be called during application startup.
:param hosts: Comma-separated list of Zookeeper host:port strings.
:param timeout: Connection timeout in seconds.
"""
if cls._client:
logger.warning("Zookeeper client already initialized. Ignoring subsequent call.")
return
logger.info(f"Initializing Zookeeper client with hosts: {hosts}")
cls._client = KazooClient(hosts=hosts, timeout=timeout)
cls._client.add_listener(cls._state_listener)
try:
cls._client.start(timeout=timeout)
logger.info("Zookeeper client started successfully.")
except Exception as e:
logger.critical(f"Failed to start Zookeeper client: {e}")
cls._client = None # Ensure client is None on failure
raise
# Register a cleanup function to be called on application exit.
atexit.register(cls.close_client)
@classmethod
def close_client(cls):
"""
Stops and closes the Zookeeper client connection.
Registered with atexit to run on graceful shutdown.
"""
        if cls._client:
            logger.info("Closing Zookeeper client connection.")
            try:
                # Stop the connection first, then release the handler's resources.
                cls._client.stop()
                cls._client.close()
                logger.info("Zookeeper client connection closed.")
except Exception as e:
logger.error(f"Error while closing Zookeeper client: {e}")
cls._client = None
@staticmethod
def _state_listener(state):
"""
Logs connection state changes. A real-world implementation might
also trigger alerts or health check failures on LOST/SUSPENDED states.
"""
if state == KazooState.LOST:
logger.error("Zookeeper session lost. Application may behave unexpectedly.")
elif state == KazooState.SUSPENDED:
logger.warning("Zookeeper connection suspended. Attempting to reconnect...")
elif state == KazooState.CONNECTED:
logger.info("Zookeeper connection established.")
# Expose a global instance for easy access within the app
zk_manager = ZookeeperManager()
This setup ensures that we initialize the client once at application startup and have a clean shutdown process. The state listener is critical for observability; if the connection is lost, we need to know immediately.
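To make the lifecycle concrete, here is a minimal usage sketch; the host list and the standalone-script framing are assumptions for illustration, since in the real application this call happens in the app factory shown later.

from app.zookeeper_client import zk_manager

# Initialize once at process startup; the listener logs CONNECTED/SUSPENDED/LOST transitions.
zk_manager.init_client(hosts="zk1:2181,zk2:2181,zk3:2181", timeout=15.0)  # hosts are illustrative

# Anywhere else in the application, grab the shared, already-connected client.
client = zk_manager.get_client()
print(client.state)  # "CONNECTED" when healthy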
Modeling Breaker State in Zookeeper
The Zookeeper data model for the circuit breaker needs to be atomic and clear. We’ll create a parent znode for all our breakers, `/circuit_breakers`. Each downstream service will have its own znode under this parent.

For a service named `payment_gateway`, the structure will be `/circuit_breakers/payment_gateway`.

This znode will store a JSON object containing all state information. The pitfall here is trying to store different state elements in different znodes (e.g., `/state`, `/failures`). That approach introduces complexity and requires transactions for every read-modify-write cycle. A single znode holding a JSON blob is simpler to manage atomically.
The JSON structure:
{
"state": "CLOSED",
"failures": 0,
"last_failure_timestamp": null,
"opened_timestamp": null
}
This structure is managed by a dedicated repository class that abstracts the `kazoo` calls.

`app/breaker_repository.py`:
import json
import logging
import time
from kazoo.exceptions import BadVersionError, NoNodeError, NodeExistsError
from .zookeeper_client import zk_manager

logger = logging.getLogger(__name__)
BASE_PATH = "/circuit_breakers"
class BreakerState:
CLOSED = "CLOSED"
OPEN = "OPEN"
HALF_OPEN = "HALF_OPEN"
class CircuitBreakerRepository:
"""
Handles all interactions with Zookeeper for a specific circuit breaker.
Encapsulates the znode path and data serialization.
"""
def __init__(self, service_name: str):
self.service_name = service_name
self.path = f"{BASE_PATH}/{service_name}"
self._ensure_base_path()
def _ensure_base_path(self):
client = zk_manager.get_client()
client.ensure_path(BASE_PATH)
    def get_state(self) -> tuple:
"""
Retrieves the state and znode version for a service.
The version is crucial for performing atomic updates (CAS).
:return: A tuple of (state_dict, znode_version) or (None, None) if not found.
"""
client = zk_manager.get_client()
try:
data_bytes, stat = client.get(self.path)
state = json.loads(data_bytes.decode('utf-8'))
return state, stat.version
except NoNodeError:
return None, None
    def create_initial_state(self) -> tuple:
"""Creates the znode with a default closed state."""
client = zk_manager.get_client()
initial_state = {
"state": BreakerState.CLOSED,
"failures": 0,
"last_failure_timestamp": None,
"opened_timestamp": None
}
data_bytes = json.dumps(initial_state).encode('utf-8')
try:
client.create(self.path, data_bytes, makepath=True)
return self.get_state()
except NodeExistsError:
# Another process created it in the meantime, which is fine.
return self.get_state()
def update_state(self, new_state: dict, version: int) -> bool:
"""
Atomically updates the znode using the Check-And-Set (CAS) pattern.
:param new_state: The new dictionary to store.
:param version: The znode version from the last read.
:return: True if the update was successful, False if a conflict occurred.
"""
client = zk_manager.get_client()
data_bytes = json.dumps(new_state).encode('utf-8')
try:
# The 'version' argument makes this a conditional set.
# It will fail if the znode version on the server is not what we provide.
client.set(self.path, data_bytes, version=version)
return True
        except BadVersionError:
            # Expected CAS failure: another instance updated the znode first.
            return False
        except Exception as e:
            logger.error(f"Unexpected error updating ZK state for {self.service_name}: {e}")
            return False
The use of `version` in `update_state` is the most important detail here. It prevents a race condition where two instances read the state, both modify it locally, and then one overwrites the other’s changes. The second write will fail because the version has already been incremented by the first write.
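To see that failure mode in isolation, here is a small standalone sketch, assuming a local Zookeeper at 127.0.0.1:2181 and a throwaway znode name. It simulates two instances racing on the same znode: the second write carries a stale version, raises BadVersionError, and must re-read before retrying.

import json

from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError, NodeExistsError

zk = KazooClient(hosts="127.0.0.1:2181")  # assumed local ensemble for the demo
zk.start()

path = "/circuit_breakers/cas_demo"  # throwaway znode, not used by the real breakers
try:
    zk.create(path, json.dumps({"failures": 0}).encode("utf-8"), makepath=True)
except NodeExistsError:
    pass

# Both "instances" read the same version of the znode.
_, stat = zk.get(path)

# Instance A writes first; the server bumps the znode version.
zk.set(path, json.dumps({"failures": 1}).encode("utf-8"), version=stat.version)

# Instance B writes with the now-stale version and loses the race.
try:
    zk.set(path, json.dumps({"failures": 1}).encode("utf-8"), version=stat.version)
except BadVersionError:
    print("CAS conflict detected: re-read the state and retry")

zk.stop()
zk.close()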
The Distributed Circuit Breaker Decorator
With the ZK client and repository in place, we can build the core logic: a Flask decorator. This decorator encapsulates all the state transition logic.
`app/breaker_decorator.py`:
import time
import functools
import logging
from .breaker_repository import CircuitBreakerRepository, BreakerState
logger = logging.getLogger(__name__)
# --- Configuration ---
# In a real app, load this from a config file.
FAILURE_THRESHOLD = 5
OPEN_STATE_TIMEOUT_SECONDS = 60 # Time before moving from OPEN to HALF_OPEN
MAX_UPDATE_RETRIES = 3 # Retries for ZK CAS operation
class CircuitBreakerOpenError(Exception):
"""Custom exception raised when the circuit is open."""
def __init__(self, service_name):
self.service_name = service_name
super().__init__(f"Circuit breaker for service '{service_name}' is open.")
def distributed_circuit_breaker(service_name: str):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
repo = CircuitBreakerRepository(service_name)
# This is the core read-modify-write loop with retries for CAS
for _ in range(MAX_UPDATE_RETRIES):
state, version = repo.get_state()
if state is None:
state, version = repo.create_initial_state()
if state is None:
# Should not happen if ZK is healthy
raise RuntimeError(f"Could not initialize breaker state for {service_name}")
# --- State Evaluation Logic ---
current_state = state.get("state")
# Check if OPEN state has timed out and should move to HALF_OPEN
if current_state == BreakerState.OPEN:
opened_at = state.get("opened_timestamp", 0)
if time.time() - opened_at > OPEN_STATE_TIMEOUT_SECONDS:
logger.info(f"Breaker for '{service_name}' timeout expired. Moving to HALF_OPEN.")
state["state"] = BreakerState.HALF_OPEN
state["failures"] = 0
if repo.update_state(state, version):
current_state = BreakerState.HALF_OPEN
else:
# CAS failed, another instance changed it. Retry the loop.
time.sleep(0.1) # Small backoff
continue
# If still OPEN after timeout check, fail fast.
if current_state == BreakerState.OPEN:
raise CircuitBreakerOpenError(service_name)
# --- Execute the decorated function ---
try:
result = func(*args, **kwargs)
# On success, if it was HALF_OPEN, reset to CLOSED.
if current_state == BreakerState.HALF_OPEN:
logger.info(f"Successful call in HALF_OPEN state. Closing breaker for '{service_name}'.")
state["state"] = BreakerState.CLOSED
state["failures"] = 0
state["last_failure_timestamp"] = None
if repo.update_state(state, version):
return result # Update successful
else:
# CAS failed. Another process might have already closed or re-opened it.
# We still return the successful result, as our call succeeded.
# The next call will re-evaluate based on the new state.
logger.warning(f"CAS conflict closing breaker for '{service_name}'. State already updated.")
return result
return result
except Exception as e:
# --- Handle Failure ---
logger.warning(f"Call failed for service '{service_name}'. Incrementing failure count.")
state["failures"] += 1
state["last_failure_timestamp"] = time.time()
# If failure threshold is met, trip the breaker to OPEN.
if state["failures"] >= FAILURE_THRESHOLD:
logger.error(f"Failure threshold reached. Opening breaker for '{service_name}'.")
state["state"] = BreakerState.OPEN
state["opened_timestamp"] = time.time()
if repo.update_state(state, version):
raise e # Update successful, re-raise original exception
else:
logger.warning(f"CAS conflict on failure for '{service_name}'. State already updated.")
raise e # Re-raise original exception regardless of CAS result
# If we exit the loop, it means max retries were exceeded.
raise RuntimeError(f"Failed to update circuit breaker state for '{service_name}' after {MAX_UPDATE_RETRIES} retries.")
return wrapper
return decorator
This decorator is now the heart of the system. The retry loop handles the optimistic locking (CAS) failures. When a write fails due to a version mismatch, it simply retries the entire process: re-read the state, re-evaluate, and attempt to write again.
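As a usage sketch, a call site would look roughly like this. The requests dependency, the gateway URL, and the function names are illustrative assumptions rather than part of the project code:

import requests

from app.breaker_decorator import distributed_circuit_breaker, CircuitBreakerOpenError

@distributed_circuit_breaker(service_name="payment_gateway")
def charge_card(payload: dict) -> dict:
    # Any exception raised here (timeouts, 5xx responses) counts as a failure
    # against the breaker state shared in Zookeeper.
    resp = requests.post("https://payments.example.com/charge", json=payload, timeout=2)
    resp.raise_for_status()
    return resp.json()

def checkout_flow(payload: dict) -> dict:
    try:
        return charge_card(payload)
    except CircuitBreakerOpenError:
        # Fail fast instead of tying up a Gunicorn worker on a known-bad dependency.
        return {"status": "payment_unavailable", "retry_after_seconds": 60}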
Dashboard: API and Frontend
The final piece is the visualization layer. This requires a Flask API endpoint to expose the breaker states and a simple HTML page styled with Tailwind CSS to display them.
The Flask API Blueprint
`app/dashboard/routes.py`:
from flask import Blueprint, jsonify, render_template
from kazoo.exceptions import NoNodeError
from ..zookeeper_client import zk_manager
from ..breaker_repository import CircuitBreakerRepository, BASE_PATH
dashboard_bp = Blueprint('dashboard', __name__, template_folder='templates')
@dashboard_bp.route('/')
def index():
"""Serves the main dashboard HTML page."""
return render_template('dashboard.html')
@dashboard_bp.route('/api/breakers')
def get_all_breakers_status():
"""
API endpoint to get the status of all configured circuit breakers.
"""
client = zk_manager.get_client()
breakers = {}
try:
service_names = client.get_children(BASE_PATH)
for name in service_names:
repo = CircuitBreakerRepository(name)
state, _ = repo.get_state()
if state:
breakers[name] = state
except NoNodeError:
# This is fine, just means no breakers have been initialized yet.
return jsonify({})
except Exception as e:
return jsonify({"error": str(e)}), 500
return jsonify(breakers)
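For reference, once a breaker has tripped, the endpoint returns a payload along these lines (values are illustrative):

{
  "payment_gateway": {
    "state": "OPEN",
    "failures": 5,
    "last_failure_timestamp": 1718031842.7,
    "opened_timestamp": 1718031842.7
  }
}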
The Tailwind CSS Frontend
This requires a single HTML file. For simplicity in this context, we’ll use the Tailwind Play CDN. In a real project, you would set up Tailwind’s build process.
`app/dashboard/templates/dashboard.html`:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Distributed Circuit Breaker Dashboard</title>
<script src="https://cdn.tailwindcss.com"></script>
</head>
<body class="bg-gray-900 text-gray-100">
<div class="container mx-auto p-8">
<h1 class="text-4xl font-bold mb-8 text-cyan-400">Circuit Breaker Status</h1>
<div id="breakers-grid" class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-6">
<!-- Breaker cards will be inserted here by JavaScript -->
</div>
<div id="loading" class="text-center text-gray-400 mt-8">
<p>Loading data...</p>
</div>
<div id="error" class="text-center text-red-500 mt-8 hidden">
<p>Failed to fetch breaker status.</p>
</div>
</div>
<script>
const grid = document.getElementById('breakers-grid');
const loadingDiv = document.getElementById('loading');
const errorDiv = document.getElementById('error');
const stateColors = {
'OPEN': 'bg-red-500 border-red-400',
'HALF_OPEN': 'bg-yellow-500 border-yellow-400',
'CLOSED': 'bg-green-600 border-green-500'
};
const stateTextColors = {
'OPEN': 'text-red-300',
'HALF_OPEN': 'text-yellow-200',
'CLOSED': 'text-green-300'
};
function createBreakerCard(name, data) {
const state = data.state;
const cardColor = stateColors[state] || 'bg-gray-700 border-gray-600';
const textColor = stateTextColors[state] || 'text-gray-300';
const lastFailure = data.last_failure_timestamp
? new Date(data.last_failure_timestamp * 1000).toLocaleString()
: 'N/A';
const openedAt = data.opened_timestamp
? new Date(data.opened_timestamp * 1000).toLocaleString()
: 'N/A';
return `
<div class="bg-gray-800 rounded-lg p-6 border-t-4 ${cardColor} shadow-lg">
<h2 class="text-2xl font-semibold mb-2">${name}</h2>
<div class="flex items-center mb-4">
<span class="text-lg font-bold ${textColor}">${state}</span>
</div>
<div class="text-sm text-gray-400 space-y-2">
<p><strong>Failures:</strong> <span class="font-mono">${data.failures}</span></p>
<p><strong>Last Failure:</strong> <span class="font-mono">${lastFailure}</span></p>
<p><strong>Opened At:</strong> <span class="font-mono">${openedAt}</span></p>
</div>
</div>
`;
}
async function fetchBreakerStatus() {
try {
const response = await fetch('/api/breakers');
if (!response.ok) {
throw new Error('Network response was not ok');
}
const data = await response.json();
loadingDiv.style.display = 'none';
errorDiv.classList.add('hidden');
grid.innerHTML = ''; // Clear previous cards
const serviceNames = Object.keys(data);
if (serviceNames.length === 0) {
grid.innerHTML = '<p class="text-gray-500 col-span-full text-center">No active circuit breakers found.</p>';
} else {
serviceNames.forEach(name => {
grid.innerHTML += createBreakerCard(name, data[name]);
});
}
} catch (err) {
console.error('Fetch error:', err);
loadingDiv.style.display = 'none';
errorDiv.classList.remove('hidden');
}
}
// Fetch data immediately and then every 5 seconds
fetchBreakerStatus();
setInterval(fetchBreakerStatus, 5000);
</script>
</body>
</html>
This simple polling mechanism provides a near real-time view. An operator can now watch this dashboard and see immediately when a downstream service becomes unhealthy and our system has protectively opened the circuit.
Putting It All Together
The application factory would look something like this:
`app/__init__.py`:
from flask import Flask
from .zookeeper_client import zk_manager
from .breaker_decorator import distributed_circuit_breaker
def create_app(config):
app = Flask(__name__)
app.config.from_object(config)
# Initialize Zookeeper connection on startup
try:
zk_manager.init_client(hosts=app.config['ZOOKEEPER_HOSTS'])
except Exception as e:
# If ZK isn't available on startup, the app should fail fast.
raise RuntimeError(f"Could not connect to Zookeeper on startup: {e}")
# Register blueprints
from .dashboard.routes import dashboard_bp
app.register_blueprint(dashboard_bp)
# Example of a protected route
@app.route('/checkout')
@distributed_circuit_breaker(service_name='payment_gateway')
def checkout():
# This function now contains ONLY the business logic for calling the gateway.
# The resilience logic is handled by the decorator.
# ... call payment gateway ...
return "Checkout successful!"
return app
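To round this out, here is a minimal sketch of a config object and an entrypoint. The file names, the Config class, and the environment variable are assumptions for illustration:

# config.py (name assumed)
import os

class Config:
    ZOOKEEPER_HOSTS = os.environ.get("ZOOKEEPER_HOSTS", "zk1:2181,zk2:2181,zk3:2181")

# run.py (name assumed)
from app import create_app
from config import Config

app = create_app(Config)

if __name__ == "__main__":
    # Served by Gunicorn in production; the dev server is for local testing only.
    app.run(port=5000)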
Testing Strategy
A final note on testing. Directly testing code that interacts with Zookeeper in unit tests is slow and brittle. The proper approach is to mock the `CircuitBreakerRepository` class or the `kazoo` client itself. For instance, using `pytest` and `unittest.mock`, one can create tests that assert the decorator correctly reads state, raises `CircuitBreakerOpenError` when the mock returns an “OPEN” state, and attempts to write a new state on failure. This isolates the logic of the decorator from the networking and state management of Zookeeper.
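A minimal sketch of such a test, assuming pytest and patching the repository where the decorator module imports it (the test layout itself is hypothetical):

import time
from unittest.mock import patch

import pytest

from app.breaker_decorator import distributed_circuit_breaker, CircuitBreakerOpenError

def test_open_breaker_fails_fast():
    open_state = {
        "state": "OPEN",
        "failures": 5,
        "last_failure_timestamp": time.time(),
        "opened_timestamp": time.time(),  # freshly opened, so the OPEN timeout has not expired
    }
    with patch("app.breaker_decorator.CircuitBreakerRepository") as mock_repo_cls:
        mock_repo_cls.return_value.get_state.return_value = (open_state, 3)

        @distributed_circuit_breaker(service_name="payment_gateway")
        def call_gateway():
            pytest.fail("The wrapped function must not run while the breaker is open")

        with pytest.raises(CircuitBreakerOpenError):
            call_gateway()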
The solution has its limitations. The transition from `OPEN` to `HALF_OPEN` is triggered by the first request that arrives after the timeout. If there’s no traffic, the breaker will remain open indefinitely. A more advanced implementation might involve a dedicated “janitor” process that watches for expired `opened_timestamp` values in Zookeeper and proactively transitions the state. Furthermore, while Zookeeper is robust, it adds another piece of critical infrastructure to maintain. For applications running in a Kubernetes-native environment, leveraging the etcd instance that already exists or a service mesh’s built-in circuit breaking capabilities might be a more integrated, though different, architectural choice.
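As a sketch of that janitor idea (not part of the system described above), a small standalone process could reuse the same CAS pattern; the host list, polling interval, and function name are assumptions:

import json
import time

from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError, NoNodeError

BASE_PATH = "/circuit_breakers"
OPEN_STATE_TIMEOUT_SECONDS = 60

def janitor_pass(zk: KazooClient):
    """Move any OPEN breaker whose timeout has expired to HALF_OPEN."""
    try:
        names = zk.get_children(BASE_PATH)
    except NoNodeError:
        return  # no breakers registered yet
    for name in names:
        path = f"{BASE_PATH}/{name}"
        data, stat = zk.get(path)
        state = json.loads(data.decode("utf-8"))
        opened_at = state.get("opened_timestamp") or 0
        if state.get("state") == "OPEN" and time.time() - opened_at > OPEN_STATE_TIMEOUT_SECONDS:
            state["state"] = "HALF_OPEN"
            state["failures"] = 0
            try:
                zk.set(path, json.dumps(state).encode("utf-8"), version=stat.version)
            except BadVersionError:
                pass  # another writer won the race; the next pass will re-check

if __name__ == "__main__":
    zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")  # assumed host list
    zk.start()
    while True:
        janitor_pass(zk)
        time.sleep(5)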