Constructing a Real-Time Argo CD Sync Status Stream with a Go-Gin SSE Backend and a Lit Web Components UI


The deployment status page on our internal developer platform was becoming a liability. It operated on a naive five-second polling interval, hammering our backend API and creating a user experience that felt perpetually out of date. Developers, frustrated with the lag and ambiguity, would invariably abandon the UI for kubectl get events -w or the native Argo CD interface. This defeated the entire purpose of creating a unified, streamlined developer portal. The constant polling was not just inefficient; it eroded trust in the platform we were building. The mandate was clear: eliminate polling and deliver immediate, trustworthy, real-time feedback from our GitOps engine directly to the developer’s browser.

Our initial discussions leaned towards WebSockets, the standard answer for bidirectional real-time communication. However, in a real-world project, you must weigh complexity against requirements. We only needed one-way communication: from the server to the client. The browser never needed to send arbitrary data back over the real-time channel. Implementing WebSockets would have introduced unnecessary complexity around connection lifecycle management, proxy configuration (especially with ingress controllers), and library dependencies on both client and server.

This led us to Server-Sent Events (SSE). It’s a simpler, more elegant solution for our specific problem. SSE is built on standard HTTP, making it transparent to most network infrastructure. It provides a persistent, unidirectional connection where the server can push data whenever it becomes available. Crucially, the browser’s native EventSource API handles connection retries automatically, a piece of resilience we would have had to build ourselves with WebSockets. For a system that streams status updates, SSE was the pragmatic choice.
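To make the wire format concrete, here is a minimal sketch of how a single SSE frame is laid out. The helper name formatSSE and the "app-status" event name are illustrative assumptions, not part of our service; the field names (id, event, data) and the blank-line terminator come from the SSE format itself.

```go
package main

import (
	"fmt"
	"strings"
)

// formatSSE renders one Server-Sent Events frame (hypothetical helper).
// id and event are optional; data may span multiple lines.
func formatSSE(id, event, data string) string {
	var sb strings.Builder
	if id != "" {
		fmt.Fprintf(&sb, "id: %s\n", id)
	}
	if event != "" {
		fmt.Fprintf(&sb, "event: %s\n", event)
	}
	// Every line of the payload needs its own "data:" field.
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&sb, "data: %s\n", line)
	}
	// A blank line terminates the frame.
	sb.WriteString("\n")
	return sb.String()
}

func main() {
	fmt.Print(formatSSE("42", "app-status", `{"name":"demo","syncStatus":"Synced"}`))
}
```

Note that frames carrying an event field other than "message" are not delivered to the browser's onmessage handler; the client has to register a listener for that event name with addEventListener.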

For the technology stack, the decisions were straightforward. Our platform’s backend services are written in Go for its performance and robust concurrency primitives, making Go-Gin a natural fit for the API layer. On the frontend, our portal is built as a collection of micro-frontends using Web Components, with Lit as our library of choice for its simplicity and reactivity. The challenge was to bridge the gap between an event happening deep within our Kubernetes cluster—an Argo CD Application resource changing its state—and a seamless UI update in a Lit component, all without a single poll.

The Core of the Real-Time Layer: A Go-Based SSE Broker

The first step was building a server-side component capable of managing multiple persistent SSE connections. This “broker” needed to handle new client connections, gracefully manage disconnections, and broadcast messages to all connected clients. Go’s concurrency features are perfect for this.

The design is a central Broker struct that uses channels to coordinate operations.

// sse_broker.go
package main

import (
	"log"
	"sync"
	"time"
)

// Broker manages a set of SSE client connections and broadcasts messages to them.
type Broker struct {
	// A channel to receive events that need to be broadcasted to all clients.
	Notifier chan []byte

	// A channel for new clients to register.
	newClients chan chan []byte

	// A channel for clients that are closing their connection.
	closingClients chan chan []byte

	// A map holding all connected clients. The key is the client's message channel.
	clients map[chan []byte]bool

	// Mutex to protect access to the clients map.
	mu sync.Mutex
}

// NewBroker creates and starts a new Broker instance.
func NewBroker() *Broker {
	b := &Broker{
		Notifier:       make(chan []byte, 1),
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		clients:        make(map[chan []byte]bool),
	}

	// Start a goroutine to listen for and handle broker events.
	go b.listen()

	return b
}

// listen is the central event loop for the broker. It handles client
// connections, disconnections, and message broadcasting.
func (b *Broker) listen() {
	for {
		select {
		case s := <-b.newClients:
			// A new client has connected. Add it to the clients map.
			b.mu.Lock()
			b.clients[s] = true
			b.mu.Unlock()
			log.Printf("Client added. %d clients registered.", len(b.clients))

		case s := <-b.closingClients:
			// A client has disconnected. Remove it from the clients map.
			b.mu.Lock()
			delete(b.clients, s)
			b.mu.Unlock()
			log.Printf("Removed client. %d clients registered.", len(b.clients))

		case event := <-b.Notifier:
			// A new event has arrived. Broadcast it to all connected clients.
			// This operation is done concurrently to avoid blocking if a client is slow.
			b.mu.Lock()
			for clientMessageChan := range b.clients {
				// Use a goroutine to prevent a slow client from blocking the broadcast.
				go func(c chan []byte) {
					select {
					case c <- event:
						// Message sent successfully
					case <-time.After(1 * time.Second):
						// Timeout if the client channel is blocked.
						// This could indicate a stuck client.
						log.Println("Warning: client channel send timeout")
					}
				}(clientMessageChan)
			}
			b.mu.Unlock()
		}
	}
}

The broker's select loop multiplexes registration, deregistration, and broadcast over three channels, so only the listen goroutine ever touches the clients map. The sync.Mutex is therefore defensive rather than strictly required: it becomes necessary as soon as any other goroutine reads or writes the map, because Go maps are not safe for concurrent access. The other production-minded detail is that each send runs in its own goroutine with a one-second timeout. Without this, a single slow or unresponsive client could block the entire broadcast loop and delay updates for every other user. The trade-off is ordering: two events published in quick succession are sent by separate goroutines, so a client can occasionally receive them out of order.
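An alternative to spawning a goroutine per send is to give each client a small buffered channel and use a non-blocking send. The sketch below is not what our broker does; it assumes buffered client channels (the handler in this article uses an unbuffered one) and a hypothetical fanOut helper. It keeps per-client ordering and drops a message for a client whose buffer is full.

```go
package main

import "fmt"

// fanOut delivers msg to every client channel without blocking and reports
// how many clients were skipped because their buffer was full.
func fanOut(clients []chan []byte, msg []byte) (dropped int) {
	for _, c := range clients {
		select {
		case c <- msg:
			// Queued in the client's buffer, in publish order.
		default:
			// Buffer full: skip this client rather than stall the others.
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := make(chan []byte, 8)
	slow := make(chan []byte, 1)
	clients := []chan []byte{fast, slow}

	fmt.Println(fanOut(clients, []byte("one"))) // both buffers have room
	fmt.Println(fanOut(clients, []byte("two"))) // the slow client's buffer is full
}
```

Whether dropping is acceptable depends on the payload. For status snapshots, where the next event supersedes the last, it usually is.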

With the broker logic in place, integrating it into a Gin handler is straightforward.

// main.go (partial)
import (
	"fmt"
	"io"

	"github.com/gin-gonic/gin"
)

func (b *Broker) ServeHTTP() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Create a new channel for this specific client.
		clientChan := make(chan []byte)

		// Register the new client with the broker.
		b.newClients <- clientChan

		// Deregister the client when the handler returns.
		defer func() {
			b.closingClients <- clientChan
		}()

		c.Header("Content-Type", "text/event-stream")
		c.Header("Cache-Control", "no-cache")
		c.Header("Connection", "keep-alive")
		c.Header("Access-Control-Allow-Origin", "*") // Adjust for production

		// The request context is cancelled when the client disconnects.
		clientGone := c.Request.Context().Done()

		// Use c.Stream to send data. It flushes after every step.
		c.Stream(func(w io.Writer) bool {
			// Wait for either a broker message or a disconnect. Selecting on
			// both avoids blocking forever on a channel nobody closes.
			select {
			case <-clientGone:
				return false // Stop streaming
			case msg := <-clientChan:
				// Format as an SSE message. The `data:` prefix is required,
				// and the trailing blank line delimits the message.
				fmt.Fprintf(w, "data: %s\n\n", msg)
				return true // Continue streaming
			}
		})
	}
}

func main() {
    // ... setup code ...
	broker := NewBroker()

	router := gin.Default()
	router.GET("/status-stream", broker.ServeHTTP())
    // ... router.Run() ...
}

The Gin handler sets the necessary HTTP headers for an SSE connection. Content-Type: text/event-stream is mandatory. The others prevent caching and keep the connection alive. The c.Stream function is a key Gin feature that allows us to hold the connection open and write data incrementally. When a message is received on the client-specific channel, it’s formatted with the data: prefix and sent to the client.
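For readers who want to see what happens underneath c.Stream, here is a rough standard-library equivalent, exercised with httptest so it runs without a server. The names sseHandler and recordStream are illustrative assumptions; this is a sketch of the same idea, not the code our service runs.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// sseHandler streams every message from msgs as an SSE frame until the
// channel is closed or the client goes away.
func sseHandler(msgs <-chan []byte) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		for {
			select {
			case <-r.Context().Done():
				return // client disconnected
			case msg, open := <-msgs:
				if !open {
					return // producer finished
				}
				fmt.Fprintf(w, "data: %s\n\n", msg)
				flusher.Flush() // push the frame to the client immediately
			}
		}
	}
}

// recordStream feeds the handler the given messages and returns the raw body.
func recordStream(messages ...string) string {
	msgs := make(chan []byte, len(messages))
	for _, m := range messages {
		msgs <- []byte(m)
	}
	close(msgs)
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/status-stream", nil)
	sseHandler(msgs)(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Print(recordStream(`{"name":"demo"}`, `{"name":"other"}`))
}
```

The explicit Flush after each frame is the important part: without it, the response may sit in a buffer and the browser sees nothing until the buffer fills.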

Interfacing with the Source of Truth

The next challenge was feeding this broker with real data. The naive approach would be for our Go service to poll the Argo CD API. This moves the polling from client-to-server to server-to-Argo, which is a marginal improvement at best and doesn’t solve the core problem of latency.

The correct, cloud-native approach is to watch the Kubernetes API server directly. Argo CD manages its state using a Custom Resource Definition (CRD) for its Application object. Every time Argo CD performs a sync or the health status of an application’s resources changes, it updates the status field of the corresponding Application resource in Kubernetes. We can tap into this event stream.

For this, we use the official Kubernetes client-go library, specifically its informer mechanism. An informer wraps a list-and-watch against the API server, maintains a local cache of objects, and invokes event handlers (AddFunc, UpdateFunc, DeleteFunc) as objects change. Update handlers also fire on each periodic resync, even when nothing has changed, so consumers should tolerate duplicate notifications. This is far more resource-friendly than repeatedly querying the API server.

First, we need to set up the Kubernetes client and the Argo CD Application informer.

// k8s_watcher.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	argoprojv1a1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
	argocdclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
	"github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
)

// AppStatusEvent defines the structure of the JSON message we'll send over SSE.
type AppStatusEvent struct {
	Name         string `json:"name"`
	Namespace    string `json:"namespace"`
	SyncStatus   string `json:"syncStatus"`
	HealthStatus string `json:"healthStatus"`
	// Pointer because OperationState.FinishedAt is a *metav1.Time and is nil
	// while an operation is still running.
	LastSync *metav1.Time `json:"lastSync"`
	Message  string       `json:"message"`
}

// startArgoCDWatcher initializes and runs an informer for Argo CD Application resources.
func startArgoCDWatcher(ctx context.Context, broker *Broker) {
	// Assumes in-cluster configuration. For local dev, use kubeconfig path.
	config, err := clientcmd.BuildConfigFromFlags("", "")
	if err != nil {
		log.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	clientset, err := argocdclient.NewForConfig(config)
	if err != nil {
		log.Fatalf("Error building argocd clientset: %s", err.Error())
	}

	// Create an informer factory. We specify a resync period to ensure the cache
	// doesn't become stale, though event handlers are the primary mechanism.
	informerFactory := externalversions.NewSharedInformerFactory(clientset, time.Minute*10)
	appInformer := informerFactory.Argoproj().V1alpha1().Applications().Informer()

	appInformer.AddEventHandler(
		&ResourceUpdateHandler{
			Broker: broker,
		},
	)
	
	log.Println("Starting Argo CD Application informer...")
	informerFactory.Start(ctx.Done())
	
	// Wait for the initial cache to sync
	informerFactory.WaitForCacheSync(ctx.Done())
	log.Println("Informer cache synced.")
}


// ResourceUpdateHandler implements the cache.ResourceEventHandler interface.
type ResourceUpdateHandler struct {
	Broker *Broker
}

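// OnAdd is called when a new Application is created, and once per existing
// Application while the cache is first populated.
// Note: client-go v0.27 and later declare this method as
// OnAdd(obj interface{}, isInInitialList bool); match your vendored version.
func (h *ResourceUpdateHandler) OnAdd(obj interface{}) {
	app := obj.(*argoprojv1a1.Application)
	log.Printf("Application ADDED: %s/%s", app.Namespace, app.Name)
	h.processAppUpdate(app)
}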

// OnUpdate is called when an existing Application is modified. This is our primary event source.
func (h *ResourceUpdateHandler) OnUpdate(oldObj, newObj interface{}) {
	app := newObj.(*argoprojv1a1.Application)
	log.Printf("Application UPDATED: %s/%s", app.Namespace, app.Name)
	h.processAppUpdate(app)
}

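// OnDelete is called when an Application is deleted.
func (h *ResourceUpdateHandler) OnDelete(obj interface{}) {
	// If the watch missed the delete event, obj arrives wrapped in a
	// cache.DeletedFinalStateUnknown tombstone, so assert the type safely.
	app, ok := obj.(*argoprojv1a1.Application)
	if !ok {
		log.Printf("OnDelete: skipping unexpected object of type %T", obj)
		return
	}
	log.Printf("Application DELETED: %s/%s", app.Namespace, app.Name)
	// Optionally, send a "deleted" event.
}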

// processAppUpdate extracts relevant status info, marshals it to JSON, and broadcasts it.
func (h *ResourceUpdateHandler) processAppUpdate(app *argoprojv1a1.Application) {
	event := AppStatusEvent{
		Name:      app.Name,
		Namespace: app.Namespace,
		SyncStatus:  string(app.Status.Sync.Status),
		HealthStatus: string(app.Status.Health.Status),
	}

	if app.Status.OperationState != nil {
		event.LastSync = app.Status.OperationState.FinishedAt
		event.Message = app.Status.OperationState.Message
	}

	payload, err := json.Marshal(event)
	if err != nil {
		log.Printf("Error marshalling app status: %v", err)
		return
	}

	// Send the JSON payload to the broker's notifier channel.
	h.Broker.Notifier <- payload
}

// main.go must be updated to start this watcher
func main() {
	broker := NewBroker()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start the watcher in a separate goroutine
	go startArgoCDWatcher(ctx, broker)

	router := gin.Default()
	router.GET("/status-stream", broker.ServeHTTP())
	
	log.Println("Starting server on :8080")
	if err := router.Run(":8080"); err != nil {
		log.Fatalf("Failed to run server: %v", err)
	}
}

The ResourceUpdateHandler implements the required methods. We are primarily interested in OnUpdate. Inside processAppUpdate, we construct our AppStatusEvent struct with the key fields from the Application.Status, marshal it to JSON, and push it onto the broker’s Notifier channel. This effectively connects the Kubernetes event stream to our SSE broadcast system. The system is now fully event-driven from end to end.
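Because update handlers can fire without any visible change (for example on a resync), it may be worth suppressing duplicates before they reach the broker. The following is a minimal sketch under stated assumptions: statusSnapshot and dedup are hypothetical types, not part of our service, and the code assumes a single calling goroutine. Add a mutex if that does not hold.

```go
package main

import "fmt"

// statusSnapshot is a hypothetical, trimmed-down view of the fields the UI renders.
type statusSnapshot struct {
	Sync, Health, Message string
}

// dedup remembers the last snapshot seen per application key.
type dedup struct {
	last map[string]statusSnapshot
}

func newDedup() *dedup {
	return &dedup{last: make(map[string]statusSnapshot)}
}

// changed reports whether snap differs from the last snapshot recorded for
// key, and records snap as the new baseline when it does.
func (d *dedup) changed(key string, snap statusSnapshot) bool {
	if prev, ok := d.last[key]; ok && prev == snap {
		return false
	}
	d.last[key] = snap
	return true
}

// forget drops the baseline for key, e.g. after the application is deleted.
func (d *dedup) forget(key string) {
	delete(d.last, key)
}

func main() {
	d := newDedup()
	snap := statusSnapshot{Sync: "Synced", Health: "Healthy"}
	fmt.Println(d.changed("argocd/demo", snap)) // first sighting
	fmt.Println(d.changed("argocd/demo", snap)) // identical repeat
}
```

In processAppUpdate, a check like this would sit just before the JSON is marshalled, so unchanged applications never generate an SSE message.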

The Frontend: A Reactive Lit Component

The final piece is the client-side component that consumes this stream. Using Lit, we can create a self-contained web component that handles the EventSource connection and reactively updates its own DOM when new data arrives.

// deployment-status-stream.js
import { LitElement, html, css } from 'lit';
import { property } from 'lit/decorators.js';

class DeploymentStatusStream extends LitElement {
  static styles = css`
    :host {
      display: block;
      font-family: sans-serif;
      padding: 16px;
      border: 1px solid #ccc;
      border-radius: 8px;
      max-width: 600px;
    }
    .status-grid {
      display: grid;
      grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
      gap: 1rem;
    }
    .app-card {
      padding: 1rem;
      border-radius: 4px;
      transition: background-color 0.3s ease;
    }
    .app-card h3 {
      margin-top: 0;
    }
    .status-badge {
      display: inline-block;
      padding: 0.25em 0.6em;
      font-size: 75%;
      font-weight: 700;
      line-height: 1;
      text-align: center;
      white-space: nowrap;
      vertical-align: baseline;
      border-radius: 0.25rem;
      color: #fff;
    }
    .Synced { background-color: #28a745; }
    .Progressing { background-color: #007bff; }
    .OutOfSync { background-color: #ffc107; color: #000; }
    .Degraded { background-color: #dc3545; }
    .Missing { background-color: #6c757d; }
    .Healthy { background-color: #28a745; }
    .Suspended { background-color: #17a2b8; }
  `;

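  // Use Lit's @property decorator to create reactive properties.
  // When 'apps' is updated, the component will re-render automatically.
  // Decorator syntax requires TypeScript or a Babel decorators plugin; in
  // plain JavaScript, declare `static properties = { apps: { type: Object } }`
  // and initialize `this.apps = {}` in the constructor instead.
  @property({ type: Object })
  apps = {};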

  eventSource = null;

  // connectedCallback is a standard Web Component lifecycle method.
  // It's called when the component is added to the DOM.
  connectedCallback() {
    super.connectedCallback();
    this.connect();
  }

  // disconnectedCallback is called when the component is removed from the DOM.
  disconnectedCallback() {
    super.disconnectedCallback();
    this.disconnect();
  }

  connect() {
    if (this.eventSource) {
      this.disconnect();
    }
    // The URL should point to your Go backend service.
    this.eventSource = new EventSource('http://localhost:8080/status-stream');

    this.eventSource.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        const appKey = `${data.namespace}/${data.name}`;
        
        // This assignment triggers Lit's reactivity.
        // We create a new object to ensure Lit detects the change.
        this.apps = {
          ...this.apps,
          [appKey]: data,
        };
      } catch (error) {
        console.error('Failed to parse SSE event data:', error);
      }
    };

    this.eventSource.onerror = (err) => {
      console.error('EventSource failed:', err);
      // The browser will automatically attempt to reconnect.
    };
  }

  disconnect() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }

  render() {
    const appEntries = Object.values(this.apps);

    if (appEntries.length === 0) {
      return html`<p>Waiting for deployment status updates...</p>`;
    }

    return html`
      <h2>Live Argo CD Application Status</h2>
      <div class="status-grid">
        ${appEntries.map(app => this.renderAppCard(app))}
      </div>
    `;
  }

  renderAppCard(app) {
    const appKey = `${app.namespace}/${app.name}`;
    const healthClass = app.healthStatus || 'Unknown';
    const syncClass = app.syncStatus || 'Unknown';

    return html`
      <div class="app-card ${healthClass}">
        <h3>${appKey}</h3>
        <p>
          Sync Status: <span class="status-badge ${syncClass}">${app.syncStatus}</span>
        </p>
        <p>
          Health Status: <span class="status-badge ${healthClass}">${app.healthStatus}</span>
        </p>
        <p>
          Last Sync: ${app.lastSync ? new Date(app.lastSync).toLocaleString() : 'N/A'}
        </p>
        ${app.message ? html`<p>Message: <code>${app.message}</code></p>` : ''}
      </div>
    `;
  }
}

customElements.define('deployment-status-stream', DeploymentStatusStream);

The component’s logic is clean and declarative. In connectedCallback, it establishes the EventSource connection. The onmessage handler parses the incoming JSON and updates the apps property. Because apps is a reactive property, Lit efficiently re-renders only the parts of the DOM that have changed when a new status update arrives for a specific application. The disconnectedCallback ensures we clean up the connection when the component is removed from the page, preventing memory leaks. This component encapsulates all the logic for real-time updates, making it a simple drop-in for our developer portal.

Visualizing the End-to-End Flow

The complete, event-driven architecture looks like this:

sequenceDiagram
    participant Dev as Developer
    participant Git
    participant ArgoCD as Argo CD Controller
    participant K8s as Kubernetes API Server
    participant GoSSE as Go-Gin SSE Service
    participant LitUI as Lit Component (Browser)

    Dev->>Git: git push
    Git-->>ArgoCD: Webhook/Poll notifies of change
    ArgoCD->>K8s: Updates Application CRD Status (e.g., to 'Progressing')
    K8s-->>GoSSE: Informer receives CRD update event
    GoSSE->>GoSSE: processAppUpdate() marshals status to JSON
    GoSSE-->>LitUI: Broadcasts SSE message over persistent HTTP connection
    LitUI->>LitUI: onmessage handler updates reactive property
    Note over LitUI: UI re-renders instantly

    ArgoCD->>K8s: Deploys resources, sync completes...
    ArgoCD->>K8s: Updates Application CRD Status to 'Synced' / 'Healthy'
    K8s-->>GoSSE: Informer receives second CRD update event
    GoSSE-->>LitUI: Broadcasts new SSE message
    LitUI->>LitUI: UI updates again to show final state

This implementation successfully replaced our inefficient polling mechanism. The user experience is now what it always should have been: instant, fluid, and accurate. The load on our backend is significantly reduced, as it no longer handles a barrage of polling requests. The Go service is lightweight and scales horizontally, while SSE, being plain HTTP, avoided the extra proxy and ingress configuration that WebSockets would have required.

However, this solution is not without its limitations. The current implementation broadcasts every application update to every connected client. In a large organization with hundreds of applications and developers, this could lead to unnecessary data being pushed to browsers. A future iteration could implement a subscription model where the client specifies which applications it's interested in via query parameters (e.g., /status-stream?app=my-app), and the Go broker would manage topic-based broadcasts. Furthermore, over HTTP/1.1, SSE is limited by the browser's cap on concurrent connections per domain (typically 6). HTTP/2 multiplexes many streams over one connection and largely removes this limit, but where HTTP/1.1 is still in the path, a dashboard showing many different types of real-time data could hit this bottleneck, potentially forcing a re-evaluation of a single, multiplexed WebSocket connection. For our current use case, though, SSE remains the more pragmatic engineering choice.
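The subscription idea could start from something as small as the sketch below. It only covers parsing and matching; the subscription type and parseSubscription function are hypothetical names, and wiring them into the broker is left out.

```go
package main

import (
	"fmt"
	"net/url"
)

// subscription holds the application names a client asked for.
// An empty set means "send everything".
type subscription map[string]struct{}

// parseSubscription reads repeated ?app= parameters from a raw query string.
func parseSubscription(rawQuery string) (subscription, error) {
	values, err := url.ParseQuery(rawQuery)
	if err != nil {
		return nil, err
	}
	sub := subscription{}
	for _, name := range values["app"] {
		if name != "" {
			sub[name] = struct{}{}
		}
	}
	return sub, nil
}

// wants reports whether an event for the named application should be
// delivered to this client.
func (s subscription) wants(app string) bool {
	if len(s) == 0 {
		return true
	}
	_, ok := s[app]
	return ok
}

func main() {
	sub, err := parseSubscription("app=my-app&app=billing")
	if err != nil {
		fmt.Println("bad query:", err)
		return
	}
	fmt.Println(sub.wants("my-app"), sub.wants("checkout"))
}
```

The broker would then store a subscription alongside each client channel and consult wants before every send.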

