Constructing a Dynamic MLOps Edge Architecture Using a Dart-based xDS Control Plane for Envoy Proxy and MLflow Integration


The challenge is deploying machine learning models to a fleet of edge devices with immediate, controlled rollouts and high observability, without requiring a full device firmware update. A centralized inference API introduces unacceptable latency and network dependency. Conversely, baking models into device software creates a rigid, slow-to-iterate deployment cycle, making canary releases or A/B testing of model versions nearly impossible. This operational friction is the primary bottleneck to improving model performance in the field.

A common approach involves pushing model files to devices and restarting a local inference server. This is brittle. It often leads to service downtime during updates and lacks fine-grained traffic control. If a new model version, v2, proves to be problematic, rolling back to v1 requires another file push and service restart across the entire fleet, a process fraught with potential failures. A robust solution must separate the deployment of model artifacts from the routing of inference traffic.

The alternative architecture pursued here decouples these concerns by introducing a dynamic configuration plane. It treats edge model servers as network services that can be managed by a sophisticated local proxy. This approach allows for instantaneous, zero-downtime traffic shifting between model versions. The problem then becomes one of dynamic configuration management at scale for a distributed fleet of proxies. This is fundamentally a control plane problem.

The chosen architecture consists of four key components:

  1. MLflow: Acts as the central model registry and source of truth for model lifecycle management. Its responsibility ends once a model artifact is approved for production.
  2. Envoy Proxy: Deployed on each edge device as the data plane. It intercepts all incoming inference requests and routes them to one or more local model-serving processes based on dynamic configuration it receives from a control plane.
  3. Dart Control Plane: A custom-built server, implemented in Dart, that serves as the xDS (Discovery Service) control plane for the Envoy fleet. It queries MLflow for production-ready models and translates this state into Envoy configuration, pushing updates to edge devices in real-time.
  4. Flutter/Valtio Monitoring Dashboard: A web-based administrative dashboard, built with Flutter Web, for observing the state of the edge fleet. One critical component uses Valtio, a JavaScript state management library, to handle the high-frequency, complex state updates that reflect real-time traffic splits and model performance metrics. It demonstrates a pattern for integrating specialized JS libraries into a Dart-based frontend when necessary.

This design offers the low latency of on-device inference while providing the flexibility of a cloud-native deployment workflow, including canary releases and centralized observability, directly at the edge. The selection of Dart for the control plane is pragmatic; its performance is more than adequate for this configuration-serving role, and it allows for code-sharing with the Flutter-based administrative frontend.

Core Implementation: The Dart xDS Control Plane

The heart of this architecture is the control plane’s ability to speak Envoy’s xDS API. Envoy’s configuration can be sourced dynamically from gRPC services. We need to serve Cluster Discovery Service (CDS) and Route Discovery Service (RDS) resources to control where and how traffic is routed; in this implementation both are delivered over a single Aggregated Discovery Service (ADS) stream.

First, the necessary Protocol Buffer definitions for Envoy’s APIs must be available to the Dart project. These can be generated from the official Envoy API repository. For a production project, this generation step would be part of the build pipeline.

Here is the core structure of the Dart xDS server using the grpc package. This implementation focuses on simplicity, using in-memory caches for configuration state that gets updated by the MLflow integration logic.

// file: lib/xds_server.dart
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';

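// Generated protobuf classes from Envoy's API definitions are assumed to live
// under lib/generated/, following the proto package layout.
import 'package:fixnum/fixnum.dart';

// The ADS service is defined in ads.proto; discovery.proto only holds messages.
import 'generated/envoy/service/discovery/v3/ads.pbgrpc.dart';
import 'generated/envoy/service/discovery/v3/discovery.pb.dart';
import 'generated/envoy/config/cluster/v3/cluster.pb.dart';
import 'generated/envoy/config/core/v3/address.pb.dart';
import 'generated/envoy/config/endpoint/v3/endpoint.pb.dart';
import 'generated/envoy/config/endpoint/v3/endpoint_components.pb.dart';
import 'generated/envoy/config/route/v3/route.pb.dart';
import 'generated/envoy/config/route/v3/route_components.pb.dart';
import 'generated/google/protobuf/any.pb.dart';
// Prefixed so google.protobuf.Duration does not clash with dart:core's Duration.
import 'generated/google/protobuf/duration.pb.dart' as pb;
import 'generated/google/protobuf/wrappers.pb.dart';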

final _logger = Logger('XdsServer');

// In-memory representation of the current desired state.
// This would be updated by the MLflow polling service.
class ConfigurationState {
  String version = '0';
  List<Cluster> clusters = [];
  List<RouteConfiguration> routes = [];
}

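class DiscoveryServiceV3 extends AggregatedDiscoveryServiceServiceBase {
  final ConfigurationState _state;
  final Map<String, StreamController<DiscoveryResponse>> _watchers = {};
  int _nextStreamId = 0;

  DiscoveryServiceV3(this._state);

  // Delta (incremental) xDS is not implemented in this example.
  @override
  Stream<DeltaDiscoveryResponse> deltaAggregatedResources(
      ServiceCall call, Stream<DeltaDiscoveryRequest> requestStream) {
    return Stream.error(GrpcError.unimplemented('Delta xDS is not supported'));
  }

  @override
  Stream<DiscoveryResponse> streamAggregatedResources(
      ServiceCall call, Stream<DiscoveryRequest> requestStream) {
    // Key each stream by a locally generated ID. Several devices may share
    // one source address (e.g. behind NAT), so an address is not a unique key.
    final peer = 'stream-${_nextStreamId++}';
    _logger.info('New xDS stream connection from: $peer');
    final controller = StreamController<DiscoveryResponse>();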

    requestStream.listen(
      (request) {
        _handleDiscoveryRequest(request, controller, peer);
      },
      onDone: () {
        _logger.info('xDS stream from $peer closed.');
        controller.close();
        _watchers.remove(peer);
      },
      onError: (e) {
        _logger.warning('Error on xDS stream from $peer: $e');
        controller.close();
        _watchers.remove(peer);
      },
    );

    _watchers[peer] = controller;
    return controller.stream;
  }

  void _handleDiscoveryRequest(DiscoveryRequest request,
      StreamController<DiscoveryResponse> controller, String peer) {
    final typeUrl = request.typeUrl;
    final lastVersion = request.versionInfo;
    
    _logger.info(
        'Received request from $peer for type: $typeUrl, version: $lastVersion');

    // Envoy signals a rejected update (NACK) by populating error_detail.
    // Resending the same config would only be rejected again, so log and wait.
    if (request.hasErrorDetail()) {
      _logger.warning('Client $peer NACKed $typeUrl: ${request.errorDetail.message}');
      return;
    }

    // A real implementation needs version and nonce tracking per resource type.
    // For this example, we use a single global version.
    if (lastVersion == _state.version) {
      _logger.info('Client $peer is up to date with version ${_state.version}. No update sent.');
      // This request is an ACK of the current version. In state-of-the-world
      // xDS the server does not have to answer an ACK; the stream stays open
      // and `notifyWatchers` pushes the next update when the config changes.
      return;
    }

    // New or outdated client, send the full current configuration.
    _logger.info('Client $peer has version $lastVersion, server is at ${_state.version}. Sending update.');
    
    _sendResponse(controller, typeUrl);
  }

  void _sendResponse(StreamController<DiscoveryResponse> controller, String typeUrl) {
    final resources = <Any>[];
    switch (typeUrl) {
      case 'type.googleapis.com/envoy.config.cluster.v3.Cluster':
        _state.clusters.forEach((c) => resources.add(Any.pack(c)));
        break;
      case 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration':
        _state.routes.forEach((r) => resources.add(Any.pack(r)));
        break;
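      default:
        _logger.warning('Unsupported resource type requested: $typeUrl');
        // NACKs are sent by Envoy, not by the server. A production server must
        // decide how to handle unknown types, for example by closing the
        // stream with an error.
        return;
    }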

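    final response = DiscoveryResponse()
      ..versionInfo = _state.version
      ..typeUrl = typeUrl
      // Envoy echoes the nonce in its ACK/NACK so the two can be correlated.
      ..nonce = DateTime.now().microsecondsSinceEpoch.toString()
      ..resources.addAll(resources);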
    
    controller.add(response);
  }

  /// This method is called by an external service (e.g., the MLflow poller)
  /// when the configuration state has changed.
  void notifyWatchers() {
    _logger.info('Configuration updated to version ${_state.version}. Notifying all ${_watchers.length} clients.');
    _watchers.values.forEach((controller) {
      // In a real system, you would check which resource types the client is subscribed to.
      // Clusters are sent before routes so that a route never references a
      // cluster Envoy has not received yet.
      _sendResponse(controller, 'type.googleapis.com/envoy.config.cluster.v3.Cluster');
      _sendResponse(controller, 'type.googleapis.com/envoy.config.route.v3.RouteConfiguration');
    });
  }
}

Future<void> main() async {
  // Setup logging
  Logger.root.level = Level.ALL;
  Logger.root.onRecord.listen((record) {
    print('${record.level.name}: ${record.time}: ${record.loggerName}: ${record.message}');
  });

  final state = ConfigurationState();
  final discoveryService = DiscoveryServiceV3(state);

  final server = Server.create(
    services: [discoveryService],
    codecRegistry: CodecRegistry(
      codecs: const [GzipCodec(), IdentityCodec()],
    ),
  );

  await server.serve(port: 18000);
  _logger.info('xDS server listening on port ${server.port}...');

  // --- Mock MLflow update loop ---
  // In a real system, this would be driven by MLflow webhooks or a REST API poller.
  Timer.periodic(const Duration(seconds: 30), (timer) {
    final newVersion = (int.parse(state.version) + 1).toString();
    _logger.info('--- Simulating MLflow model promotion. Updating config to version $newVersion ---');
    
    // Example: Canary rollout for a new model version
    // Assume model v1 runs on port 9001, v2 on port 9002
    final cluster1 = _createCluster('model_service_v1', '127.0.0.1', 9001);
    final cluster2 = _createCluster('model_service_v2', '127.0.0.1', 9002);
    
    final routeConfig = _createCanaryRoute('inference_route', 80, 20); // 80% to v1, 20% to v2

    state.version = newVersion;
    state.clusters = [cluster1, cluster2];
    state.routes = [routeConfig];
    
    discoveryService.notifyWatchers();
  });
}

// Helper functions to create Envoy config objects
Cluster _createCluster(String name, String address, int port) {
  // A simplified cluster definition. Production configs are more complex.
  // `pb.Duration` is google.protobuf.Duration, imported with a prefix so it
  // does not clash with dart:core's Duration; Int64 comes from package:fixnum.
  return Cluster()
    ..name = name
    ..connectTimeout = (pb.Duration()..seconds = Int64(1))
    ..type = Cluster_DiscoveryType.STATIC
    ..lbPolicy = Cluster_LbPolicy.ROUND_ROBIN
    ..loadAssignment = (ClusterLoadAssignment()
      ..clusterName = name
      // Endpoints are grouped by locality; each group holds the LbEndpoints.
      ..endpoints.add(LocalityLbEndpoints()
        ..lbEndpoints.add(LbEndpoint()
          ..endpoint = (Endpoint()
            ..address = (Address()
              ..socketAddress = (SocketAddress()
                ..address = address
                ..portValue = port))))));
}

RouteConfiguration _createCanaryRoute(String routeName, int v1Weight, int v2Weight) {
  // Envoy's weighted-cluster weights are google.protobuf.UInt32Value wrappers.
  // `total_weight` is deprecated in recent Envoy releases, so it is omitted;
  // the two weights are expected to sum to 100.
  return RouteConfiguration()
    ..name = routeName
    ..virtualHosts.add(VirtualHost()
      ..name = 'local_service'
      ..domains.add('*')
      ..routes.add(Route()
        ..match = (RouteMatch()..prefix = '/infer')
        ..route = (RouteAction()
          ..weightedClusters = (WeightedCluster()
            ..clusters.addAll([
              WeightedCluster_ClusterWeight()
                ..name = 'model_service_v1'
                ..weight = (UInt32Value()..value = v1Weight),
              WeightedCluster_ClusterWeight()
                ..name = 'model_service_v2'
                ..weight = (UInt32Value()..value = v2Weight),
            ])))));
}

This Dart server establishes a gRPC endpoint that implements the streamAggregatedResources method. It maintains a list of connected Envoy clients (watchers) and pushes new configuration sets whenever the internal ConfigurationState is updated. The mock update loop simulates a change detected from MLflow, which rebuilds the cluster and route configurations to perform a canary deployment (80% of traffic to model_service_v1, 20% to model_service_v2).
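The server's push model depends on telling apart the kinds of request Envoy sends on the stream. The following is a minimal sketch of that classification for state-of-the-world xDS, written in JavaScript (the language already used for the dashboard) so it can be run on its own; it is not part of the Dart server. `classifyRequest` and the `lastSent` bookkeeping object are hypothetical names, the field names mirror the `DiscoveryRequest` proto, and changes to the subscribed resource names are deliberately ignored.

```javascript
// Classify an incoming state-of-the-world DiscoveryRequest.
// `request` uses the proto field names: response_nonce, error_detail.
// `lastSent` is hypothetical per-stream, per-type bookkeeping holding the
// nonce of the most recent response, or undefined if nothing was sent yet.
function classifyRequest(request, lastSent) {
  // The first request for a type on a stream carries no nonce.
  if (!request.response_nonce) return 'INITIAL';
  // A nonce that does not match the latest response refers to a
  // superseded response and can be ignored.
  if (!lastSent || request.response_nonce !== lastSent.nonce) return 'STALE';
  // error_detail is only populated when Envoy rejected the update.
  if (request.error_detail) return 'NACK';
  return 'ACK';
}

// Only an initial request needs an immediate answer in this simplified
// model; ACKs and NACKs are recorded, and later config changes are pushed.
function needsImmediateResponse(kind) {
  return kind === 'INITIAL';
}
```

A NACK is the case worth alerting on: the proxy keeps running its previous configuration, so a canary that appears stuck may simply have been rejected.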

Envoy Proxy Bootstrap Configuration

For an Envoy proxy on an edge device to connect to this control plane, it needs a static bootstrap configuration file. Its primary purpose is to tell Envoy where to find its dynamic configuration.

# file: envoy_bootstrap.yaml
node:
  cluster: edge_device_fleet
  id: device_001 # This should be unique per device

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 10000 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          # RDS tells Envoy to fetch the RouteConfiguration from the xDS server.
          # The name must match the RouteConfiguration served by the control plane.
          rds:
            route_config_name: inference_route
            config_source:
              resource_api_version: V3
              ads: {}
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

  # This cluster definition is static because Envoy needs to know
  # how to connect to the control plane itself.
  clusters:
  - name: xds_cluster
    type: STRICT_DNS
    # This assumes the control plane is reachable at this address.
    # In a real deployment, this would be a highly available endpoint.
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: control-plane.internal # or IP
                port_value: 18000
    # The control plane communication MUST be secured in production.
    # This example omits TLS for brevity.
    # gRPC requires HTTP/2 towards the control plane.
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}

dynamic_resources:
  # CDS tells Envoy to fetch its Cluster configurations from the xDS server
  cds_config:
    resource_api_version: V3
    ads: {}
  # ADS tells Envoy to use the same stream for all xDS resources
  ads_config:
    api_type: GRPC
    transport_api_version: V3
    grpc_services:
      - envoy_grpc:
          cluster_name: xds_cluster

This configuration instructs Envoy to:

  1. Listen for incoming HTTP traffic on port 10000.
  2. Not use a static route configuration, but instead fetch it via RDS from a gRPC service.
  3. Fetch its cluster definitions via CDS from the same gRPC service.
  4. Use the Aggregated Discovery Service (ADS) on a single gRPC stream for all resources.
  5. Define a static cluster named xds_cluster which points to our Dart control plane. This is the crucial bootstrap connection.

With this setup, when Envoy starts, it immediately connects to control-plane.internal:18000 and receives its initial set of clusters and routes from the Dart server. Any subsequent updates pushed by the server are applied dynamically without restarting Envoy or dropping connections.

graph TD
    subgraph Edge Device
        ClientApp -- "HTTP /infer" --> E[Envoy Proxy on :10000]
        E -- "80% traffic" --> M1[Model v1 Server on :9001]
        E -- "20% traffic" --> M2[Model v2 Server on :9002]
    end

    subgraph Control Plane
        CP[Dart xDS Server]
        MLflow[MLflow Tracking Server]
    end
    
    subgraph Admin Dashboard
        AdminUI[Flutter Web + Valtio Dashboard]
    end

    E -. "gRPC xDS Stream" .-> CP
    CP -- "Polls for new models" --> MLflow
    AdminUI -- "WebSocket for state" --> CP
    AdminUI -- "Observes" --> E
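
The 80%/20% edges in the diagram can be read as a weighted choice made per request. The sketch below illustrates only the semantics of cluster weights; it is not Envoy's actual selection code. `pickCluster` is a hypothetical helper, and `r` stands for a uniformly distributed random value in [0, 1).

```javascript
// Map a random value r in [0, 1) onto a list of weighted clusters.
// Each cluster owns a slice of the range proportional to its weight.
function pickCluster(weightedClusters, r) {
  const total = weightedClusters.reduce((sum, c) => sum + c.weight, 0);
  if (total <= 0) throw new Error('weights must sum to a positive number');
  let threshold = r * total;
  for (const c of weightedClusters) {
    if (threshold < c.weight) return c.name;
    threshold -= c.weight;
  }
  // Guard against floating-point edge cases at the very end of the range.
  return weightedClusters[weightedClusters.length - 1].name;
}

const canarySplit = [
  { name: 'model_service_v1', weight: 80 },
  { name: 'model_service_v2', weight: 20 },
];
```

Calling `pickCluster(canarySplit, Math.random())` for each request would send roughly four in five requests to `model_service_v1`. Shifting traffic is then just a matter of the control plane publishing new weights.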

MLflow Integration and State Propagation

The missing link is the logic that translates MLflow state into the ConfigurationState object in our Dart server. This can be a simple poller or a more robust webhook handler. A pragmatic poller implementation would periodically query MLflow’s REST API for models tagged as “Production”.

// file: lib/mlflow_poller.dart
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:logging/logging.dart';
import 'xds_server.dart'; // To access ConfigurationState

final _logger = Logger('MLflowPoller');

class MLflowClient {
  final String mlflowHost;
  MLflowClient(this.mlflowHost);

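  Future<List<String>> getProductionModels(String modelName) async {
    // In a real system, error handling and retries are critical.
    try {
      // Build the query through Uri so the filter expression is URL-encoded.
      final uri = Uri.parse('$mlflowHost/api/2.0/mlflow/model-versions/search')
          .replace(queryParameters: {'filter': "name='$modelName'"});
      final response = await http.get(uri);
      if (response.statusCode == 200) {
        final data = json.decode(response.body);
        // The key may be absent when no versions match, so default to an empty list.
        final versions = ((data['model_versions'] as List?) ?? const [])
            .where((v) => v['current_stage'] == 'Production')
            .map((v) => v['version'] as String)
            .toList();
        return versions;
      }
      _logger.warning('MLflow returned HTTP ${response.statusCode}');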
    } catch (e) {
      _logger.severe('Failed to fetch models from MLflow: $e');
    }
    return [];
  }
}

// This function would be called periodically to update the configuration.
// Note: _createCluster and _createCanaryRoute are library-private in
// xds_server.dart; they must be made public (or moved into a shared library)
// before this file can call them.
Future<void> pollAndUpdateConfig(MLflowClient client, ConfigurationState state, DiscoveryServiceV3 discoveryService) async {
  _logger.info('Polling MLflow for production models...');
  final prodVersions = await client.getProductionModels('edge_inference_model');

  // Logic to translate model versions into clusters and routes.
  // This is where business logic for canarying, A/B testing, etc., lives.
  // Example: The latest two production versions are used for a canary.
  prodVersions.sort((a, b) => int.parse(b).compareTo(int.parse(a)));

  if (prodVersions.length < 2) {
    _logger.warning('Not enough production models to perform canary. Needs at least 2.');
    // A fuller implementation would route 100% of traffic to the single
    // available model; this example leaves the current configuration untouched.
    return;
  }

  // The list is sorted newest first: the newest version is the canary and
  // the one before it keeps serving the bulk of the traffic.
  final canaryVersion = prodVersions[0];
  final primaryVersion = prodVersions[1];

  // A real implementation would map version numbers to deployed ports/sockets.
  // Here we assume a convention: v1 -> 9001, v2 -> 9002 etc.
  final cluster1 = _createCluster('model_service_v$primaryVersion', '127.0.0.1', 9000 + int.parse(primaryVersion));
  final cluster2 = _createCluster('model_service_v$canaryVersion', '127.0.0.1', 9000 + int.parse(canaryVersion));

  // Define the traffic split, could be fetched from a config service.
  // Caveat: _createCanaryRoute hardcodes the cluster names model_service_v1
  // and model_service_v2, so the route only matches these clusters when the
  // production versions are 1 and 2.
  final routeConfig = _createCanaryRoute('inference_route', 90, 10); // 90% to primary, 10% to canary

  final newVersion = (int.parse(state.version) + 1).toString();

  // No `await` separates these assignments, so within Dart's single-threaded
  // event loop no other code can observe a half-updated state.
  state.version = newVersion;
  state.clusters = [cluster1, cluster2];
  state.routes = [routeConfig];

  discoveryService.notifyWatchers();
}
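
The translation step is easier to reason about when separated from I/O. The sketch below, in JavaScript so it runs standalone, shows one way to derive cluster names, ports, and weights from a list of production versions, and to bump the config version only when the resulting plan actually changes (so unchanged polls do not trigger pushes). `planCanary` and `makeVersioner` are hypothetical helpers; the `model_service_v<N>` naming and the `9000 + N` port convention are the assumptions already used in this article.

```javascript
// Derive a canary plan from MLflow version strings such as ['3', '4'].
// The newest version becomes the canary; the one before it stays primary.
function planCanary(productionVersions, canaryPercent) {
  const sorted = productionVersions.map(Number).sort((a, b) => b - a);
  if (sorted.length === 0) return null;
  const clusterName = (v) => `model_service_v${v}`;
  const toCluster = (v) => ({ name: clusterName(v), port: 9000 + v });
  if (sorted.length === 1) {
    // A single production model receives all traffic.
    return { clusters: [toCluster(sorted[0])], weights: { [clusterName(sorted[0])]: 100 } };
  }
  const [canary, primary] = sorted;
  return {
    clusters: [toCluster(primary), toCluster(canary)],
    weights: {
      [clusterName(primary)]: 100 - canaryPercent,
      [clusterName(canary)]: canaryPercent,
    },
  };
}

// Returns a function that yields a new version string only when the plan
// differs from the previously accepted one, and null otherwise.
function makeVersioner() {
  let version = 0;
  let lastFingerprint = null;
  return function nextVersion(plan) {
    const fingerprint = JSON.stringify(plan);
    if (fingerprint === lastFingerprint) return null;
    lastFingerprint = fingerprint;
    version += 1;
    return String(version);
  };
}
```

Because the route weights are keyed by the same names as the clusters, the route and cluster definitions cannot drift apart when the production versions are not 1 and 2.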

Client-Side Observability with Valtio

The final piece is the administrative dashboard. While most of the UI can be standard Flutter Web, a component showing the real-time traffic distribution and performance metrics (like latency percentiles per model version) requires handling high-frequency updates without re-rendering the entire page. This is a scenario where a specialized JavaScript state management library like Valtio excels due to its proxy-based reactivity.

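We can use dart:js to interoperate between Dart and JavaScript, as the Dart code in this section does; newer Dart SDKs recommend dart:js_interop for the same purpose.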

JavaScript side (using Valtio):

// file: web/dashboard.js
import { proxy, useSnapshot } from 'valtio';
import React from 'react';
import ReactDOM from 'react-dom';

// The Valtio proxy state. This is the single source of truth for the UI.
const fleetState = proxy({
  version: '0',
  trafficSplit: {
    model_v1: 100,
    model_v2: 0,
  },
  performance: {
    model_v1: { p99_latency: 0 },
    model_v2: { p99_latency: 0 },
  }
});

// A global function that Dart can call to update the state.
window.updateFleetState = (newState) => {
  const parsedState = JSON.parse(newState);
  fleetState.version = parsedState.version;
  Object.assign(fleetState.trafficSplit, parsedState.trafficSplit);
  Object.assign(fleetState.performance, parsedState.performance);
};

// A simple React component that consumes the Valtio state.
function TrafficDashboard() {
  const snap = useSnapshot(fleetState);
  
  return (
    <div>
      <h3>Live Traffic Distribution (Version: {snap.version})</h3>
      {Object.entries(snap.trafficSplit).map(([model, weight]) => (
        <div key={model}>
          <span>{model}: </span>
          <div style={{ width: '100%', backgroundColor: '#eee' }}>
            <div style={{ width: `${weight}%`, backgroundColor: 'green', height: '20px' }}>
              {weight}%
            </div>
          </div>
        </div>
      ))}
    </div>
  );
}

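// Render the component into a div with a specific ID.
// ReactDOM.render is the React 17 API; with React 18 and later, use
// createRoot from 'react-dom/client' instead.
ReactDOM.render(<TrafficDashboard />, document.getElementById('valtio-root'));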

Dart/Flutter side:
The Flutter app would manage a WebSocket connection to the control plane. When it receives a state update, it calls the JavaScript function.

// file: lib/flutter_dashboard_widget.dart
import 'dart:async';
import 'dart:js' as js;
import 'dart:convert';
import 'package:flutter/material.dart';
// Assume a WebSocket service `webSocketService` is available and connected.

class FleetDashboard extends StatefulWidget {
  @override
  _FleetDashboardState createState() => _FleetDashboardState();
}

class _FleetDashboardState extends State<FleetDashboard> {
  StreamSubscription? _subscription;

  @override
  void initState() {
    super.initState();
    // Listen to messages from the control plane
    _subscription = webSocketService.stream.listen((message) {
      // The message contains the latest state, e.g., traffic split.
      // We pass this JSON string directly to our JavaScript function.
      try {
        // A common mistake is to try and manipulate JS objects directly.
        // Passing a serialized string is far more robust.
        js.context.callMethod('updateFleetState', [message]);
      } catch (e) {
        print("Error calling JS function: $e");
      }
    });
  }

  @override
  void dispose() {
    // Stop listening when the widget is removed to avoid leaking the subscription.
    _subscription?.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    // This widget would be responsible for creating the HTML element
    // that the React/Valtio component mounts into. This can be done
    // using HtmlElementView.
    return HtmlElementView(viewType: 'valtio-dashboard-view');
  }
}

This integration pattern, while adding complexity, is a pragmatic solution for leveraging best-in-class libraries from different ecosystems within a single application. It avoids reinventing a highly reactive state-management system in Dart when an excellent one already exists in JavaScript. The pitfall here is the serialization overhead between the Dart and JS contexts; for extremely high-frequency updates, this boundary could become a bottleneck.
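
One way to soften that bottleneck is to coalesce updates before they cross the boundary, so that only the latest state per key is serialized at a fixed cadence. The following is a minimal sketch under that assumption; `createCoalescer` is a hypothetical helper, and the same idea could equally be applied on the Dart side or in the control plane before messages are sent.

```javascript
// Collect updates and deliver at most one merged, serialized payload per flush.
// `deliver` receives a JSON string, matching what updateFleetState expects.
function createCoalescer(deliver) {
  let pending = null;
  return {
    // Later updates overwrite earlier ones key by key (shallow merge).
    push(update) {
      pending = pending ? { ...pending, ...update } : { ...update };
    },
    // Deliver the merged update if there is one; returns whether it did.
    flush() {
      if (pending === null) return false;
      const batch = pending;
      pending = null;
      deliver(JSON.stringify(batch));
      return true;
    },
  };
}
```

In a browser, `flush` would typically be driven by a timer or `requestAnimationFrame`, which caps the number of serializations at the display rate regardless of how often metrics arrive.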

This architecture, while complex, addresses the core problem of dynamic model deployment at the edge. The control plane, built in Dart, serves as the brain, translating high-level intent from MLOps tooling into low-level, resilient network configuration for a fleet of Envoy proxies. The primary limitation of this implementation is its lack of production hardening. The xDS communication channel must be secured with mTLS, the control plane needs to be highly available, and the state management must persist across restarts. Furthermore, the reliance on polling MLflow is suboptimal; a webhook-based system would provide lower latency updates. The presented model is a blueprint for a scalable system, not a turnkey solution.

