Implementing a Polyglot ML Model Deployment Controller Using etcd as a Declarative State Store on Azure


Our MLOps platform was fractured. The Python team deployed their Keras models via a collection of manually triggered Jenkins jobs that pushed zip archives to Azure Functions. The JavaScript team, building real-time inference features with TensorFlow.js, had their own bespoke pipeline using shell scripts to invoke Rollup for bundling and then manually updated a different set of Azure Functions. Every deployment was a context-switching burden, and rollbacks were a nightmare of tracking down previous artifacts. The core pain point was the lack of a single source of truth for what should be running. We needed a unified, declarative system, one where we could define the desired state of our entire model ecosystem and have a machine tirelessly work to make it a reality.

The concept we settled on was a custom controller, heavily inspired by the Kubernetes reconciliation loop pattern. The architecture would be simple in principle:

  1. A central, highly available, strongly consistent data store would hold the “spec” for every model deployment. This is our desired state.
  2. A stateless controller process would continuously watch this data store for any changes to these specs.
  3. Upon detecting a change (creation, update, or deletion), the controller would trigger a series of actions—fetching code, building artifacts, and provisioning infrastructure—to align the actual state of the world with the desired state.
  4. The controller would report the status of its operations back to the data store.
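Step 3 is the essence of the pattern. Stripped of etcd and Azure, a reconciliation pass is a diff between desired and actual state. The sketch below is a minimal illustration under our own assumptions: both states are plain maps from deployment name to generation, and planActions and Plan are hypothetical names, not part of the controller described later.

```go
package main

import "sort"

// Plan lists the actions needed to move actual state toward desired state.
type Plan struct {
	Create, Update, Delete []string
}

// planActions diffs desired specs (name -> generation) against what is actually
// deployed (name -> generation last deployed). Hypothetical helper for illustration.
func planActions(desired, actual map[string]int64) Plan {
	var p Plan
	for name, gen := range desired {
		deployedGen, ok := actual[name]
		switch {
		case !ok:
			p.Create = append(p.Create, name) // desired but not deployed
		case deployedGen < gen:
			p.Update = append(p.Update, name) // deployed from an older spec
		}
	}
	for name := range actual {
		if _, ok := desired[name]; !ok {
			p.Delete = append(p.Delete, name) // deployed but no longer desired
		}
	}
	// Map iteration order is random; sort so the plan is deterministic.
	sort.Strings(p.Create)
	sort.Strings(p.Update)
	sort.Strings(p.Delete)
	return p
}
```

Because the plan is computed from full state rather than from individual events, a missed event is corrected on the next pass. This is what makes such a loop level-triggered rather than edge-triggered.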

For the data store, we ruled out a standard SQL database. It could have been made to work, but we wanted primitives explicitly designed for distributed coordination (transactions and locking) and, most importantly, an efficient Watch API. etcd was the obvious choice: its transactional semantics and watch streams are precisely what’s needed to build reliable controllers. For the deployment target, Azure Functions provided the necessary polyglot environment, supporting both the Python runtime for our Keras models and the Node.js runtime for our Rollup-bundled TF.js models.

The final architecture looked like this:

graph TD
    subgraph Control_Plane
        A[Developer CLI/UI] -- etcdctl put --> B(etcd Cluster);
        B -- Watch Stream --> C{Deployment Controller};
    end

    subgraph Build_and_Deploy
        C -- Triggers --> D{Reconciliation Logic};
        D -- Reads Spec --> B;
        D -- Fetches Source --> E[Azure Blob Storage];
        D -- Orchestrates Build --> F[Ephemeral Build Container];
        F -- Invokes --> G[Rollup for JS / Pip for Python];
        F -- Produces --> H[Deployment Artifact .zip];
        D -- Uploads Artifact --> E;
        D -- Uses Azure SDK --> I[Provision/Update Azure Function App];
        D -- Writes Status --> B;
    end

    subgraph Execution_Plane[Azure]
        I;
        J[API Gateway] --> I;
        K[End User] --> J;
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px

Defining the Deployment Spec in etcd

Everything starts with a well-defined API object. We stored our deployment specs as JSON objects under the etcd key prefix /models/deployments/. A typical spec for a Keras model would look like this:

etcdctl put /models/deployments/sentiment-analyzer '{"apiVersion":"v1alpha1","kind":"ModelDeployment","metadata":{"name":"sentiment-analyzer","uid":"a1b2c3d4-e5f6-7890-1234-567890abcdef","generation":1},"spec":{"type":"keras","sourceUri":"azure-blob://artifacts/models/sentiment-analyzer/v1.2.0/src.zip","entrypoint":"handler.predict","runtime":{"pythonVersion":"3.9"},"target":{"platform":"azure-function","resourceGroup":"ml-prod-rg","appName":"sentiment-analyzer-prod"}},"status":{"phase":"Pending","observedGeneration":0}}'

And for a JavaScript model using TensorFlow.js, the spec would differ slightly:

etcdctl put /models/deployments/image-classifier-js '{"apiVersion":"v1alpha1","kind":"ModelDeployment","metadata":{"name":"image-classifier-js","uid":"b2c3d4e5-f6a7-8901-2345-67890abcdef1","generation":1},"spec":{"type":"tfjs","sourceUri":"azure-blob://artifacts/models/image-classifier-js/v0.8.1/src.zip","entrypoint":"dist/bundle.handler","runtime":{"nodeVersion":"18"},"build":{"bundler":"rollup","configFile":"rollup.config.js"},"target":{"platform":"azure-function","resourceGroup":"ml-edge-rg","appName":"image-classifier-js-prod"}},"status":{"phase":"Pending","observedGeneration":0}}'

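The status field is critical. It’s owned by the controller and reports the current state of the reconciliation. The observedGeneration field (a concept borrowed from Kubernetes) lets us distinguish a spec that the controller has already seen and processed from one that is new or updated: the client increments metadata.generation on every spec change, and whenever that value is greater than status.observedGeneration, the controller has work to do.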
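The observedGeneration check only works if clients bump metadata.generation when, and only when, the spec changes. Below is a minimal client-side sketch, assuming the client has already read the stored object. nextGeneration, sameJSON, and deploymentDoc are hypothetical helpers, and the spec comparison deliberately ignores key order and whitespace so that merely reformatting a spec does not trigger a redeploy.

```go
package main

import (
	"bytes"
	"encoding/json"
)

// deploymentDoc holds only the parts of a ModelDeployment this helper needs.
type deploymentDoc struct {
	Metadata struct {
		Generation int64 `json:"generation"`
	} `json:"metadata"`
	Spec json.RawMessage `json:"spec"`
}

// nextGeneration returns the metadata.generation a client should write for
// next, given the currently stored object prev (nil if the key is new).
// The generation is bumped only when the spec itself changes.
func nextGeneration(prev, next []byte) (int64, error) {
	if prev == nil {
		return 1, nil
	}
	var p, n deploymentDoc
	if err := json.Unmarshal(prev, &p); err != nil {
		return 0, err
	}
	if err := json.Unmarshal(next, &n); err != nil {
		return 0, err
	}
	same, err := sameJSON(p.Spec, n.Spec)
	if err != nil {
		return 0, err
	}
	if same {
		return p.Metadata.Generation, nil
	}
	return p.Metadata.Generation + 1, nil
}

// sameJSON compares two JSON values while ignoring key order and whitespace:
// decoding into interface{} and re-encoding emits map keys in sorted order.
func sameJSON(a, b json.RawMessage) (bool, error) {
	var va, vb interface{}
	if err := json.Unmarshal(a, &va); err != nil {
		return false, err
	}
	if err := json.Unmarshal(b, &vb); err != nil {
		return false, err
	}
	ca, err := json.Marshal(va)
	if err != nil {
		return false, err
	}
	cb, err := json.Marshal(vb)
	if err != nil {
		return false, err
	}
	return bytes.Equal(ca, cb), nil
}
```

In practice the client would read the stored value and write the new one in a single etcd transaction guarded on the key's ModRevision, the same optimistic-locking pattern the controller uses for its status updates.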

The Core Controller Reconciliation Loop

We chose Go for implementing the controller due to its excellent concurrency support, static typing, and the quality of the official etcd and azure-sdk-for-go libraries. The heart of the controller is a simple loop: connect to etcd, establish a watch on our key prefix, and process events as they arrive.

// main.go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"go.etcd.io/etcd/client/v3"
)

func main() {
	endpoints := []string{"localhost:2379"} // In production, this comes from config
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer cli.Close()

	log.Println("Successfully connected to etcd.")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle graceful shutdown
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigc
		log.Println("Shutdown signal received, cancelling context...")
		cancel()
	}()

	// The controller manages the reconciliation loop.
	controller := NewController(cli)
	err = controller.Run(ctx)
	if err != nil && err != context.Canceled {
		log.Fatalf("Controller exited with error: %v", err)
	}

	log.Println("Controller shut down gracefully.")
}

// controller.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"go.etcd.io/etcd/api/v3/mvccpb"
	"go.etcd.io/etcd/client/v3"
)

const deploymentPrefix = "/models/deployments/"

type Controller struct {
	etcdClient *clientv3.Client
	// In a real application, you'd have an Azure client, blob storage client, etc.
}

func NewController(cli *clientv3.Client) *Controller {
	return &Controller{etcdClient: cli}
}

func (c *Controller) Run(ctx context.Context) error {
	log.Println("Starting controller loop...")

	// A failed or closed watch (including one whose start revision has been
	// compacted away) restarts from a fresh list, with capped exponential backoff.
	backoff := time.Second
	for {
		err := c.listAndWatch(ctx)
		if ctx.Err() != nil {
			log.Println("Context cancelled, stopping watch loop.")
			return ctx.Err()
		}
		log.Printf("Watch ended (err: %v). Re-listing in %s...", err, backoff)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff < 30*time.Second {
			backoff *= 2
		}
	}
}

// listAndWatch reconciles all existing deployments, then watches for changes
// starting at the revision immediately after that initial GET, so no event
// between the two is missed.
func (c *Controller) listAndWatch(ctx context.Context) error {
	resp, err := c.etcdClient.Get(ctx, deploymentPrefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		c.processEvent(ctx, kv)
	}

	watchChan := c.etcdClient.Watch(ctx, deploymentPrefix, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))
	for watchResp := range watchChan {
		if err := watchResp.Err(); err != nil {
			return err
		}
		for _, event := range watchResp.Events {
			// We only care about PUT events (creations/updates).
			// Deletions would trigger a separate teardown logic path.
			if event.Type == mvccpb.PUT {
				c.processEvent(ctx, event.Kv)
			}
		}
	}
	// The channel is closed when ctx is cancelled or the watcher shuts down.
	return nil
}

func (c *Controller) processEvent(ctx context.Context, kv *mvccpb.KeyValue) {
	var deployment ModelDeployment
	if err := json.Unmarshal(kv.Value, &deployment); err != nil {
		log.Printf("Failed to unmarshal deployment spec for key %s: %v", string(kv.Key), err)
		return
	}

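	// Skip a spec whose current generation has already been reconciled, whether
	// it Succeeded or Failed. The controller's own status write produces a new
	// PUT event for this key; without this check, a Failed deployment would be
	// retried in a tight loop. Retrying failures should be an explicit,
	// backed-off requeue instead.
	done := deployment.Status.Phase == "Succeeded" || deployment.Status.Phase == "Failed"
	if done && deployment.Metadata.Generation == deployment.Status.ObservedGeneration {
		return
	}

	log.Printf("Processing deployment: %s (Type: %s)", deployment.Metadata.Name, deployment.Spec.Type)

	// Delegate to a specific reconciler based on the model type
	var err error
	switch deployment.Spec.Type {
	case "keras":
		err = c.reconcileKerasDeployment(ctx, &deployment)
	case "tfjs":
		err = c.reconcileTfjsDeployment(ctx, &deployment)
	default:
		log.Printf("Unknown deployment type: %s", deployment.Spec.Type)
		// Record this generation as observed too, so the Failed status write
		// does not immediately re-trigger processing.
		deployment.Status.Phase = "Failed"
		deployment.Status.Message = "Unknown deployment type"
		deployment.Status.ObservedGeneration = deployment.Metadata.Generation
		c.updateStatus(ctx, &deployment)
		return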
	}
	
	if err != nil {
		log.Printf("Reconciliation failed for %s: %v", deployment.Metadata.Name, err)
		deployment.Status.Phase = "Failed"
		deployment.Status.Message = err.Error()
	} else {
		log.Printf("Reconciliation succeeded for %s", deployment.Metadata.Name)
		deployment.Status.Phase = "Succeeded"
		deployment.Status.Message = "Deployment is active and healthy."
	}
	
	// This is critical: we record that we have processed this version of the spec.
	deployment.Status.ObservedGeneration = deployment.Metadata.Generation
	c.updateStatus(ctx, &deployment)
}

// A placeholder for the actual reconciler functions
func (c *Controller) reconcileKerasDeployment(ctx context.Context, d *ModelDeployment) error {
	log.Printf("Reconciling Keras model %s...", d.Metadata.Name)
	// 1. Download source from d.Spec.SourceUri
	// 2. Package for Azure Function (Python)
	// 3. Deploy/Update Azure Function App
	time.Sleep(2 * time.Second) // Simulate work
	return nil
}

func (c *Controller) reconcileTfjsDeployment(ctx context.Context, d *ModelDeployment) error {
	log.Printf("Reconciling TF.js model %s...", d.Metadata.Name)
	// 1. Download source from d.Spec.SourceUri
	// 2. Run Rollup build in a container
	// 3. Package for Azure Function (Node.js)
	// 4. Deploy/Update Azure Function App
	time.Sleep(3 * time.Second) // Simulate work
	return nil
}

// ... updateStatus function will be shown next ...

// types.go (for clarity)
package main

type ModelDeployment struct {
	ApiVersion string   `json:"apiVersion"`
	Kind       string   `json:"kind"`
	Metadata   Metadata `json:"metadata"`
	Spec       Spec     `json:"spec"`
	Status     Status   `json:"status"`
}

type Metadata struct {
	Name       string `json:"name"`
	UID        string `json:"uid"`
	Generation int64  `json:"generation"` // This should be incremented by the client on each spec update
}
// ... other structs for Spec and Status
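The Spec and Status structs are elided above. One plausible reconstruction, inferred field by field from the two JSON examples, is sketched below; the struct names Runtime, Build, and Target are our assumptions. The Validate method is hypothetical as well: a pre-flight check that rejects a spec missing its type-specific fields before any build work starts.

```go
package main

import (
	"errors"
	"fmt"
)

// Spec mirrors the "spec" object in the JSON examples (field names inferred).
type Spec struct {
	Type       string  `json:"type"` // "keras" or "tfjs"
	SourceUri  string  `json:"sourceUri"`
	Entrypoint string  `json:"entrypoint"`
	Runtime    Runtime `json:"runtime"`
	Build      Build   `json:"build"` // zero value for keras deployments
	Target     Target  `json:"target"`
}

type Runtime struct {
	PythonVersion string `json:"pythonVersion,omitempty"`
	NodeVersion   string `json:"nodeVersion,omitempty"`
}

type Build struct {
	Bundler    string `json:"bundler,omitempty"`
	ConfigFile string `json:"configFile,omitempty"`
}

type Target struct {
	Platform      string `json:"platform"`
	ResourceGroup string `json:"resourceGroup"`
	AppName       string `json:"appName"`
}

// Status is written only by the controller.
type Status struct {
	Phase              string `json:"phase"` // Pending, Succeeded, or Failed
	Message            string `json:"message,omitempty"`
	ObservedGeneration int64  `json:"observedGeneration"`
}

// Validate is a hypothetical check for the fields each deployment type needs.
func (s Spec) Validate() error {
	if s.SourceUri == "" || s.Target.AppName == "" {
		return errors.New("sourceUri and target.appName are required")
	}
	switch s.Type {
	case "keras":
		if s.Runtime.PythonVersion == "" {
			return errors.New("keras deployments require runtime.pythonVersion")
		}
	case "tfjs":
		if s.Runtime.NodeVersion == "" || s.Build.ConfigFile == "" {
			return errors.New("tfjs deployments require runtime.nodeVersion and build.configFile")
		}
	default:
		return fmt.Errorf("unknown deployment type %q", s.Type)
	}
	return nil
}
```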

A major pitfall we encountered early on was managing state updates. If multiple controller instances were running for high availability, or a user updated a spec while the controller was writing its status, a blind read-modify-write could overwrite newer data with a stale copy. The solution is to use etcd’s transactional capabilities for optimistic locking. The updateStatus function must read the current object, note its ModRevision, and then perform a Compare-and-Swap (CAS) that only succeeds if the key is still at that revision.

// controller.go (continued)
func (c *Controller) updateStatus(ctx context.Context, d *ModelDeployment) {
	key := deploymentPrefix + d.Metadata.Name
	
	// Get the current version of the object to perform a CAS
	getResp, err := c.etcdClient.Get(ctx, key)
	if err != nil {
		log.Printf("Failed to get current deployment for status update on %s: %v", d.Metadata.Name, err)
		return
	}
	if len(getResp.Kvs) == 0 {
		log.Printf("Cannot update status, deployment %s not found.", d.Metadata.Name)
		return
	}
	
	currentKv := getResp.Kvs[0]
	var currentDeployment ModelDeployment
	if err := json.Unmarshal(currentKv.Value, &currentDeployment); err != nil {
		log.Printf("Failed to unmarshal current deployment for status update on %s: %v", d.Metadata.Name, err)
		return
	}
	
	// Update the status on the object we just fetched
	currentDeployment.Status = d.Status
	
	newVal, err := json.Marshal(currentDeployment)
	if err != nil {
		log.Printf("Failed to marshal updated deployment for %s: %v", d.Metadata.Name, err)
		return
	}

	// Use a transaction to ensure we are updating the version we just read
	txnResp, err := c.etcdClient.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", currentKv.ModRevision)).
		Then(clientv3.OpPut(key, string(newVal))).
		Commit()

	if err != nil {
		log.Printf("etcd transaction failed for %s: %v", d.Metadata.Name, err)
	} else if !txnResp.Succeeded {
		log.Printf("Optimistic lock failed for %s. Another process likely updated it. Will retry on next reconciliation loop.", d.Metadata.Name)
	}
}

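This transactional update prevents lost updates: the status write only lands if nothing else has modified the key since we read it, so it can never silently overwrite a concurrent spec change. On its own, it does not stop two controller instances from reconciling the same deployment at the same time; that requires leader election.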

The TF.js Reconciler with a Containerized Rollup Build

The truly novel part of this system was handling the tfjs models. The controller itself is a lean Go binary; it cannot and should not have Node.js, npm, or Rollup installed. The cleanest way we found to handle this was to orchestrate a containerized build. When the reconcileTfjsDeployment function is called, it performs these steps:

  1. Downloads the model’s source code from Azure Blob Storage to a temporary local directory.
  2. Creates a Dockerfile on the fly designed for the build.
  3. Uses the Docker SDK to build an image from that Dockerfile and run a container.
  4. The image build runs npm install, and the container’s command then runs npx rollup -c.
  5. The controller copies the resulting build artifact (e.g., a dist directory) out of the container.
  6. The temporary directory, container, and image are cleaned up.

Here is the code that orchestrates this, assuming the presence of a Docker daemon.

// tfjs_reconciler.go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/archive"
	"github.com/docker/docker/pkg/jsonmessage"
	"github.com/docker/docker/pkg/stdcopy"
)

// This would be part of the Controller struct
type TfjsReconciler struct {
	dockerClient *client.Client
	// ... other clients
}

func (r *TfjsReconciler) Reconcile(ctx context.Context, d *ModelDeployment) error {
	log.Printf("[TF.js Reconciler] Starting build for %s", d.Metadata.Name)

	configFile := d.Spec.Build.ConfigFile
	if configFile == "" {
		configFile = "rollup.config.js" // Rollup's conventional config file name
	}

	// Step 1: Prepare a temporary build directory
	buildDir, err := os.MkdirTemp("", "tfjs-build-*")
	if err != nil {
		return fmt.Errorf("failed to create build dir: %w", err)
	}
	defer os.RemoveAll(buildDir)
	log.Printf("[TF.js Reconciler] Using build directory: %s", buildDir)

	// In a real implementation, this would download from d.Spec.SourceUri
	if err := createFakeSourceFiles(buildDir, configFile); err != nil {
		return fmt.Errorf("failed to create fake source: %w", err)
	}

	// Step 2: Build the Docker image for the build environment.
	// npm install runs at image-build time; the container's command then runs
	// Rollup with the spec's config file (%q quotes ordinary file names the way
	// the exec-form CMD array expects).
	dockerfileContent := fmt.Sprintf(`
FROM node:%s-slim
WORKDIR /app
COPY . .
RUN npm install
CMD ["npx", "rollup", "-c", %q]
`, d.Spec.Runtime.NodeVersion, configFile)

	dockerfilePath := filepath.Join(buildDir, "Dockerfile")
	if err := os.WriteFile(dockerfilePath, []byte(dockerfileContent), 0644); err != nil {
		return fmt.Errorf("failed to write Dockerfile: %w", err)
	}

	buildCtx, err := archive.TarWithOptions(buildDir, &archive.TarOptions{})
	if err != nil {
		return fmt.Errorf("failed to create tar from build context: %w", err)
	}
	defer buildCtx.Close()

	imageName := fmt.Sprintf("builder-%s:latest", d.Metadata.Name)
	buildResp, err := r.dockerClient.ImageBuild(ctx, buildCtx, types.ImageBuildOptions{
		Tags:        []string{imageName},
		Dockerfile:  "Dockerfile",
		Remove:      true, // Remove intermediate containers
		ForceRemove: true,
	})
	if err != nil {
		return fmt.Errorf("failed to build image: %w", err)
	}
	defer buildResp.Body.Close()
	// A failing build step is reported inside the JSON progress stream, not via err.
	// DisplayJSONMessagesStream writes the build output to stderr and returns the
	// first error message it encounters.
	if err := jsonmessage.DisplayJSONMessagesStream(buildResp.Body, os.Stderr, 0, false, nil); err != nil {
		return fmt.Errorf("image build failed: %w", err)
	}
	// Cleanup uses a fresh context so it still runs if ctx has been cancelled.
	defer r.dockerClient.ImageRemove(context.Background(), imageName, types.ImageRemoveOptions{Force: true})

	log.Printf("[TF.js Reconciler] Successfully built image: %s", imageName)

	// Step 3: Run the build container
	contResp, err := r.dockerClient.ContainerCreate(ctx, &container.Config{
		Image: imageName,
	}, nil, nil, nil, "")
	if err != nil {
		return fmt.Errorf("failed to create container: %w", err)
	}
	defer r.dockerClient.ContainerRemove(context.Background(), contResp.ID, types.ContainerRemoveOptions{Force: true})

	if err := r.dockerClient.ContainerStart(ctx, contResp.ID, types.ContainerStartOptions{}); err != nil {
		return fmt.Errorf("failed to start container: %w", err)
	}

	statusCh, errCh := r.dockerClient.ContainerWait(ctx, contResp.ID, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		if err != nil {
			return fmt.Errorf("container wait error: %w", err)
		}
	case status := <-statusCh:
		if status.StatusCode != 0 {
			// Get logs on failure. Without a TTY, the log stream is multiplexed;
			// stdcopy strips the frame headers and splits stdout from stderr.
			logReader, logErr := r.dockerClient.ContainerLogs(ctx, contResp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
			if logErr == nil {
				var stdout, stderr bytes.Buffer
				stdcopy.StdCopy(&stdout, &stderr, logReader)
				logReader.Close()
				log.Printf("Build container logs for %s:\n%s%s", d.Metadata.Name, stdout.String(), stderr.String())
			}
			return fmt.Errorf("build container exited with non-zero status code: %d", status.StatusCode)
		}
	}
	log.Printf("[TF.js Reconciler] Build container finished successfully.")

	// Step 4: Extract artifact
	reader, _, err := r.dockerClient.CopyFromContainer(ctx, contResp.ID, "/app/dist")
	if err != nil {
		return fmt.Errorf("failed to copy artifact from container: %w", err)
	}
	defer reader.Close()

	// The result is a tar stream, which we need to extract.
	// Logic to untar the artifact stream into a final location omitted for brevity.
	log.Println("[TF.js Reconciler] Artifact extracted.")

	// Step 5: Package and deploy to Azure
	// This would involve zipping the final artifact and using the Azure SDK.
	log.Println("[TF.js Reconciler] Deployment to Azure would happen here.")

	return nil
}


// Helper to simulate project files
func createFakeSourceFiles(dir, rollupConfigName string) error {
	// "type": "module" lets Rollup 3 load an ES-module config file ending in .js.
	files := map[string]string{
		"package.json":   `{"type": "module", "dependencies": {"rollup": "^3.0.0"}}`,
		rollupConfigName: `export default { input: 'index.js', output: { dir: 'dist', format: 'cjs' } };`,
		"index.js":       `console.log("Hello from bundled model code");`,
	}
	for name, content := range files {
		if err := os.WriteFile(filepath.Join(dir, name), []byte(content), 0644); err != nil {
			return fmt.Errorf("failed to write %s: %w", name, err)
		}
	}
	return nil
}

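This approach isolates the build environment from the controller and keeps the controller lightweight. Builds are only as reproducible as the project’s dependency pinning, though: npm install against a package.json with caret ranges can resolve different versions over time, so repeatable output needs a committed package-lock.json and npm ci. The cost is the overhead of Docker operations, but for a deployment process this is an acceptable trade-off for the gain in reliability and maintainability.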

The final phase involves taking the built artifact—whether it’s the Python code for a Keras model or the bundled JavaScript for a TF.js model—packaging it into a zip file structured correctly for Azure Functions, and using the azure-sdk-for-go to upload it and trigger the deployment. The logic is largely boilerplate Azure API calls.

This system, born from operational pain, ultimately unified our model deployment strategy. The key was establishing a declarative source of truth in etcd and building a robust reconciliation engine that could handle the specific build requirements of our polyglot ML environment. Rollup, orchestrated by a Go controller, became an integral part of our backend MLOps pipeline rather than just a front-end tool.

The current implementation, however, is not without its limitations. It lacks a leader election mechanism, meaning it can only be run as a single instance, which is a single point of failure. A production-ready version would leverage etcd’s distributed locking primitives to elect a leader among multiple controller instances. Furthermore, the build orchestration is sequential; a more advanced version could parallelize builds for multiple deployments. The error reporting is also basic; surfacing detailed build logs from the container back into the status field in etcd would be a necessary improvement for debugging failed deployments.

