Implementing a High-Availability Intra-Pod Message Bus Using an ISR Model within an OCI Sidecar


The technical pain point emerged from a seemingly simple requirement in a high-throughput logging and auditing subsystem running on Kubernetes. Our main application container generated critical audit events that needed to be processed by a separate, dedicated sidecar container within the same pod. This sidecar was responsible for data enrichment and forwarding to an external, secure archive. The initial approach used a shared emptyDir volume with the application writing event files and the sidecar polling the directory. This failed spectacularly under load due to filesystem contention, lack of transactional guarantees, and complex state management for tracking processed files.

Our next attempt involved a standard message queue. We evaluated deploying a lightweight broker like NATS or even using a dedicated Kafka topic. This introduced a new set of problems. For communication that never leaves the pod boundary (localhost), introducing a network hop to an external broker felt like a severe architectural misstep. It added latency, a network dependency for local processing, and the operational overhead of managing another component. The core issue remained: we needed a durable, ordered, and reliable communication channel between two processes on the same logical host, but with the guarantees of a distributed log, and without the footprint of one.

This led to our initial concept: a “PodBus,” a hyper-lightweight, log-based message queue running as its own OCI-compliant sidecar. The main application would be the sole producer, and one or more other sidecars could be consumers. The entire system would live and die with the pod, leveraging the pod’s shared network namespace and localhost networking for performance. But reliability was non-negotiable. If the PodBus sidecar crashed and was restarted by the kubelet, we could not lose any acknowledged audit events. This is where we decided to borrow principles from large-scale distributed systems like Kafka and apply them to this micro-environment. Specifically, we would implement a simplified In-Sync Replica (ISR) model to ensure acknowledged writes are durable, combined with a BASE (Basically Available, Soft state, Eventually consistent) model for consumption.

The technology selection was driven by minimalism and control. We chose Go for its excellent concurrency primitives, small binary size, and suitability for network services. For the on-disk data format, a simple append-only log file was sufficient, stored on a shared emptyDir volume so that it survives sidecar restarts (though not deletion of the pod). The core challenge was designing a protocol that provided the ISR guarantee without actual network replication.

Our interpretation of ISR in this single-node, multi-process context is a two-phase acknowledgement protocol. The “replica set” consists of two members: the producer’s in-memory buffer and the PodBus sidecar’s on-disk log. A message is only considered successfully “committed” when the PodBus sidecar explicitly acknowledges to the producer that it has been written and flushed to disk (fsync). This ensures that a PodBus crash after acknowledgement will not result in data loss, as the data is durable on the shared volume.
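The producer’s half of this protocol can be sketched roughly as follows. This is a minimal illustration under assumptions, not the actual client: the names `pendingBuffer`, `sendFunc`, and `errUnavailable` are hypothetical, and `sendFunc` merely stands in for the gRPC Produce call introduced later.

```go
package main

import (
	"errors"
	"fmt"
)

// sendFunc stands in for the gRPC Produce call (hypothetical signature).
// It returns the log offset once the sidecar has fsynced the record,
// or an error if no acknowledgement arrived.
type sendFunc func(id string, record []byte) (uint64, error)

// errUnavailable is a sentinel a sendFunc can return while the sidecar is down.
var errUnavailable = errors.New("podbus unavailable")

type pendingMsg struct {
	id     string
	record []byte
}

// pendingBuffer holds messages that have not yet been acknowledged.
// It is not safe for concurrent use; a real client would add locking.
type pendingBuffer struct {
	queue []pendingMsg
}

// Add enqueues a message. It stays "uncommitted" until Flush sees its ACK.
func (b *pendingBuffer) Add(id string, record []byte) {
	b.queue = append(b.queue, pendingMsg{id: id, record: record})
}

// Flush sends messages in order and drops each one only after its ACK.
// It stops at the first failure so ordering is preserved for the retry.
func (b *pendingBuffer) Flush(send sendFunc) error {
	for len(b.queue) > 0 {
		m := b.queue[0]
		if _, err := send(m.id, m.record); err != nil {
			return fmt.Errorf("message %s not acknowledged: %w", m.id, err)
		}
		b.queue = b.queue[1:]
	}
	return nil
}

// Len reports how many messages are still awaiting acknowledgement.
func (b *pendingBuffer) Len() int { return len(b.queue) }
```

Because a message leaves the buffer only after the acknowledgement, a failed or timed-out call simply leaves it queued, and the next Flush re-sends it with the same message ID.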

Let’s start with the core of the PodBus: the durable log storage. A common mistake here is to simply append to a file. In a real-world project, you must consider file locking to prevent corruption if, hypothetically, two instances were to run, and more importantly, you must control when data is flushed from the OS page cache to the physical disk. The Store below handles the flushing; it does not take a file lock, which is left out for brevity.

// pkg/log/store.go
package log

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"os"
	"sync"
)

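const (
	// LenWidth is the number of bytes used to store the record length.
	// It is exported so the server can compute the next record's offset.
	LenWidth = 8
	lenWidth = LenWidth // internal alias used throughout this file
)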

// Record represents a single message in our log.
type Record []byte

// Store manages the append-only log file.
type Store struct {
	file *os.File
	mu   sync.Mutex
	buf  *bufio.Writer
	size uint64
}

// NewStore creates a new log store at the given file path.
// It will create the file if it doesn't exist.
func NewStore(path string) (*Store, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		return nil, fmt.Errorf("failed to open log file: %w", err)
	}

	fi, err := f.Stat()
	if err != nil {
		f.Close() // don't leak the descriptor on failure
		return nil, fmt.Errorf("failed to stat log file: %w", err)
	}

	return &Store{
		file: f,
		buf:  bufio.NewWriter(f),
		size: uint64(fi.Size()),
	}, nil
}

// Append writes a record to the log. This is the heart of our producer path.
// It returns the offset of the new record and any error.
// The key operation is the fsync for durability.
func (s *Store) Append(record Record) (offset uint64, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Capture the current size as the starting offset for this new record.
	pos := s.size

	// Write the length of the record first (as a fixed-size header).
	if err := binary.Write(s.buf, binary.BigEndian, uint64(len(record))); err != nil {
		return 0, fmt.Errorf("failed to write record length: %w", err)
	}

	// Write the actual record bytes.
	n, err := s.buf.Write(record)
	if err != nil {
		return 0, fmt.Errorf("failed to write record data: %w", err)
	}
	if n != len(record) {
		return 0, fmt.Errorf("short write: wrote %d bytes, expected %d", n, len(record))
	}
	
	// Flush the buffer to the OS page cache.
	if err := s.buf.Flush(); err != nil {
		return 0, fmt.Errorf("failed to flush buffer: %w", err)
	}

	// The bytes now belong to the file, so advance the size before syncing.
	// If the sync below fails, later offsets still match the file contents.
	s.size += uint64(lenWidth + n)

	// The crucial step for our ISR guarantee: ensure the data is on disk.
	// In a high-performance system, you might batch fsync calls, but for
	// correctness in this model, we do it per append.
	if err := s.file.Sync(); err != nil {
		return 0, fmt.Errorf("failed to sync file: %w", err)
	}

	return pos, nil
}

// Read retrieves a record at a specific offset.
func (s *Store) Read(offset uint64) (Record, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Ensure we flush any buffered writes before reading.
	if err := s.buf.Flush(); err != nil {
		return nil, fmt.Errorf("failed to flush buffer before read: %w", err)
	}

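	// Reject offsets that leave no room for a length header.
	if offset > s.size || s.size-offset < lenWidth {
		return nil, fmt.Errorf("offset %d is beyond the end of the log", offset)
	}

	// Read the record length.
	sizeBuf := make([]byte, lenWidth)
	if _, err := s.file.ReadAt(sizeBuf, int64(offset)); err != nil {
		return nil, fmt.Errorf("failed to read record length at offset %d: %w", offset, err)
	}

	// Guard against a misaligned offset or corrupt header: a bogus length
	// would otherwise trigger a huge allocation.
	recordLen := binary.BigEndian.Uint64(sizeBuf)
	if recordLen > s.size-offset-lenWidth {
		return nil, fmt.Errorf("invalid record length %d at offset %d", recordLen, offset)
	}
	recordBuf := make([]byte, recordLen)

	// Read the record data.
	if _, err := s.file.ReadAt(recordBuf, int64(offset+lenWidth)); err != nil {
		return nil, fmt.Errorf("failed to read record data at offset %d: %w", offset, err)
	}

	return recordBuf, nil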
}

// Close ensures all data is written to disk and closes the file handle.
func (s *Store) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if err := s.buf.Flush(); err != nil {
		return err
	}
	return s.file.Close()
}

This Store implementation is simple but robust. A mutex serializes appends and reads, so it is safe for concurrent use within one process, and every acknowledged append is made durable via file.Sync(). The pitfall here is performance; fsync is a costly system call. For our use case, audit logs, correctness trumps raw throughput, so this trade-off is acceptable. For other use cases, one might call fsync periodically or after a certain number of bytes have been written, at the risk of losing the most recent unsynced writes if the node loses power or the kernel crashes.
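As a rough sketch of the byte-threshold variant mentioned above, the helper below defers the sync until enough bytes have accumulated. The type `batchSyncer` and its fields are hypothetical and not part of the Store; the `sync` field would typically be the file’s Sync method.

```go
package main

import "fmt"

// batchSyncer calls sync only after at least threshold bytes have been
// written since the last sync. Records written since the last sync are
// NOT durable yet and must not be acknowledged to the producer.
type batchSyncer struct {
	threshold int          // bytes to accumulate before syncing (assumed tunable)
	unsynced  int          // bytes written since the last successful sync
	sync      func() error // e.g. (*os.File).Sync
}

// Wrote records n newly written bytes and syncs once the threshold is reached.
// It reports whether a sync was attempted.
func (b *batchSyncer) Wrote(n int) (bool, error) {
	b.unsynced += n
	if b.unsynced < b.threshold {
		return false, nil
	}
	return true, b.Flush()
}

// Flush forces a sync regardless of the threshold, for example from a
// timer or during shutdown. It is a no-op when nothing is pending.
func (b *batchSyncer) Flush() error {
	if b.unsynced == 0 {
		return nil
	}
	if err := b.sync(); err != nil {
		return fmt.Errorf("batched sync failed: %w", err)
	}
	b.unsynced = 0
	return nil
}
```

With batching, the acknowledgement has to be delayed until the covering sync completes; otherwise the ISR-style guarantee described earlier no longer holds.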

Next, we need the server component that exposes this log over the network. We’ll use gRPC for efficient, typed communication. The service definition is minimal.

// api/v1/podbus.proto
syntax = "proto3";

package podbus.v1;

option go_package = "github.com/your-repo/podbus/api/v1";

message ProduceRequest {
    // A unique identifier for the message to allow for idempotent retries.
    string message_id = 1;
    bytes record = 2;
}

message ProduceResponse {
    uint64 offset = 1;
}

message ConsumeRequest {
    uint64 offset = 1;
}

message ConsumeResponse {
    bytes record = 1;
    uint64 next_offset = 2;
}

service PodBus {
    // Produce sends a record to the log.
    rpc Produce(ProduceRequest) returns (ProduceResponse) {}

    // Consume reads a record from the log.
    rpc Consume(ConsumeRequest) returns (ConsumeResponse) {}
}

The server implementation wires up the gRPC service to our log.Store. This is where the ISR logic lives. The Produce handler is responsible for the “commit” part of the protocol.

// internal/server/server.go
package server

import (
	"context"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"sync"

	// The generated package is named v1, so alias it to match the api.* references below.
	api "github.com/your-repo/podbus/api/v1"
	"github.com/your-repo/podbus/pkg/log"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type Config struct {
	ListenAddr string
	DataDir    string
}

type grpcServer struct {
	api.UnimplementedPodBusServer
	*Config
	log *log.Store
	// Tracks message IDs to handle retries and ensure idempotency.
	// In a real system, this should be a persistent cache or a more
	// sophisticated data structure to avoid unbounded memory growth.
	idempotencyStore map[string]uint64
	mu               sync.Mutex
}

func NewGRPCServer(config *Config) (*grpcServer, error) {
	logPath := filepath.Join(config.DataDir, "podbus.log")
	if err := os.MkdirAll(config.DataDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create data directory: %w", err)
	}

	l, err := log.NewStore(logPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create log store: %w", err)
	}

	return &grpcServer{
		Config:           config,
		log:              l,
		idempotencyStore: make(map[string]uint64),
	}, nil
}

func (s *grpcServer) Start() error {
	lis, err := net.Listen("tcp", s.ListenAddr)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	server := grpc.NewServer()
	api.RegisterPodBusServer(server, s)

	// A real service would have graceful shutdown handling.
	return server.Serve(lis)
}

// Produce implements the ISR-style acknowledgement.
func (s *grpcServer) Produce(ctx context.Context, req *api.ProduceRequest) (*api.ProduceResponse, error) {
	if req.GetMessageId() == "" {
		return nil, status.Error(codes.InvalidArgument, "message_id is required for idempotency")
	}

	// Hold the lock across the check, the append, and the map update so two
	// concurrent retries of the same message_id cannot both reach the log.
	// Store.Append already serializes writes, so this costs no parallelism.
	s.mu.Lock()
	defer s.mu.Unlock()

	// Check if this message has been processed before.
	if offset, ok := s.idempotencyStore[req.GetMessageId()]; ok {
		return &api.ProduceResponse{Offset: offset}, nil
	}

	// The core operation: append to the durable log.
	offset, err := s.log.Append(req.GetRecord())
	if err != nil {
		// Log this error internally.
		return nil, status.Error(codes.Internal, "failed to write to log")
	}

	s.idempotencyStore[req.GetMessageId()] = offset

	// The successful response is the ACK that completes the "replication" to the log.
	return &api.ProduceResponse{Offset: offset}, nil
}

func (s *grpcServer) Consume(ctx context.Context, req *api.ConsumeRequest) (*api.ConsumeResponse, error) {
	record, err := s.log.Read(req.GetOffset())
	if err != nil {
		// This could be an EOF or a real error. We must distinguish.
		// For simplicity, we return NotFound for offsets beyond the log.
		return nil, status.Error(codes.NotFound, "offset not found")
	}
	
	// The next offset is the current offset plus the length of the header and the record.
	nextOffset := req.GetOffset() + uint64(log.LenWidth) + uint64(len(record))

	return &api.ConsumeResponse{
		Record:     record,
		NextOffset: nextOffset,
	}, nil
}

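The idempotency check is critical. If the PodBus sidecar writes a message to disk but the gRPC response never reaches the producer (for example, because the call times out), the producer will retry. Without the message_id check, the same message would be written twice. Note that the map shown here lives in memory: if the sidecar itself crashes between the fsync and the response, the restarted process starts with an empty map and will accept the retry as a new message. Closing that gap requires storing the message_id durably, for instance inside the log record, and rebuilding the map on startup.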
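As a rough sketch of making the idempotency map survive a restart, the message ID can be embedded in each log record and the map rebuilt by scanning the log at startup. The record layout (a 2-byte ID length prefix) and the helper names `encodeRecord`, `frame`, and `rebuildIndex` are assumptions for illustration, not part of the server above.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const lenWidth = 8 // matches the Store's 8-byte length header

// encodeRecord prefixes the payload with the message ID so the ID is
// stored durably in the same log entry (assumed layout: 2-byte ID length).
func encodeRecord(id string, payload []byte) ([]byte, error) {
	if len(id) > 0xFFFF {
		return nil, errors.New("message id too long")
	}
	buf := make([]byte, 2+len(id)+len(payload))
	binary.BigEndian.PutUint16(buf, uint16(len(id)))
	copy(buf[2:], id)
	copy(buf[2+len(id):], payload)
	return buf, nil
}

// frame mimics what Store.Append writes: an 8-byte length, then the record.
func frame(record []byte) []byte {
	out := make([]byte, lenWidth+len(record))
	binary.BigEndian.PutUint64(out, uint64(len(record)))
	copy(out[lenWidth:], record)
	return out
}

// rebuildIndex scans the raw log bytes and returns message ID -> offset.
// If an ID appears more than once, the first occurrence wins.
func rebuildIndex(data []byte) (map[string]uint64, error) {
	index := make(map[string]uint64)
	for offset := uint64(0); offset < uint64(len(data)); {
		rest := data[offset:]
		if len(rest) < lenWidth {
			return nil, fmt.Errorf("truncated header at offset %d", offset)
		}
		n := binary.BigEndian.Uint64(rest)
		if n > uint64(len(rest)-lenWidth) {
			return nil, fmt.Errorf("truncated record at offset %d", offset)
		}
		body := rest[lenWidth : lenWidth+int(n)]
		if len(body) < 2 {
			return nil, fmt.Errorf("record at offset %d has no id header", offset)
		}
		idLen := int(binary.BigEndian.Uint16(body))
		if 2+idLen > len(body) {
			return nil, fmt.Errorf("corrupt id length at offset %d", offset)
		}
		id := string(body[2 : 2+idLen])
		if _, seen := index[id]; !seen {
			index[id] = offset
		}
		offset += lenWidth + n
	}
	return index, nil
}
```

This sketch treats a truncated tail as an error. A crash in the middle of an append can leave exactly such a tail, so a real implementation would more likely truncate the log back to the last complete record and continue.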

Now, let’s visualize the architecture within a Kubernetes pod.

graph TD
    subgraph Kubernetes Pod
        direction TB
        
        subgraph Main App Container
            A[Application Logic] --> B{Producer Client};
            B -- 1. Produce(msg_id, data) --> C[gRPC over localhost];
        end

        subgraph PodBus Sidecar Container
            D[gRPC Server] -- 2. Receives Request --> E[Idempotency Check];
            E -- 3. Append to Log --> F[log.Store];
            F -- 4. fsync() --> G[(Shared emptyDir Volume)];
            E -- 5. ACK --> D;
            D -- 6. Return Offset --> C;
        end
        
        C -- 7. Receives ACK --> B;
        B -- 8. Deletes from local buffer --> A;

        subgraph Consumer Sidecar Container
            H[Consumer Client] -- 9. Consume(offset) --> I[gRPC over localhost];
            I --> D;
            D -- 10. Read from Log --> F;
            F -- 11. Reads from --> G;
            F -- 12. Returns data --> D;
            D -- 13. Returns Record --> I;
            I -- 14. Delivers to --> J[Auditing Logic];
            J -- 15. Persists new offset --> K[(Shared emptyDir Volume)];
        end
    end

The final piece is packaging this as an OCI container and deploying it. The Dockerfile is straightforward for a Go application.

# Dockerfile
# Stage 1: Build the binary
FROM golang:1.21-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .

# Build the binary with optimizations.
RUN CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-w -s' -o podbus ./cmd/server

# Stage 2: Create the final minimal image
FROM scratch

WORKDIR /

COPY --from=builder /app/podbus .

# The directory where the log file will be stored. This should be a mount point.
VOLUME /data

# The default port for the gRPC server.
EXPOSE 8080

ENTRYPOINT ["/podbus"]

Using a multi-stage build results in a tiny image based on scratch. With no shell, package manager, or libc in the final image, the attack surface stays small, which is good practice for infrastructure components.

The Kubernetes Pod definition ties everything together. It defines the containers and, crucially, the shared emptyDir volume that holds the log file and consumer offsets.

# pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: audit-service-pod
spec:
  volumes:
    # This volume is shared by all containers in the pod.
    # Its lifecycle is tied to the pod.
    - name: podbus-data
      emptyDir: {}
  containers:
    - name: main-application
      image: my-app:1.0.0
      env:
        # The app client connects to the sidecar on localhost.
        - name: PODBUS_ADDR
          value: "localhost:8080"
      volumeMounts:
        # The app does not need this mount to use PodBus; it is shown only
        # as an option for more advanced recovery scenarios.
        - name: podbus-data
          mountPath: /var/run/podbus # just an example path

    - name: podbus-sidecar
      image: my-podbus-impl:0.1.0
      args:
        - "--listen-addr=:8080"
        - "--data-dir=/data"
      ports:
        - containerPort: 8080
          name: grpc
      volumeMounts:
        - name: podbus-data
          mountPath: /data # This must match the server's --data-dir

    - name: audit-consumer-sidecar
      image: my-audit-consumer:1.0.0
      env:
        - name: PODBUS_ADDR
          value: "localhost:8080"
        - name: OFFSET_FILE_PATH # Consumer tracks its own state
          value: "/data/offsets/audit-consumer.off"
      volumeMounts:
        - name: podbus-data
          mountPath: /data

This configuration demonstrates the BASE properties. The system is Basically Available; if the podbus-sidecar is down, the producer can buffer and retry, and the consumer simply waits. The state is Soft because consumer progress (its offset) is managed by the consumer itself and evolves. The overall state becomes Eventually Consistent as the consumer will always catch up on the log once connectivity is restored.

The primary limitation of this design is its deliberate scope. It is not a distributed message queue; it solves the intra-pod communication problem. The single log file will grow indefinitely and lacks segmentation or compaction, which would be necessary for a long-running service with high throughput. A production-ready version would need to implement log rotation. Furthermore, the in-memory idempotency store is lost whenever the sidecar restarts and grows without bound while it runs; a persistent or LRU-cache-based implementation would be more robust. Finally, this model relies entirely on Kubernetes to restart failed containers; it has no internal leader election or multi-node replication, as that would violate its core design principle of being a simple, pod-local bus.
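To illustrate the LRU option for the idempotency store, here is a minimal sketch built on the standard container/list package. The type `boundedIDs` and its methods are hypothetical; it is not safe for concurrent use on its own, so the server’s existing mutex would still need to guard it.

```go
package main

import "container/list"

// entry is the value stored in each list element.
type entry struct {
	id     string
	offset uint64
}

// boundedIDs remembers at most capacity message IDs and evicts the least
// recently used one when full.
type boundedIDs struct {
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // message ID -> list element
}

func newBoundedIDs(capacity int) *boundedIDs {
	return &boundedIDs{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// Get returns the stored offset and marks the ID as recently used.
func (c *boundedIDs) Get(id string) (uint64, bool) {
	el, ok := c.items[id]
	if !ok {
		return 0, false
	}
	c.order.MoveToFront(el)
	return el.Value.(entry).offset, true
}

// Put stores or updates an ID, evicting the oldest entry if over capacity.
func (c *boundedIDs) Put(id string, offset uint64) {
	if el, ok := c.items[id]; ok {
		el.Value = entry{id: id, offset: offset}
		c.order.MoveToFront(el)
		return
	}
	c.items[id] = c.order.PushFront(entry{id: id, offset: offset})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(entry).id)
	}
}
```

The trade-off is that a retry arriving after its ID has been evicted is written again, so the capacity should comfortably exceed the number of messages a producer can have in flight during its retry window.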

