Our MLOps platform was fractured. The Python team deployed their Keras models via a collection of manually triggered Jenkins jobs that pushed zip archives to Azure Functions. The JavaScript team, building real-time inference features with TensorFlow.js, had their own bespoke pipeline using shell scripts to invoke Rollup for bundling and then manually updated a different set of Azure Functions. Every deployment was a context-switching burden, and rollbacks were a nightmare of tracking down previous artifacts. The core pain point was a lack of a single source of truth for what should be running. We needed a unified, declarative system, one where we could define the desired state of our entire model ecosystem and have a machine tirelessly work to make it a reality.
The concept we settled on was a custom controller, heavily inspired by the Kubernetes reconciliation loop pattern. The architecture would be simple in principle:
- A central, highly-available, strongly consistent data store would hold the “spec” for every model deployment. This is our desired state.
- A stateless controller process would continuously watch this data store for any changes to these specs.
- Upon detecting a change (creation, update, or deletion), the controller would trigger a series of actions—fetching code, building artifacts, and provisioning infrastructure—to align the actual state of the world with the desired state.
- The controller would report the status of its operations back to the data store.
For the data store, we dismissed using a standard SQL database. While possible, we needed a primitive that was explicitly designed for distributed locking and, most importantly, an efficient Watch API. etcd was the obvious choice. Its transactional semantics and watch streams are precisely what’s needed to build reliable controllers. For the deployment target, Azure Functions provided the necessary polyglot environment, supporting both the Python runtimes for Keras and the Node.js runtimes for our Rollup-bundled TF.js models.
The final architecture looked like this:
graph TD
  subgraph Control_Plane
    A[Developer CLI/UI] -- etcdctl put --> B(etcd Cluster)
    B -- Watch Stream --> C{Deployment Controller}
  end
  subgraph Build_and_Deploy
    C -- Triggers --> D{Reconciliation Logic}
    D -- Reads Spec --> B
    D -- Fetches Source --> E[Azure Blob Storage]
    D -- Orchestrates Build --> F[Ephemeral Build Container]
    F -- Invokes --> G[Rollup for JS / Pip for Python]
    F -- Produces --> H[Deployment Artifact .zip]
    D -- Uploads Artifact --> E
    D -- Uses Azure SDK --> I[Provision/Update Azure Function App]
    D -- Writes Status --> B
  end
  subgraph Execution_Plane[Azure]
    I
    J[API Gateway] --> I
    K[End User] --> J
  end
  style C fill:#f9f,stroke:#333,stroke-width:2px
  style B fill:#bbf,stroke:#333,stroke-width:2px
Defining the Deployment Spec in etcd
Everything starts with a well-defined API object. We stored our deployment specs as JSON objects under the etcd key prefix /models/deployments/. A typical spec for a Keras model would look like this:
etcdctl put /models/deployments/sentiment-analyzer '{"apiVersion":"v1alpha1","kind":"ModelDeployment","metadata":{"name":"sentiment-analyzer","uid":"a1b2c3d4-e5f6-7890-1234-567890abcdef","generation":1},"spec":{"type":"keras","sourceUri":"azure-blob://artifacts/models/sentiment-analyzer/v1.2.0/src.zip","entrypoint":"handler.predict","runtime":{"pythonVersion":"3.9"},"target":{"platform":"azure-function","resourceGroup":"ml-prod-rg","appName":"sentiment-analyzer-prod"}},"status":{"phase":"Pending","observedGeneration":0}}'
And for a JavaScript model using TensorFlow.js, the spec would differ slightly:
etcdctl put /models/deployments/image-classifier-js '{"apiVersion":"v1alpha1","kind":"ModelDeployment","metadata":{"name":"image-classifier-js","uid":"b2c3d4e5-f6a7-8901-2345-67890abcdef1","generation":1},"spec":{"type":"tfjs","sourceUri":"azure-blob://artifacts/models/image-classifier-js/v0.8.1/src.zip","entrypoint":"dist/bundle.handler","runtime":{"nodeVersion":"18"},"build":{"bundler":"rollup","configFile":"rollup.config.js"},"target":{"platform":"azure-function","resourceGroup":"ml-edge-rg","appName":"image-classifier-js-prod"}},"status":{"phase":"Pending","observedGeneration":0}}'
The status field is critical. It’s owned by the controller and reports the current state of the reconciliation. The observedGeneration field (a concept borrowed from Kubernetes) helps us differentiate between a spec that the controller has already seen and processed and one that is new or updated.
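For this to work, whatever writes the spec has to bump metadata.generation on every change, as the comment in types.go later in this post notes. Below is a minimal sketch of such a client-side helper, reusing the etcd client and ModelDeployment type introduced further down; the bumpAndPutSpec name is ours, not part of any library.

// spec_client.go (illustrative sketch)
package main

import (
	"context"
	"encoding/json"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// bumpAndPutSpec applies a spec mutation, increments metadata.generation,
// and writes the object back only if nobody modified it in the meantime.
func bumpAndPutSpec(ctx context.Context, cli *clientv3.Client, key string, mutate func(*ModelDeployment)) error {
	resp, err := cli.Get(ctx, key)
	if err != nil {
		return err
	}
	if len(resp.Kvs) == 0 {
		return fmt.Errorf("deployment %s not found", key)
	}
	kv := resp.Kvs[0]
	var d ModelDeployment
	if err := json.Unmarshal(kv.Value, &d); err != nil {
		return err
	}
	mutate(&d)              // apply the spec change
	d.Metadata.Generation++ // tell the controller this is a new generation
	val, err := json.Marshal(d)
	if err != nil {
		return err
	}
	txnResp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", kv.ModRevision)).
		Then(clientv3.OpPut(key, string(val))).
		Commit()
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return fmt.Errorf("concurrent update to %s, caller should retry", key)
	}
	return nil
}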
The Core Controller Reconciliation Loop
We chose Go for implementing the controller due to its excellent concurrency support, static typing, and the quality of the official etcd and azure-sdk-for-go libraries. The heart of the controller is a simple loop: connect to etcd, establish a watch on our key prefix, and process events as they arrive.
// main.go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"go.etcd.io/etcd/client/v3"
)
func main() {
endpoints := []string{"localhost:2379"} // In production, this comes from config
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalf("Failed to connect to etcd: %v", err)
}
defer cli.Close()
log.Println("Successfully connected to etcd.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigc
log.Println("Shutdown signal received, cancelling context...")
cancel()
}()
// The controller manages the reconciliation loop.
controller := NewController(cli)
err = controller.Run(ctx)
if err != nil && err != context.Canceled {
log.Fatalf("Controller exited with error: %v", err)
}
log.Println("Controller shut down gracefully.")
}
// controller.go
package main
import (
"context"
"encoding/json"
"log"
"time"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
const deploymentPrefix = "/models/deployments/"
type Controller struct {
etcdClient *clientv3.Client
// In a real application, you'd have an Azure client, blob storage client, etc.
}
func NewController(cli *clientv3.Client) *Controller {
return &Controller{etcdClient: cli}
}
func (c *Controller) Run(ctx context.Context) error {
log.Println("Starting controller loop...")
// Initial load to process existing deployments
resp, err := c.etcdClient.Get(ctx, deploymentPrefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
c.processEvent(ctx, kv)
}
// Start watching for future changes from the revision after our initial GET
watchChan := c.etcdClient.Watch(ctx, deploymentPrefix, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision+1))
for {
select {
case <-ctx.Done():
log.Println("Context cancelled, stopping watch loop.")
return ctx.Err()
case watchResp, ok := <-watchChan:
if !ok || watchResp.Err() != nil {
log.Printf("Watch channel closed or errored: %v. Re-establishing watch...", watchResp.Err())
// Basic retry logic. A production system needs more robust backoff.
time.Sleep(5 * time.Second)
watchChan = c.etcdClient.Watch(ctx, deploymentPrefix, clientv3.WithPrefix())
continue
}
for _, event := range watchResp.Events {
// We only care about PUT events (creations/updates).
// Deletions would trigger a separate teardown logic path.
if event.Type == mvccpb.PUT {
c.processEvent(ctx, event.Kv)
}
}
}
}
}
func (c *Controller) processEvent(ctx context.Context, kv *mvccpb.KeyValue) {
var deployment ModelDeployment
if err := json.Unmarshal(kv.Value, &deployment); err != nil {
log.Printf("Failed to unmarshal deployment spec for key %s: %v", string(kv.Key), err)
return
}
// Simple check to avoid reprocessing the same generation
if deployment.Metadata.Generation == deployment.Status.ObservedGeneration && deployment.Status.Phase == "Succeeded" {
return
}
log.Printf("Processing deployment: %s (Type: %s)", deployment.Metadata.Name, deployment.Spec.Type)
// Delegate to a specific reconciler based on the model type
var err error
switch deployment.Spec.Type {
case "keras":
err = c.reconcileKerasDeployment(ctx, &deployment)
case "tfjs":
err = c.reconcileTfjsDeployment(ctx, &deployment)
default:
log.Printf("Unknown deployment type: %s", deployment.Spec.Type)
// Update status to Failed
deployment.Status.Phase = "Failed"
deployment.Status.Message = "Unknown deployment type"
c.updateStatus(ctx, &deployment)
return
}
if err != nil {
log.Printf("Reconciliation failed for %s: %v", deployment.Metadata.Name, err)
deployment.Status.Phase = "Failed"
deployment.Status.Message = err.Error()
} else {
log.Printf("Reconciliation succeeded for %s", deployment.Metadata.Name)
deployment.Status.Phase = "Succeeded"
deployment.Status.Message = "Deployment is active and healthy."
}
// This is critical: we record that we have processed this version of the spec.
deployment.Status.ObservedGeneration = deployment.Metadata.Generation
c.updateStatus(ctx, &deployment)
}
// A placeholder for the actual reconciler functions
func (c *Controller) reconcileKerasDeployment(ctx context.Context, d *ModelDeployment) error {
log.Printf("Reconciling Keras model %s...", d.Metadata.Name)
// 1. Download source from d.Spec.SourceUri
// 2. Package for Azure Function (Python)
// 3. Deploy/Update Azure Function App
time.Sleep(2 * time.Second) // Simulate work
return nil
}
func (c *Controller) reconcileTfjsDeployment(ctx context.Context, d *ModelDeployment) error {
log.Printf("Reconciling TF.js model %s...", d.Metadata.Name)
// 1. Download source from d.Spec.SourceUri
// 2. Run Rollup build in a container
// 3. Package for Azure Function (Node.js)
// 4. Deploy/Update Azure Function App
time.Sleep(3 * time.Second) // Simulate work
return nil
}
// ... updateStatus function will be shown next ...
// types.go (for clarity)
type ModelDeployment struct {
ApiVersion string `json:"apiVersion"`
Kind string `json:"kind"`
Metadata Metadata `json:"metadata"`
Spec Spec `json:"spec"`
Status Status `json:"status"`
}
type Metadata struct {
Name string `json:"name"`
UID string `json:"uid"`
Generation int64 `json:"generation"` // This should be incremented by the client on each spec update
}
// ... other structs for Spec and Status
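Since the Spec and Status structs are elided above, here is one plausible shape for them, inferred from the JSON specs earlier and the fields the reconcilers access; treat it as a sketch rather than the exact production definition.

type Spec struct {
	Type       string  `json:"type"`
	SourceUri  string  `json:"sourceUri"`
	Entrypoint string  `json:"entrypoint"`
	Runtime    Runtime `json:"runtime"`
	Build      Build   `json:"build,omitempty"`
	Target     Target  `json:"target"`
}
type Runtime struct {
	PythonVersion string `json:"pythonVersion,omitempty"`
	NodeVersion   string `json:"nodeVersion,omitempty"`
}
type Build struct {
	Bundler    string `json:"bundler"`
	ConfigFile string `json:"configFile"`
}
type Target struct {
	Platform      string `json:"platform"`
	ResourceGroup string `json:"resourceGroup"`
	AppName       string `json:"appName"`
}
type Status struct {
	Phase              string `json:"phase"`
	Message            string `json:"message,omitempty"`
	ObservedGeneration int64  `json:"observedGeneration"`
}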
A major pitfall we encountered early on was managing state updates. If multiple controller instances were running for high availability, they could race to update the status of a deployment in etcd, leading to stale writes. The solution is to use etcd’s transactional capabilities for optimistic locking. The updateStatus function must read the current object, check its version, and then perform a Compare-and-Swap (CAS) operation.
// controller.go (continued)
func (c *Controller) updateStatus(ctx context.Context, d *ModelDeployment) {
key := deploymentPrefix + d.Metadata.Name
// Get the current version of the object to perform a CAS
getResp, err := c.etcdClient.Get(ctx, key)
if err != nil {
log.Printf("Failed to get current deployment for status update on %s: %v", d.Metadata.Name, err)
return
}
if len(getResp.Kvs) == 0 {
log.Printf("Cannot update status, deployment %s not found.", d.Metadata.Name)
return
}
currentKv := getResp.Kvs[0]
var currentDeployment ModelDeployment
if err := json.Unmarshal(currentKv.Value, &currentDeployment); err != nil {
log.Printf("Failed to unmarshal current deployment for status update on %s: %v", d.Metadata.Name, err)
return
}
// Update the status on the object we just fetched
currentDeployment.Status = d.Status
newVal, err := json.Marshal(currentDeployment)
if err != nil {
log.Printf("Failed to marshal updated deployment for %s: %v", d.Metadata.Name, err)
return
}
// Use a transaction to ensure we are updating the version we just read
txnResp, err := c.etcdClient.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", currentKv.ModRevision)).
Then(clientv3.OpPut(key, string(newVal))).
Commit()
if err != nil {
log.Printf("etcd transaction failed for %s: %v", d.Metadata.Name, err)
} else if !txnResp.Succeeded {
log.Printf("Optimistic lock failed for %s. Another process likely updated it. Will retry on next reconciliation loop.", d.Metadata.Name)
}
}
This transactional update prevents race conditions and ensures the integrity of our status reporting.
The TF.js Reconciler with a Containerized Rollup Build
The truly novel part of this system was handling the tfjs models. The controller itself is a lean Go binary; it cannot and should not have Node.js, npm, or Rollup installed. The only clean way to handle this is to orchestrate a containerized build. When the reconcileTfjsDeployment function is called, it performs these steps:
- Downloads the model’s source code from Azure Blob Storage to a temporary local directory.
- Creates a Dockerfile on the fly designed for the build.
- Uses the Docker SDK to build an image from that Dockerfile and run a container.
- The container runs npm install and then npx rollup -c.
- The controller copies the resulting build artifact (e.g., a dist directory) out of the container.
- The temporary directory, container, and image are cleaned up.
Here is the code that orchestrates this, assuming the presence of a Docker daemon.
// tfjs_reconciler.go
package main
import (
"archive/zip"
"bytes"
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/archive"
)
// This would be part of the Controller struct
type TfjsReconciler struct {
dockerClient *client.Client
// ... other clients
}
func (r *TfjsReconciler) Reconcile(ctx context.Context, d *ModelDeployment) error {
log.Printf("[TF.js Reconciler] Starting build for %s", d.Metadata.Name)
// Step 1: Prepare a temporary build directory
buildDir, err := os.MkdirTemp("", "tfjs-build-*")
if err != nil {
return fmt.Errorf("failed to create build dir: %w", err)
}
defer os.RemoveAll(buildDir)
log.Printf("[TF.js Reconciler] Using build directory: %s", buildDir)
// In a real implementation, this would download from d.Spec.SourceUri
if err := createFakeSourceFiles(buildDir, d.Spec.Build.ConfigFile); err != nil {
return fmt.Errorf("failed to create fake source: %w", err)
}
// Step 2: Build the Docker image for the build environment
dockerfileContent := fmt.Sprintf(`
FROM node:%s-slim
WORKDIR /app
COPY . .
RUN npm install
CMD ["npx", "rollup", "-c"]
`, d.Spec.Runtime.NodeVersion)
dockerfilePath := filepath.Join(buildDir, "Dockerfile")
if err := os.WriteFile(dockerfilePath, []byte(dockerfileContent), 0644); err != nil {
return fmt.Errorf("failed to write Dockerfile: %w", err)
}
buildCtx, err := archive.TarWithOptions(buildDir, &archive.TarOptions{})
if err != nil {
return fmt.Errorf("failed to create tar from build context: %w", err)
}
imageName := fmt.Sprintf("builder-%s:latest", d.Metadata.Name)
buildResp, err := r.dockerClient.ImageBuild(ctx, buildCtx, types.ImageBuildOptions{
Tags: []string{imageName},
Dockerfile: "Dockerfile",
Remove: true, // Remove intermediate containers
ForceRemove: true,
})
if err != nil {
return fmt.Errorf("failed to build image: %w", err)
}
defer buildResp.Body.Close()
// In a production system, you must stream and log this output.
io.Copy(io.Discard, buildResp.Body)
log.Printf("[TF.js Reconciler] Successfully built image: %s", imageName)
// Step 3: Run the build container
contResp, err := r.dockerClient.ContainerCreate(ctx, &container.Config{
Image: imageName,
}, nil, nil, nil, "")
if err != nil {
return fmt.Errorf("failed to create container: %w", err)
}
defer r.dockerClient.ContainerRemove(ctx, contResp.ID, types.ContainerRemoveOptions{Force: true})
if err := r.dockerClient.ContainerStart(ctx, contResp.ID, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("failed to start container: %w", err)
}
statusCh, errCh := r.dockerClient.ContainerWait(ctx, contResp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
return fmt.Errorf("container wait error: %w", err)
}
case status := <-statusCh:
if status.StatusCode != 0 {
// Get logs on failure
logReader, logErr := r.dockerClient.ContainerLogs(ctx, contResp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if logErr == nil {
buf := new(bytes.Buffer)
buf.ReadFrom(logReader)
log.Printf("Build container logs for %s:\n%s", d.Metadata.Name, buf.String())
}
return fmt.Errorf("build container exited with non-zero status code: %d", status.StatusCode)
}
}
log.Printf("[TF.js Reconciler] Build container finished successfully.")
// Step 4: Extract artifact
reader, _, err := r.dockerClient.CopyFromContainer(ctx, contResp.ID, "/app/dist")
if err != nil {
return fmt.Errorf("failed to copy artifact from container: %w", err)
}
defer reader.Close()
// The result is a tar stream, which we need to extract.
// Logic to untar the artifact stream into a final location omitted for brevity.
log.Println("[TF.js Reconciler] Artifact extracted.")
// Step 5: Package and deploy to Azure
// This would involve zipping the final artifact and using the Azure SDK.
log.Println("[TF.js Reconciler] Deployment to Azure would happen here.")
return nil
}
// Helper to simulate project files
func createFakeSourceFiles(dir, rollupConfigName string) error {
pkgJson := `{"dependencies": {"rollup": "^3.0.0"}}`
rollupConf := `export default { input: 'index.js', output: { dir: 'dist', format: 'cjs' } };`
indexJs := `console.log("Hello from bundled model code");`
os.WriteFile(filepath.Join(dir, "package.json"), []byte(pkgJson), 0644)
os.WriteFile(filepath.Join(dir, rollupConfigName), []byte(rollupConf), 0644)
os.WriteFile(filepath.Join(dir, "index.js"), []byte(indexJs), 0644)
return nil
}
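The untar step glossed over in Step 4 is plain archive/tar work on the stream returned by CopyFromContainer. Here is a minimal sketch; extractTarStream is a helper name we introduce for illustration, and it omits the path sanitization a production version would need.

// untar.go (illustrative sketch)
package main

import (
	"archive/tar"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// extractTarStream unpacks the tar stream returned by CopyFromContainer
// into destDir, so the dist/ directory ends up on the controller's disk.
func extractTarStream(r io.Reader, destDir string) error {
	tr := tar.NewReader(r)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return nil // end of archive
		}
		if err != nil {
			return fmt.Errorf("reading tar stream: %w", err)
		}
		target := filepath.Join(destDir, hdr.Name)
		switch hdr.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(target, 0o755); err != nil {
				return err
			}
		case tar.TypeReg:
			if err := os.MkdirAll(filepath.Dir(target), 0o755); err != nil {
				return err
			}
			f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(hdr.Mode))
			if err != nil {
				return err
			}
			if _, err := io.Copy(f, tr); err != nil {
				f.Close()
				return err
			}
			f.Close()
		}
	}
}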
This approach isolates the build environment completely, ensuring that the controller remains lightweight and that builds are hermetic and reproducible. The cost is the overhead of Docker operations, but for a deployment process, this is an acceptable trade-off for the gain in reliability and maintainability.
The final phase involves taking the built artifact—whether it’s the Python code for a Keras model or the bundled JavaScript for a TF.js model—packaging it into a zip file structured correctly for Azure Functions, and using the azure-sdk-for-go to upload it and trigger the deployment. The logic is largely boilerplate Azure API calls.
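As a rough illustration of that final hop, the sketch below zips a build directory with the standard library and pushes it to the Function App's Kudu zip-deploy endpoint over HTTP. Our production code goes through the Azure SDK instead, and the appName, user, and password publishing-credential parameters here are assumptions made for the example.

// azure_deploy.go (illustrative sketch)
package main

import (
	"archive/zip"
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
)

// zipDirectory packages a build output directory into an in-memory zip archive.
func zipDirectory(srcDir string) (*bytes.Buffer, error) {
	buf := new(bytes.Buffer)
	zw := zip.NewWriter(buf)
	walkErr := filepath.Walk(srcDir, func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() {
			return err
		}
		rel, err := filepath.Rel(srcDir, path)
		if err != nil {
			return err
		}
		w, err := zw.Create(filepath.ToSlash(rel))
		if err != nil {
			return err
		}
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = io.Copy(w, f)
		return err
	})
	if walkErr != nil {
		return nil, walkErr
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf, nil
}

// deployZip pushes the archive to the Function App's Kudu zip-deploy endpoint.
// appName, user, and password (the app's publishing credentials) are assumed inputs.
func deployZip(ctx context.Context, appName, user, password string, payload *bytes.Buffer) error {
	url := fmt.Sprintf("https://%s.scm.azurewebsites.net/api/zipdeploy", appName)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, payload)
	if err != nil {
		return err
	}
	req.SetBasicAuth(user, password)
	req.Header.Set("Content-Type", "application/zip")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("zip deploy failed with status %s", resp.Status)
	}
	return nil
}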
This system, born from operational pain, ultimately unified our model deployment strategy. The key was establishing a declarative source of truth in etcd and building a robust reconciliation engine that could handle the specific build requirements of our polyglot ML environment. Rollup served here not as a front-end build tool but as an integral part of our backend MLOps pipeline, orchestrated by a Go controller.
The current implementation, however, is not without its limitations. It lacks a leader election mechanism, meaning it can only be run as a single instance, which is a single point of failure. A production-ready version would leverage etcd’s distributed locking primitives to elect a leader among multiple controller instances. Furthermore, the build orchestration is sequential; a more advanced version could parallelize builds for multiple deployments. The error reporting is also basic; surfacing detailed build logs from the container back into the status field in etcd would be a necessary improvement for debugging failed deployments.
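Leader election itself would not be much code on top of etcd: the client's concurrency package provides sessions and elections. Below is a sketch of how multiple controller replicas could campaign before starting the reconciliation loop; the election key and TTL are illustrative choices, not values from our deployment.

// leader_election.go (illustrative sketch)
package main

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// runWithLeaderElection blocks until this instance wins the election,
// then runs the controller; the session TTL bounds failover time.
func runWithLeaderElection(ctx context.Context, cli *clientv3.Client, instanceID string, run func(context.Context) error) error {
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(15))
	if err != nil {
		return err
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/models/controller-leader")
	log.Printf("Instance %s campaigning for leadership...", instanceID)
	if err := election.Campaign(ctx, instanceID); err != nil {
		return err
	}
	log.Printf("Instance %s is now the leader.", instanceID)

	// Stop reconciling if our lease expires (e.g., we lose connectivity to etcd).
	leaderCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		<-session.Done()
		cancel()
	}()
	return run(leaderCtx)
}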