The operational friction in deploying machine learning models into production is a familiar pain point. A data science team finalizes a new model version, fraud-detection-v1.2, which promises a 5% improvement. The platform team’s responsibility is to roll it out, but the process is fraught with manual steps: updating a Kubernetes Deployment manifest, tweaking an Istio VirtualService, or modifying API gateway configuration files. Each change requires a new CI/CD pipeline run, a deployment, and a window of risk. This cycle is too slow for rapid experimentation and creates a bottleneck that stifles innovation. When you need to run three different model versions simultaneously for A/B testing, with traffic split 70/20/10, the complexity of declarative Kubernetes manifests becomes a liability, not an asset.
We hit this wall when our MLOps platform grew to support dozens of models, each with multiple versions being tested in parallel. The core problem was the static coupling between the routing logic and the infrastructure’s deployed state. Our initial concept was to decouple this entirely: create a serving architecture where model routing and traffic splitting decisions could be made and propagated in near real-time, without touching a single line of YAML or triggering a pod restart.
The architecture we settled on leverages a central, dynamic GraphQL Gateway. This gateway acts as the single entry point for all model inference requests. Its intelligence, however, doesn’t reside in its own configuration files. Instead, it subscribes to a distributed configuration center—Nacos—for its routing table. The ML models themselves are deployed as independent microservices on a GKE cluster, each exposing a GraphQL schema. The gateway uses Apollo Federation to compose these downstream schemas into a unified graph.
graph TD
    subgraph "GCP Project"
        subgraph "GKE Cluster"
            A[GraphQL Client] --> B{Dynamic Gateway Deployment};
            subgraph "Model Services"
                D[fraud-detection-v1 Service] --> E[fraud-detection-v1 Pod];
                F[fraud-detection-v2 Service] --> G[fraud-detection-v2 Pod];
                H[recommendation-v3 Service] --> I[recommendation-v3 Pod];
            end
            B -->|Traffic Routing based on Nacos Config| D;
            B -->|Traffic Routing based on Nacos Config| F;
            B -->|Traffic Routing based on Nacos Config| H;
        end
        subgraph "Nacos Cluster"
            C[Nacos Server];
        end
    end
    B -- "Subscribes to Config Changes" --> C;
    J[MLOps CI/CD] -- "Publishes New Config" --> C;
    style B fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px
The technology selection process was driven by pragmatism.
- GCP GKE: A managed Kubernetes environment was non-negotiable. We needed robust autoscaling, integration with Google Cloud’s monitoring suite, and Workload Identity for secure access to other GCP services without managing service account keys. GKE provides this out of the box.
- Nacos: We evaluated Kubernetes ConfigMaps and products like Consul. ConfigMaps consumed as environment variables require a pod restart, and volume-mounted ones need a file watcher or sidecar before the application reloads them, which defeats our “no-restart” goal. While Consul is powerful, Nacos offered a simpler client SDK and a push-based model for configuration updates that fit our needs well. When a value changes in the Nacos UI or via its API, all subscribed clients are notified within seconds. In a real-world project, this immediate propagation is critical for emergency rollbacks or rapid A/B test adjustments.
- GraphQL with Apollo Federation: Why not a simple REST gateway? As our model portfolio grew, frontend teams and other services needed to fetch predictions from multiple models in a single round trip. A REST approach would lead to endpoint explosion (/predict/modelA, /predict/modelB) or chatty API calls. GraphQL provides a unified query language, and Apollo Federation allows each model-serving microservice to own its part of the schema, which the gateway then composes. This aligns well with a microservices approach where each model team can evolve its API independently.
The core of this system is the implementation. Let’s break it down piece by piece, focusing on production-grade code.
1. Defining the Configuration Schema in Nacos
Everything starts with a well-defined contract for our model routing rules. We store this as a single YAML configuration object in Nacos. A common mistake is to create hundreds of individual keys; this makes atomic updates impossible. Instead, a single document representing the entire routing state is more robust.
Nacos Configuration:
- Data ID: ml-model-routing
- Group: DEFAULT_GROUP
- Format: YAML
# Nacos Data ID: ml-model-routing
models:
  fraudDetection:
    # The base name for the federated service in the gateway
    federatedServiceName: "fraudDetection"
    # Default version to use if no other rules match
    defaultVersion: "v1"
    # List of available versions with their target k8s service and traffic weight
    versions:
      - name: "v1"
        # Internal GKE DNS name. This is crucial for service discovery.
        serviceUrl: "http://fraud-detection-v1.default.svc.cluster.local:4001/graphql"
        weight: 90 # Traffic percentage
        status: "ACTIVE"
      - name: "v2"
        serviceUrl: "http://fraud-detection-v2.default.svc.cluster.local:4001/graphql"
        weight: 10 # Canary release getting 10% of traffic
        status: "ACTIVE"
  productRecommendation:
    federatedServiceName: "productRecommendation"
    defaultVersion: "v3.1"
    versions:
      - name: "v3.1"
        serviceUrl: "http://recommendation-v3-1.default.svc.cluster.local:4002/graphql"
        weight: 100
        status: "ACTIVE"
      - name: "v3.0"
        serviceUrl: "http://recommendation-v3-0.default.svc.cluster.local:4002/graphql"
        weight: 0
        status: "INACTIVE" # This version is deployed but receives no traffic
This structure allows us to define traffic splits, canary versions, and even quickly deactivate a faulty model version by setting its weight to 0, all through a simple change in Nacos.
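Because the whole routing state lives in one document, it can be checked as a unit before it is applied. The following is a minimal sketch of such a check, not the platform's actual code: the function names validate_routing and apply_update are hypothetical, the config is shown as an already-parsed Python dict, and the rule that ACTIVE weights must sum to 100 is an assumption inferred from the "Traffic percentage" comment in the config.

```python
from typing import Any


def validate_routing(config: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means the config is usable."""
    problems: list[str] = []
    models = config.get("models")
    if not isinstance(models, dict) or not models:
        return ["'models' must be a non-empty mapping"]
    for key, model in models.items():
        versions = model.get("versions") or []
        names = [v.get("name") for v in versions]
        if model.get("defaultVersion") not in names:
            problems.append(f"{key}: defaultVersion is not one of {names}")
        # Assumption: weights are percentages, so ACTIVE weights should total 100.
        active_total = sum(
            v.get("weight", 0) for v in versions if v.get("status") == "ACTIVE"
        )
        if active_total != 100:
            problems.append(f"{key}: ACTIVE weights sum to {active_total}, expected 100")
        for v in versions:
            if not str(v.get("serviceUrl", "")).startswith("http"):
                problems.append(f"{key}/{v.get('name')}: serviceUrl missing or not HTTP")
    return problems


def apply_update(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
    """Swap in the candidate only if it validates; otherwise keep the last good config."""
    return candidate if not validate_routing(candidate) else current


# Parsed form of the fraudDetection entry from the Nacos document.
SAMPLE = {
    "models": {
        "fraudDetection": {
            "federatedServiceName": "fraudDetection",
            "defaultVersion": "v1",
            "versions": [
                {"name": "v1", "weight": 90, "status": "ACTIVE",
                 "serviceUrl": "http://fraud-detection-v1.default.svc.cluster.local:4001/graphql"},
                {"name": "v2", "weight": 10, "status": "ACTIVE",
                 "serviceUrl": "http://fraud-detection-v2.default.svc.cluster.local:4001/graphql"},
            ],
        }
    }
}

if __name__ == "__main__":
    print(validate_routing(SAMPLE))
```

Running the same check in the publishing pipeline and again in the consumer means a malformed document can be rejected at either end while the previous routing table stays in effect.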
2. The Model-Serving Microservice
Next, we need the actual model servers. These are simple applications that expose a GraphQL endpoint. For this example, we’ll use Python with FastAPI and ariadne. In a real MLOps pipeline, this service would be built automatically, packaging a trained model artifact (like a .pkl or .onnx file) into the container image.
Here’s a simplified fraud-detection-v1 service. The v2 service would be identical, just returning a different version string.
model_server.py:
import logging
import os

import uvicorn
from ariadne import QueryType, gql, make_executable_schema
from ariadne.asgi import GraphQL
from fastapi import FastAPI

# Basic structured logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# The model version is injected via an environment variable in the Kubernetes deployment.
# This is a best practice to avoid hardcoding configuration.
MODEL_VERSION = os.getenv("MODEL_VERSION", "unknown")

# Define GraphQL schema. In a federated setup, we extend the base Query type
# and define entity resolvers, but for simplicity, we'll use a basic schema.
type_defs = gql("""
    type Query {
        # Simulates a fraud prediction request
        predictFraud(transactionId: String!): PredictionResult
    }

    type PredictionResult {
        transactionId: String!
        fraudScore: Float!
        modelVersion: String!
        decision: String!
    }
""")

query = QueryType()


@query.field("predictFraud")
def resolve_predict_fraud(_, info, transactionId: str):
    logger.info(f"Processing prediction for transactionId: {transactionId} with model version: {MODEL_VERSION}")
    # In a real application, this is where you would load your model
    # and perform inference. We simulate it here.
    try:
        score = 0.1  # Dummy score for v1
        if MODEL_VERSION == "v2":
            score = 0.8  # Dummy score for v2 to show a difference
        return {
            "transactionId": transactionId,
            "fraudScore": score,
            "modelVersion": MODEL_VERSION,
            "decision": "APPROVE" if score < 0.5 else "REJECT",
        }
    except Exception:
        logger.error(f"Inference failed for transactionId: {transactionId}", exc_info=True)
        # It's critical to return a structured error that GraphQL can handle
        # instead of letting the server crash.
        # Ariadne handles this by default, but custom error formatting is possible.
        raise  # Re-raise with the original traceback intact


schema = make_executable_schema(type_defs, query)

app = FastAPI()
# debug=True exposes error details in responses; turn it off outside development.
app.add_route("/graphql", GraphQL(schema, debug=True))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=4001)
The corresponding Kubernetes manifests are straightforward. We create a Deployment and a Service for each model version.
fraud-detection-v1-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-detection-v1
spec:
  replicas: 2
  selector:
    matchLabels:
      app: fraud-detection
      version: v1
  template:
    metadata:
      labels:
        app: fraud-detection
        version: v1
    spec:
      containers:
        - name: model-server
          image: your-repo/fraud-detection-server:1.0.0 # Replace with your image
          ports:
            - containerPort: 4001
          env:
            - name: MODEL_VERSION
              value: "v1"
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "500m"
              memory: "1Gi"
---
apiVersion: v1
kind: Service
metadata:
  name: fraud-detection-v1
spec:
  selector:
    app: fraud-detection
    version: v1
  ports:
    - protocol: TCP
      port: 4001
      targetPort: 4001
The manifest for v2 would be identical, just with v1 replaced by v2 and pointing to the v2 container image.
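Since the per-version manifests differ only in the version label and the image, they lend themselves to being generated. The sketch below is one possible way to do that in Python, under stated assumptions: model_manifests and render_list are hypothetical helper names, the image reference is a placeholder (the article does not give the v2 image tag), and the output is JSON wrapped in a v1 List, which kubectl accepts in place of YAML.

```python
import json


def model_manifests(version: str, image: str, port: int = 4001) -> list[dict]:
    """Build the Deployment and Service for one fraud-detection model version."""
    name = f"fraud-detection-{version}"
    labels = {"app": "fraud-detection", "version": version}
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": name},
        "spec": {
            "replicas": 2,
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {"containers": [{
                    "name": "model-server",
                    "image": image,
                    "ports": [{"containerPort": port}],
                    # The server reads MODEL_VERSION from its environment.
                    "env": [{"name": "MODEL_VERSION", "value": version}],
                    "resources": {
                        "requests": {"cpu": "250m", "memory": "512Mi"},
                        "limits": {"cpu": "500m", "memory": "1Gi"},
                    },
                }]},
            },
        },
    }
    service = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name},
        "spec": {
            "selector": labels,
            "ports": [{"protocol": "TCP", "port": port, "targetPort": port}],
        },
    }
    return [deployment, service]


def render_list(version: str, image: str) -> str:
    """Serialize both objects as a single JSON document of kind List."""
    doc = {"apiVersion": "v1", "kind": "List", "items": model_manifests(version, image)}
    return json.dumps(doc, indent=2)


if __name__ == "__main__":
    # Placeholder image reference; substitute the real v2 image.
    print(render_list("v2", "your-repo/fraud-detection-server:PLACEHOLDER"))
```

Generating the pair from one function keeps the Service selector and the pod labels from drifting apart, which is the usual failure when manifests are copied by hand.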
3. The Dynamic GraphQL Gateway
This is the heart of the system. It’s a Node.js application using Apollo Server. It maintains an in-memory representation of the Nacos configuration and uses a custom gateway implementation to route requests.
Key Dependencies:
- @apollo/server, @apollo/gateway: For the core GraphQL gateway functionality.
- nacos: The official Node.js client for Nacos.
- js-yaml: To parse the YAML configuration from Nacos.
- winston: For production-grade logging.
gateway.js:
const { ApolloServer } = require('@apollo/server');
const { startStandaloneServer } = require('@apollo/server/standalone');
const { ApolloGateway, IntrospectAndCompose, RemoteGraphQLDataSource } = require('@apollo/gateway');
const { NacosConfigClient } = require('nacos');
const yaml = require('js-yaml');
const winston = require('winston');

// Setup structured logging
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

// --- Nacos Configuration ---
const NACOS_SERVER_ADDR = process.env.NACOS_SERVER_ADDR || '127.0.0.1:8848';
const NACOS_DATA_ID = 'ml-model-routing';
const NACOS_GROUP = 'DEFAULT_GROUP';

// In-memory cache for our routing configuration.
// It's critical to initialize this with a safe default in case Nacos is unavailable on startup.
let routingConfig = { models: {} };

class DynamicRoutingDataSource extends RemoteGraphQLDataSource {
  // This method is called by the gateway before sending a request to a downstream service.
  // We intercept it to inject the correct URL based on our dynamic config.
  // Caveat: the same target is applied to every subgraph request in an operation, so with
  // more than one model the decision would need to be keyed by subgraph name.
  willSendRequest({ request, context }) {
    if (!context.modelRouting) {
      // This should not happen if the context is built correctly.
      logger.warn('modelRouting context is missing');
      return;
    }
    const { targetServiceName, targetUrl } = context.modelRouting;
    // A common pitfall is not logging the routing decision. For debugging, this is invaluable.
    logger.info(`Routing request for service "${targetServiceName}" to URL: ${targetUrl}`);
    // Modify the request URL to point to the correct model version service.
    request.http.url = targetUrl;
  }
}

// Function to resolve which version to use based on traffic weights.
function resolveVersion(modelConfig) {
  const activeVersions = modelConfig.versions.filter(v => v.status === 'ACTIVE' && v.weight > 0);
  if (activeVersions.length === 0) {
    return null; // No active version available
  }
  // Scale the draw by the total active weight so the split stays proportional
  // even when the weights do not add up to exactly 100.
  const totalWeight = activeVersions.reduce((sum, v) => sum + v.weight, 0);
  const randomNumber = Math.random() * totalWeight;
  let cumulativeWeight = 0;
  for (const version of activeVersions) {
    cumulativeWeight += version.weight;
    if (randomNumber < cumulativeWeight) {
      return version;
    }
  }
  // Fallback to the last active version in case of floating point inaccuracies.
  return activeVersions[activeVersions.length - 1];
}

const gateway = new ApolloGateway({
  supergraphSdl: new IntrospectAndCompose({
    // The list of subgraphs is now dynamic, based on our Nacos config.
    subgraphs: [],
  }),
  buildService({ name, url }) {
    return new DynamicRoutingDataSource({ url });
  },
});

// @apollo/server has no `subscriptions` option, so none is passed here;
// this is a simple query/mutation gateway.
const server = new ApolloServer({
  gateway,
});

async function initializeNacosListener() {
  const client = new NacosConfigClient({
    serverAddr: NACOS_SERVER_ADDR,
  });

  // Function to fetch and update the config
  const updateConfig = async () => {
    try {
      const configStr = await client.getConfig(NACOS_DATA_ID, NACOS_GROUP);
      if (!configStr) {
        logger.warn(`Config is empty in Nacos for ${NACOS_DATA_ID}. Using cached version.`);
        return;
      }
      const newConfig = yaml.load(configStr);
      // Only swap the cache once the document has the expected top-level shape.
      if (!newConfig || typeof newConfig.models !== 'object' || newConfig.models === null) {
        logger.warn(`Config for ${NACOS_DATA_ID} has no "models" mapping. Using cached version.`);
        return;
      }
      routingConfig = newConfig;

      // This is the most critical part: we must tell the Apollo Gateway to reload its schema
      // when the list of available services changes.
      const subgraphs = Object.values(routingConfig.models)
        .filter(model => Array.isArray(model.versions) && model.versions.length > 0)
        .map(model => ({
          name: model.federatedServiceName,
          // The URL here is a placeholder. The actual routing happens in DynamicRoutingDataSource.
          url: model.versions[0].serviceUrl,
        }));

      // Reload the gateway with the new configuration.
      // Check this call against the @apollo/gateway version you run: the options
      // accepted by load() have changed between releases.
      await gateway.load({
        apollo: {
          key: process.env.APOLLO_KEY,
          graphRef: process.env.APOLLO_GRAPH_REF,
        },
        supergraphSdl: new IntrospectAndCompose({ subgraphs }),
      });
      logger.info('Successfully updated routing config and reloaded gateway schema from Nacos.');
    } catch (err) {
      logger.error('Failed to fetch or process config from Nacos.', { error: err.message });
      // In a real-world project, you'd add retry logic and alerts here.
    }
  };

  // Initial fetch
  await updateConfig();

  // Subscribe to future changes
  client.subscribe({
    dataId: NACOS_DATA_ID,
    group: NACOS_GROUP,
  }, (configStr) => {
    logger.info('Nacos config changed, triggering update.');
    updateConfig();
  });
}

async function startServer() {
  await initializeNacosListener();
  const { url } = await startStandaloneServer(server, {
    listen: { port: 4000 },
    // The context function is executed for every incoming request.
    // This is where we make our dynamic routing decision.
    context: async ({ req }) => {
      // A hacky way to know which model is being queried: match on the query text.
      // This is a simplification. A more robust way is to parse the query AST.
      const queryText = typeof req.body?.query === 'string' ? req.body.query : '';
      const modelName = queryText.includes('predictFraud') ? 'fraudDetection' : null;
      if (modelName && routingConfig.models[modelName]) {
        const modelConfig = routingConfig.models[modelName];
        const resolvedVersion = resolveVersion(modelConfig);
        if (resolvedVersion) {
          return {
            modelRouting: {
              targetServiceName: modelConfig.federatedServiceName,
              targetUrl: resolvedVersion.serviceUrl,
            },
          };
        }
      }
      return {};
    },
  });
  logger.info(`🚀 Gateway ready at ${url}`);
}

startServer().catch(err => logger.error('Failed to start server', { error: err.message }));
The logic here is subtle but powerful. The ApolloGateway is initialized with a dummy list of subgraphs. The initializeNacosListener fetches the real configuration and then calls gateway.load(), which forces Apollo Gateway to refetch the schemas from the downstream services defined in Nacos and compose the new supergraph. The Nacos client’s subscribe method ensures this happens automatically whenever the config is changed. The DynamicRoutingDataSource and context function work together at request time to select a specific version of a model based on the weighted routing rules.
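The weighted selection at the core of that request-time decision can be sketched on its own in Python. This is an illustration rather than the gateway's code: resolve_version is a hypothetical name, the rng parameter exists only to make the draw testable, and the draw is scaled by the sum of active weights (a choice made here so that splits stay proportional if the weights do not add up to exactly 100).

```python
import random
from collections import Counter
from typing import Callable, Optional


def resolve_version(versions: list[dict],
                    rng: Callable[[], float] = random.random) -> Optional[dict]:
    """Pick one ACTIVE version with probability proportional to its weight."""
    active = [v for v in versions if v["status"] == "ACTIVE" and v["weight"] > 0]
    if not active:
        return None  # Nothing is eligible to receive traffic
    total = sum(v["weight"] for v in active)
    point = rng() * total  # Uniform draw in [0, total)
    cumulative = 0
    for version in active:
        cumulative += version["weight"]
        if point < cumulative:
            return version
    return active[-1]  # Guard against floating point edge cases


VERSIONS = [
    {"name": "v1", "weight": 90, "status": "ACTIVE"},
    {"name": "v2", "weight": 10, "status": "ACTIVE"},
]

if __name__ == "__main__":
    seeded = random.Random(0)  # Fixed seed so repeated runs give the same tally
    tally = Counter(resolve_version(VERSIONS, seeded.random)["name"] for _ in range(10_000))
    print(dict(tally))
```

Each version owns a slice of the interval from 0 to the total weight, and the draw lands in exactly one slice, so a version with weight 10 out of 100 is chosen for roughly one request in ten.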
4. Client Interaction
The consuming client’s implementation is now trivial. It interacts with a single, stable GraphQL endpoint and is completely oblivious to the versioning and canary testing happening on the backend.
client_test.py:
# pip install requests
import collections
import time

import requests

GATEWAY_URL = "http://localhost:4000/graphql"  # Or the GKE service external IP

QUERY = """
query FraudPrediction($tid: String!) {
  predictFraud(transactionId: $tid) {
    modelVersion
    fraudScore
    decision
  }
}
"""


def run_test(num_requests=100):
    version_counts = collections.defaultdict(int)
    print(f"Sending {num_requests} requests...")
    for i in range(num_requests):
        try:
            response = requests.post(
                GATEWAY_URL,
                json={"query": QUERY, "variables": {"tid": f"txn_{i}"}},
                timeout=5,  # Don't hang forever if the gateway is unreachable
            )
            response.raise_for_status()
            data = response.json()
            if "errors" in data:
                print(f"GraphQL Error: {data['errors']}")
                continue
            version = data["data"]["predictFraud"]["modelVersion"]
            version_counts[version] += 1
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
        time.sleep(0.05)

    print("\n--- Results ---")
    # Percentages are taken over successful responses so failures don't skew the split.
    successful = sum(version_counts.values())
    for version, count in sorted(version_counts.items()):
        percentage = (count / successful) * 100
        print(f"Version {version}: {count} requests ({percentage:.2f}%)")
    print("---------------")


if __name__ == "__main__":
    # Initially, Nacos config is 90% v1, 10% v2
    print("Running test with 90/10 split...")
    run_test()

    # Now, imagine an operator goes into the Nacos UI and changes the weights
    # to v1: 50, v2: 50 and hits "Publish".
    # The gateway will pick up the change within seconds.
    input("\n>>> Go to Nacos, change fraudDetection weights to 50/50, then press Enter to continue...")

    print("\nRunning test with 50/50 split...")
    run_test()
When you run this, you will see the traffic distribution shift in real-time after updating the Nacos configuration, without any deployments, restarts, or downtime. This is the tangible result of the decoupled architecture. It transforms MLOps from a slow, infrastructure-centric process to a fast, application-centric one.
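With only 100 requests per run, the observed split will wobble around the configured weights, so a 90/10 configuration can legitimately show something like 86/14. A rough way to judge whether a result is consistent with the configuration is to compare it against the binomial standard error. The helper names below are hypothetical and the three-standard-error threshold is an arbitrary, fairly lenient choice made for this sketch.

```python
import math


def standard_error(expected_share: float, total: int) -> float:
    """Standard error of an observed proportion under a binomial model."""
    return math.sqrt(expected_share * (1.0 - expected_share) / total)


def split_within_tolerance(observed: int, total: int,
                           expected_share: float, z: float = 3.0) -> bool:
    """True if observed/total lies within z standard errors of expected_share."""
    if total <= 0:
        return False  # No successful requests, nothing to judge
    deviation = abs(observed / total - expected_share)
    return deviation <= z * standard_error(expected_share, total)


if __name__ == "__main__":
    # 14 of 100 requests served by v2 while the config says 10%.
    print(split_within_tolerance(14, 100, 0.10))
    # 25 of 100 would be hard to explain by chance alone.
    print(split_within_tolerance(25, 100, 0.10))
```

For a 10% canary and 100 requests the standard error is about three percentage points, so sending more requests is the simplest way to get a sharper reading of the split.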
This architecture, while powerful, is not without its limitations and required maturity. The gateway itself is a critical component and must be made highly available with multiple replicas and a Horizontal Pod Autoscaler. The logic for parsing the incoming GraphQL query to determine which model is being targeted was simplified here; a production implementation would need a more robust abstract syntax tree (AST) parser. Furthermore, the reliance on Nacos introduces another stateful system that needs to be managed, monitored, and backed up. The primary applicability boundary for this pattern is organizational scale: for a team managing only one or two models, the complexity might outweigh the benefits. However, for a platform serving a diverse and rapidly evolving portfolio of machine learning models, this dynamic, configuration-driven approach provides a level of agility that is difficult to achieve with static infrastructure definitions alone. Future iterations could involve integrating this with a feature store, where the routing could be determined not just by random weight but by specific feature values in the request, enabling even more sophisticated targeted rollouts.
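As a small step toward routing on request attributes rather than a random draw, the version can be derived from a stable hash of a request field, so the same key always lands on the same version. The sketch below assumes the transaction ID is the routing key and that weights are integers; bucket_for and sticky_version are hypothetical names, and nothing here reflects an existing feature-store integration.

```python
import hashlib
from typing import Optional


def bucket_for(key: str, buckets: int) -> int:
    """Map a key to a stable bucket in [0, buckets) using SHA-256."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % buckets


def sticky_version(key: str, versions: list[dict]) -> Optional[dict]:
    """Choose a version by weight, deterministically for a given key."""
    active = [v for v in versions if v["status"] == "ACTIVE" and v["weight"] > 0]
    if not active:
        return None
    total = sum(v["weight"] for v in active)  # Assumed to be an integer
    point = bucket_for(key, total)
    cumulative = 0
    for version in active:
        cumulative += version["weight"]
        if point < cumulative:
            return version
    return active[-1]


VERSIONS = [
    {"name": "v1", "weight": 90, "status": "ACTIVE"},
    {"name": "v2", "weight": 10, "status": "ACTIVE"},
]

if __name__ == "__main__":
    for txn in ("txn_1", "txn_2", "txn_1"):
        print(txn, "->", sticky_version(txn, VERSIONS)["name"])
```

The trade-off is that changing the weights reassigns some keys to a different version, which matters if downstream analysis expects a key to stay with one model for the whole experiment.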