The core challenge was ingesting high-frequency telemetry from a fleet of devices expected to exceed 100,000 concurrent connections. Each device maintains a persistent WebSocket connection, pushing JSON payloads multiple times per second. Our initial proof-of-concept, a stateful fleet of VMs behind a load balancer, hit a wall. It was expensive to keep idle capacity for peak connection loads, and auto-scaling stateful WebSocket servers is notoriously painful. A connection storm during a regional device wake-up could overwhelm the fleet before new instances were ready, leading to cascading failures.
A purely serverless approach was the next logical consideration. Google Cloud Functions are excellent for event-driven, stateless workloads. However, they have a maximum execution duration and are fundamentally unsuited for maintaining long-lived TCP connections like WebSockets. An attempt to use them resulted in connections being dropped unpredictably. The mismatch between the stateful nature of WebSockets and the stateless, ephemeral nature of serverless functions was a dead end.
This led to a hybrid architecture concept. We could split the problem into two distinct phases: connection establishment and persistent data transfer.
- Connection Establishment: This is a short-lived, stateless process. It involves authentication, authorization, and deciding which backend server should handle the persistent connection. This is a perfect use case for a serverless function.
- Persistent Data Transfer: This requires a long-lived, stateful process capable of handling thousands of concurrent TCP connections efficiently. This is a job for a dedicated, orchestrated service, like one running on Google Kubernetes Engine (GKE).
Our refined architecture would use a Google Cloud Function as a “connection broker.” A device would first make a standard HTTPS request to the Cloud Function to authenticate. If successful, the function would consult a lightweight service in our GKE cluster to get the address of a suitable WebSocket pod. It would then issue a temporary redirect (HTTP 307) to the client, pointing it to the chosen pod. The client then follows the redirect, either through a WebSocket library that handles redirects or by reading the Location header itself, and establishes the persistent connection with the correct backend.
This design gives us the best of both worlds: the immense, cost-effective scalability of Cloud Functions for handling unpredictable connection storms, and the raw performance of a Go application on GKE for managing the persistent connections. The missing piece was securing this hand-off. We couldn’t just expose the WebSocket pods publicly. This is where Cilium’s eBPF-powered networking and security policies became critical.
The GKE-based WebSocket Termination and Ingestion Service
The heart of the stateful component is a set of Go services running on GKE. It consists of two main parts: a connection-manager service that load-balances incoming connection requests, and the websocket-ingestor pods that terminate the WebSocket connections and batch data into TimescaleDB.
Connection Manager Service
This is a simple gRPC service whose sole job is to hand the Cloud Function the address of a suitable websocket-ingestor pod. It maintains a list of available pods via the Kubernetes API. In a real-world project the selection would be more sophisticated, perhaps tracking actual connection counts to pick the least-loaded pod, but round-robin is a robust starting point.
Here is the protocol buffer definition for the service.
// proto/manager/v1/manager.proto
syntax = "proto3";

package manager.v1;

option go_package = "github.com/your-org/ingestor/gen/go/manager/v1;managerv1";

service ConnectionManagerService {
  // Requests a target ingestor pod for a new WebSocket connection.
  rpc GetIngestorEndpoint(GetIngestorEndpointRequest) returns (GetIngestorEndpointResponse);
}

message GetIngestorEndpointRequest {
  string device_id = 1; // For logging or future routing logic
}

message GetIngestorEndpointResponse {
  // The address of the websocket-ingestor pod to connect to.
  // e.g., "ws-ingestor-pod-1.websocket-ingestor-svc.default.svc.cluster.local:8080"
  string endpoint_address = 1;

  // A short-lived token the client must present to the ingestor pod.
  string connection_ticket = 2;
}
The Go implementation is straightforward. For this post-mortem, we’ll omit the Kubernetes client-go boilerplate that watches for pod endpoints and focus on the gRPC handler logic (a sketch of that discovery loop appears after the handler code below). The key is that the handler generates a short-lived ticket that the Cloud Function will pass back to the connecting device.
// internal/manager/server.go
package manager

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "sync"
    "time"

    "github.com/patrickmn/go-cache"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"

    managerv1 "github.com/your-org/ingestor/gen/go/manager/v1"
)

type Server struct {
    managerv1.UnimplementedConnectionManagerServiceServer

    // In-memory cache for short-lived connection tickets.
    // In a production system, this could be Redis.
    ticketCache *cache.Cache

    // List of available websocket-ingestor pod hostnames.
    // This would be dynamically populated from the K8s API.
    ingestorEndpoints []string
    nextEndpoint      int
    mu                sync.Mutex
}

func NewServer() *Server {
    return &Server{
        ticketCache: cache.New(5*time.Minute, 10*time.Minute),
        ingestorEndpoints: []string{
            // These would be discovered dynamically. For demonstration, they are static.
            "websocket-ingestor-0.websocket-ingestor.default.svc.cluster.local:8080",
            "websocket-ingestor-1.websocket-ingestor.default.svc.cluster.local:8080",
            "websocket-ingestor-2.websocket-ingestor.default.svc.cluster.local:8080",
        },
    }
}

func (s *Server) GetIngestorEndpoint(ctx context.Context, req *managerv1.GetIngestorEndpointRequest) (*managerv1.GetIngestorEndpointResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if len(s.ingestorEndpoints) == 0 {
        return nil, status.Error(codes.Unavailable, "no available ingestor endpoints")
    }

    // Simple round-robin for endpoint selection.
    endpoint := s.ingestorEndpoints[s.nextEndpoint]
    s.nextEndpoint = (s.nextEndpoint + 1) % len(s.ingestorEndpoints)

    ticket, err := generateSecureTicket()
    if err != nil {
        return nil, status.Error(codes.Internal, "failed to generate connection ticket")
    }

    // Store the ticket with the target endpoint as the value.
    // The ingestor pod will validate this ticket upon connection.
    s.ticketCache.Set(ticket, endpoint, cache.DefaultExpiration)

    return &managerv1.GetIngestorEndpointResponse{
        EndpointAddress:  endpoint,
        ConnectionTicket: ticket,
    }, nil
}

func generateSecureTicket() (string, error) {
    bytes := make([]byte, 16)
    if _, err := rand.Read(bytes); err != nil {
        return "", fmt.Errorf("failed to read random bytes for ticket: %w", err)
    }
    return hex.EncodeToString(bytes), nil
}
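The endpoint list above is static for demonstration. As a rough sketch of the omitted discovery logic, the manager could periodically refresh its list from the Endpoints of the websocket-ingestor headless Service. The service name, namespace, port, and the SetEndpoints helper are assumptions for illustration, and a production version would use a shared informer rather than polling.

// internal/manager/discovery.go (illustrative sketch, not part of the original code)
package manager

import (
    "context"
    "fmt"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

// SetEndpoints is a hypothetical setter; the real server would expose an
// equivalent, lock-protected update path.
func (s *Server) SetEndpoints(endpoints []string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.ingestorEndpoints = endpoints
    s.nextEndpoint = 0
}

// WatchIngestorEndpoints polls the Endpoints object behind the websocket-ingestor
// headless Service and refreshes the server's endpoint list.
func WatchIngestorEndpoints(ctx context.Context, s *Server) error {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        return fmt.Errorf("loading in-cluster config: %w", err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return fmt.Errorf("creating kubernetes client: %w", err)
    }

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()

    for {
        eps, err := client.CoreV1().Endpoints("default").Get(ctx, "websocket-ingestor", metav1.GetOptions{})
        if err == nil {
            var addrs []string
            for _, subset := range eps.Subsets {
                for _, addr := range subset.Addresses {
                    // For a StatefulSet behind a headless Service, Hostname is the stable pod name.
                    addrs = append(addrs, fmt.Sprintf(
                        "%s.websocket-ingestor.default.svc.cluster.local:8080", addr.Hostname))
                }
            }
            if len(addrs) > 0 {
                s.SetEndpoints(addrs)
            }
        }

        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}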
WebSocket Ingestor Service
This is the workhorse. It’s a Go application that accepts WebSocket connections, validates the connection ticket, and then funnels incoming messages into a batching mechanism for efficient writes to TimescaleDB. Using a library like gorilla/websocket is standard.
The critical part of the implementation is the data ingestion path. Writing every single message to the database would create immense pressure and result in poor performance. A common mistake is to perform a database INSERT inside the message-reading loop. The correct approach is to batch writes: we use a buffered channel to decouple the WebSocket-reading goroutines from the database-writing goroutine.
// internal/ingestor/server.go
package ingestor

import (
    "context"
    "encoding/json"
    "log/slog"
    "net/http"
    "time"

    "github.com/gorilla/websocket"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)

type TelemetryMessage struct {
    DeviceID  string    `json:"device_id"`
    Timestamp time.Time `json:"ts"`
    Value     float64   `json:"value"`
}

// BatchIngestor handles batching and writing data to TimescaleDB.
type BatchIngestor struct {
    pool       *pgxpool.Pool
    msgChannel chan TelemetryMessage
    batchSize  int
    maxLatency time.Duration
    logger     *slog.Logger
}

func NewBatchIngestor(pool *pgxpool.Pool, logger *slog.Logger) *BatchIngestor {
    bi := &BatchIngestor{
        pool:       pool,
        msgChannel: make(chan TelemetryMessage, 10000), // Large buffer to absorb bursts
        batchSize:  1000,
        maxLatency: 2 * time.Second,
        logger:     logger,
    }
    go bi.run() // Start the background writer
    return bi
}

func (bi *BatchIngestor) run() {
    batch := make([]TelemetryMessage, 0, bi.batchSize)
    ticker := time.NewTicker(bi.maxLatency)
    defer ticker.Stop()

    for {
        select {
        case msg := <-bi.msgChannel:
            batch = append(batch, msg)
            if len(batch) >= bi.batchSize {
                bi.flush(batch)
                batch = make([]TelemetryMessage, 0, bi.batchSize) // Reset batch
                ticker.Reset(bi.maxLatency)                       // Reset timer
            }
        case <-ticker.C:
            if len(batch) > 0 {
                bi.flush(batch)
                batch = make([]TelemetryMessage, 0, bi.batchSize) // Reset batch
            }
        }
    }
}

// flush uses PostgreSQL's COPY protocol for high-performance bulk inserts.
func (bi *BatchIngestor) flush(batch []TelemetryMessage) {
    if len(batch) == 0 {
        return
    }

    rows := make([][]interface{}, len(batch))
    for i, msg := range batch {
        rows[i] = []interface{}{msg.Timestamp, msg.DeviceID, msg.Value}
    }

    copyCount, err := bi.pool.CopyFrom(
        context.Background(),
        pgx.Identifier{"telemetry"},
        []string{"time", "device_id", "value"},
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        bi.logger.Error("failed to copy data to TimescaleDB", "error", err, "batch_size", len(batch))
        // In a production system, add retry logic or a dead-letter queue.
        return
    }
    bi.logger.Info("flushed batch to TimescaleDB", "count", copyCount)
}

// WebSocketServer handles the HTTP -> WebSocket upgrade.
type WebSocketServer struct {
    upgrader websocket.Upgrader
    ingestor *BatchIngestor
    logger   *slog.Logger

    // This would check against the cache populated by the connection-manager.
    // For simplicity, we assume a function `isValidTicket`.
    isValidTicket func(ticket string) bool
}

func (s *WebSocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ticket := r.URL.Query().Get("ticket")
    if ticket == "" || !s.isValidTicket(ticket) {
        http.Error(w, "Forbidden: invalid or missing ticket", http.StatusForbidden)
        return
    }

    conn, err := s.upgrader.Upgrade(w, r, nil)
    if err != nil {
        s.logger.Error("failed to upgrade connection", "error", err)
        return
    }
    defer conn.Close()

    s.handleConnection(conn)
}

func (s *WebSocketServer) handleConnection(conn *websocket.Conn) {
    for {
        messageType, p, err := conn.ReadMessage()
        if err != nil {
            // Handle client disconnection gracefully.
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                s.logger.Warn("websocket connection closed unexpectedly", "error", err)
            }
            break
        }

        if messageType == websocket.TextMessage {
            var msg TelemetryMessage
            if err := json.Unmarshal(p, &msg); err != nil {
                s.logger.Warn("failed to unmarshal message", "error", err, "payload", string(p))
                continue
            }
            // Hand the message to the batching ingestor. The large buffer keeps this
            // send cheap in the common case; it only blocks if the buffer fills up,
            // which applies natural backpressure to the reader.
            s.ingestor.msgChannel <- msg
        }
    }
}
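The isValidTicket hook was left abstract above. If the manager stores tickets in Redis rather than its in-process cache (the production option hinted at earlier), the ingestor can validate and consume them roughly as sketched below. The "ticket:<value>" key format, the single-use delete, and the endpoint check are assumptions for illustration, not part of the original design.

// internal/ingestor/ticket.go (illustrative sketch; assumes the connection-manager
// writes each ticket to Redis as "ticket:<ticket>" -> target endpoint with a short TTL)
package ingestor

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

// NewRedisTicketValidator returns a function suitable for WebSocketServer.isValidTicket.
// selfEndpoint is the address this pod registered with the connection-manager.
func NewRedisTicketValidator(rdb *redis.Client, selfEndpoint string) func(ticket string) bool {
    return func(ticket string) bool {
        ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
        defer cancel()

        key := "ticket:" + ticket
        target, err := rdb.Get(ctx, key).Result()
        if err != nil {
            // redis.Nil means the ticket was never issued or has already expired.
            return false
        }

        // Tickets are single-use: delete regardless of the outcome below.
        _ = rdb.Del(ctx, key).Err()

        // Only accept tickets that were issued for this specific pod.
        return target == selfEndpoint
    }
}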
TimescaleDB Schema
The database schema is simple but effective. We define a standard table and then convert it into a hypertable, which is TimescaleDB’s core abstraction for partitioning data by time.
CREATE TABLE telemetry (
    time      TIMESTAMPTZ      NOT NULL,
    device_id TEXT             NOT NULL,
    value     DOUBLE PRECISION NOT NULL
);

-- Convert the table to a hypertable, partitioned by the 'time' column.
SELECT create_hypertable('telemetry', 'time');

-- Create an index for efficient lookups by device and time.
CREATE INDEX ON telemetry (device_id, time DESC);
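The (device_id, time DESC) index exists to serve per-device, time-ordered reads. As a quick, purely hypothetical illustration of that read path (it is not part of the ingestion service), a pgx query against the hypertable might look like this; the one-minute bucket, one-hour window, and DATABASE_URL are assumptions.

// cmd/example-query/main.go (hypothetical read-path sketch)
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
)

func main() {
    ctx := context.Background()
    // DATABASE_URL is assumed, e.g. "postgres://user:pass@timescaledb:5432/telemetry"
    pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatalf("connect: %v", err)
    }
    defer pool.Close()

    // Per-device, per-minute averages over the last hour: the access pattern
    // the (device_id, time DESC) index is designed to serve.
    rows, err := pool.Query(ctx, `
        SELECT time_bucket('1 minute', time) AS bucket, avg(value)
        FROM telemetry
        WHERE device_id = $1 AND time > now() - INTERVAL '1 hour'
        GROUP BY bucket
        ORDER BY bucket DESC`, "device-123")
    if err != nil {
        log.Fatalf("query: %v", err)
    }
    defer rows.Close()

    for rows.Next() {
        var bucket time.Time
        var avg float64
        if err := rows.Scan(&bucket, &avg); err != nil {
            log.Fatalf("scan: %v", err)
        }
        fmt.Printf("%s avg=%.2f\n", bucket.Format(time.RFC3339), avg)
    }
    if err := rows.Err(); err != nil {
        log.Fatalf("rows: %v", err)
    }
}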
The Serverless Handshake Layer
The Google Cloud Function is written in Go for consistency. Its role is to act as a secure gatekeeper.
A critical implementation detail is connecting the Cloud Function to the GKE cluster. A Cloud Function runs on Google-managed infrastructure outside our project’s VPC. The GKE cluster runs inside our VPC. To bridge this gap, a Serverless VPC Access Connector is required. This connector creates a network endpoint inside our VPC that the Cloud Function can route traffic through.
// cmd/broker-function/main.go
package main

import (
    "fmt"
    "log/slog"
    "net/http"
    "os"

    "github.com/GoogleCloudPlatform/functions-framework-go/functions"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    managerv1 "github.com/your-org/ingestor/gen/go/manager/v1"
)

var managerClient managerv1.ConnectionManagerServiceClient

func init() {
    functions.HTTP("Broker", broker)

    managerAddr := os.Getenv("MANAGER_SERVICE_ADDR") // e.g., "connection-manager.default.svc.cluster.local:50051"
    if managerAddr == "" {
        slog.Error("MANAGER_SERVICE_ADDR environment variable not set")
        // In a real function, this would prevent it from starting.
        return
    }

    // The VPC connector handles routing to the private GKE service address.
    // We use insecure credentials as the traffic stays within our VPC and is
    // restricted by Cilium network policies.
    conn, err := grpc.Dial(managerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        slog.Error("failed to connect to manager service", "error", err)
        return
    }
    managerClient = managerv1.NewConnectionManagerServiceClient(conn)
}

// broker is the entry point for the Cloud Function.
func broker(w http.ResponseWriter, r *http.Request) {
    // 1. Authenticate the request.
    // A real-world project would validate a JWT or API key here.
    authHeader := r.Header.Get("Authorization")
    deviceID, err := validateAuth(authHeader)
    if err != nil {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }

    // 2. Request an ingestor endpoint from the connection manager.
    resp, err := managerClient.GetIngestorEndpoint(r.Context(), &managerv1.GetIngestorEndpointRequest{
        DeviceId: deviceID,
    })
    if err != nil {
        slog.ErrorContext(r.Context(), "failed to get ingestor endpoint", "error", err)
        http.Error(w, "Service temporarily unavailable", http.StatusServiceUnavailable)
        return
    }

    // 3. Construct the redirect URL and issue a temporary redirect.
    // The client's WebSocket library should follow this.
    redirectURL := fmt.Sprintf("wss://%s/ws?ticket=%s", resp.EndpointAddress, resp.ConnectionTicket)

    // The key is to use a 307 Temporary Redirect. Many HTTP clients, and critically
    // most WebSocket libraries, will preserve the method (the implicit GET of the
    // upgrade request) when following a 307.
    w.Header().Set("Location", redirectURL)
    w.WriteHeader(http.StatusTemporaryRedirect)
}

func validateAuth(header string) (string, error) {
    // Dummy implementation. This should check a database or identity provider.
    if header == "Bearer valid-token-for-device-123" {
        return "device-123", nil
    }
    return "", fmt.Errorf("invalid token")
}
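Not every WebSocket client follows redirects automatically, so it is worth spelling out the device side of the handshake. The sketch below is a minimal, hypothetical Go client using gorilla/websocket; the broker URL is a placeholder, and the bearer token matches the dummy validateAuth above. A real device library that follows 307s can skip the manual Location handling.

// cmd/example-device/main.go (illustrative device-side sketch)
package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

func main() {
    // Step 1: authenticate against the broker function and capture the 307 instead
    // of following it, so we can read the wss:// target from the Location header.
    httpClient := &http.Client{
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }

    req, _ := http.NewRequest(http.MethodGet, "https://broker.example.com/", nil)
    req.Header.Set("Authorization", "Bearer valid-token-for-device-123")

    resp, err := httpClient.Do(req)
    if err != nil {
        log.Fatalf("broker request failed: %v", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusTemporaryRedirect {
        log.Fatalf("unexpected broker status: %d", resp.StatusCode)
    }
    wsURL := resp.Header.Get("Location") // e.g. wss://<ingestor>/ws?ticket=<ticket>

    // Step 2: open the persistent WebSocket connection against the chosen ingestor pod.
    conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
    if err != nil {
        log.Fatalf("websocket dial failed: %v", err)
    }
    defer conn.Close()

    // Push one telemetry message as a smoke test.
    msg := fmt.Sprintf(`{"device_id":"device-123","ts":"%s","value":42.0}`, "2024-01-01T00:00:00Z")
    if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
        log.Fatalf("write failed: %v", err)
    }
}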
Securing the Architecture with Cilium
Without security, any actor could bypass our Cloud Function and connect directly to the websocket-ingestor pods. This is where Cilium’s CiliumNetworkPolicy provides the solution. By running Cilium as the CNI in GKE, we can create identity-aware network policies powered by eBPF.
The first step is to label our Kubernetes resources correctly. The connection-manager and websocket-ingestor pods need distinct labels.
# A snippet from the connection-manager deployment
spec:
  template:
    metadata:
      labels:
        app: connection-manager

# A snippet from the websocket-ingestor statefulset
spec:
  template:
    metadata:
      labels:
        app: websocket-ingestor
The key insight is that traffic from the Cloud Function, when routed through the VPC Access Connector, will appear to originate from a specific IP range associated with that connector. We can create a Cilium policy that only allows traffic from this CIDR block to reach our services.
The flow looks like this:
graph TD
  subgraph Client Device
    A[HTTPS Auth Request]
  end
  subgraph Google Cloud
    B[Cloud Function: Broker] -- gRPC --> C{VPC Connector}
  end
  subgraph GKE Cluster with Cilium
    C -- CIDR-restricted traffic --> D[Service: connection-manager]
    D -- Pod IP --> E[Pod: connection-manager]
    F[Service: websocket-ingestor]
    G[Pods: websocket-ingestor]
    F --> G
  end
  A --> B
  B --> A_Redirect[HTTP 307 Redirect w/ wss:// URL]
  A_Redirect --> H[WebSocket Connection]
  H --> G
  style C fill:#f9f,stroke:#333,stroke-width:2px
  style E fill:#bbf,stroke:#333,stroke-width:2px
  style G fill:#bbf,stroke:#333,stroke-width:2px
Here’s the CiliumNetworkPolicy configuration to enforce this.
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "ingestor-access-policy"
  namespace: default
spec:
  endpointSelector:
    matchLabels:
      app: websocket-ingestor
  ingress:
    - fromEndpoints:
        - matchLabels:
            "k8s:io.kubernetes.pod.namespace": kube-system
            "k8s:k8s-app": kube-dns
      # This is the crucial part. The world entity allows traffic from outside the
      # cluster, because the client device initiates the connection from the
      # internet. The application-level ticket provides the authentication for this
      # step. A more advanced setup might use an Ingress controller, but for direct
      # access, this is required.
      fromEntities:
        - world
      toPorts:
        - ports:
            - port: "8080"
              protocol: TCP
---
apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: "manager-access-policy"
  namespace: default
spec:
  endpointSelector:
    matchLabels:
      app: connection-manager
  ingress:
    # This IP range corresponds to the Serverless VPC Access Connector.
    # The policy ensures ONLY the Cloud Function can call the manager service.
    - fromCIDR:
        - "10.8.0.0/28"
      toPorts:
        - ports:
            - port: "50051"
              protocol: TCP
A pitfall here is getting the fromCIDR right: it must exactly match the IP range of the VPC connector. The websocket-ingestor policy allows ingress from the world entity because the final WebSocket connection originates from the device on the public internet; security at that layer is handled by the short-lived ticket, which can only be obtained through an authenticated call to the Cloud Function. The connection-manager, by contrast, is locked down tight; only the function can talk to it.
The final architecture is robust. It scales where it needs to, it’s efficient where it must be, and it’s secured at the network layer. The serverless function handles the “burst” compute of new connections, while the GKE cluster handles the “marathon” of persistent connections and data processing.
The connection-manager itself is a potential single point of failure in this design, though it is a very simple service with minimal logic. A production-grade evolution would run multiple replicas of the manager with a leader-election protocol (e.g., using Kubernetes Leases) to ensure high availability. Furthermore, the round-robin load distribution is naive; a more intelligent manager could receive periodic load reports (active connections, CPU usage) from the ingestor pods and make more informed routing decisions. The security of the connection ticket could also be enhanced by having the ingestor pod perform a final validation call back to the manager, trading a little connection latency for stronger assurance.
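As a rough sketch of the Lease-based idea, client-go’s leaderelection package can gate the manager’s active work (for example, the endpoint-discovery loop) so only one replica runs it at a time. The lease name, namespace, identity source, and timings below are assumptions.

// cmd/connection-manager/leader.go (illustrative sketch of Lease-based leader election)
package main

import (
    "context"
    "log/slog"
    "os"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runAsLeader blocks, running lead() only while this replica holds the Lease.
func runAsLeader(ctx context.Context, lead func(context.Context)) error {
    cfg, err := rest.InClusterConfig()
    if err != nil {
        return err
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return err
    }

    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      "connection-manager-leader",
            Namespace: "default",
        },
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: os.Getenv("POD_NAME"), // injected via the downward API
        },
    }

    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   15 * time.Second,
        RenewDeadline:   10 * time.Second,
        RetryPeriod:     2 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: lead,
            OnStoppedLeading: func() {
                slog.Info("lost leadership, standing by")
            },
        },
    })
    return nil
}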