Implementing a Distributed Go Worker System for On-Demand PostCSS Compilation


The proliferation of front-end microservices within our monorepo created a significant tooling bottleneck. Each service maintained its own package.json, leading to dependency drift and inconsistent builds between local developer environments and the CI pipeline. A developer running npm install on Monday might get a slightly different version of a PostCSS plugin than the CI server, causing subtle but maddening styling bugs. Build times were also compounding, as each CI job for each microservice had to bootstrap a Node.js environment from scratch. We needed to centralize this logic into a consistent, fast, and scalable internal service.

The initial concept was a “CSS-as-a-Service” component for our internal developer platform. A developer or a CI script would submit raw CSS content and a declarative configuration via a REST API. A backend system would then perform the PostCSS transformation in a controlled environment and return the processed CSS. This abstracts away the need for node_modules from individual projects and guarantees that all builds use the exact same tooling versions and configurations.

Our choice of Go for the worker implementation was deliberate. For a system processing potentially thousands of small, independent jobs, Go’s concurrency model with goroutines is a natural fit. A single worker instance can handle hundreds of concurrent transformations without the heavy overhead of OS threads. Furthermore, Go’s static binaries simplify deployment immensely; we can package our worker into a minimal container image without a complex runtime.

For the messaging backbone, we chose NATS. While Kafka is powerful, its complexity and operational overhead were overkill for this use case. We needed a simple, high-performance message bus to act as a job queue. NATS Core provides a lightweight publish-subscribe and queueing mechanism that is performant and easy to manage.

Here is the high-level architecture we settled on.

graph TD
    subgraph "CI/CD Pipeline or Developer"
        A[Client]
    end

    subgraph "CSS Service"
        B(API Server - Go)
        C{NATS Message Bus}
        D1(Worker 1 - Go)
        D2(Worker 2 - Go)
        Dn(Worker N - Go)
        E{Result Store - Redis/Cache}
    end

    subgraph "Execution Environment"
        F[PostCSS CLI]
    end

    A -- "1. POST /process (source, config)" --> B
    B -- "2. Publish Job" --> C
    C -- "3. Queue Subscription" --> D1
    C -- "3. Queue Subscription" --> D2
    C -- "3. Queue Subscription" --> Dn
    D1 -- "4. Execute" --> F
    F -- "5. Return stdout/stderr" --> D1
    D1 -- "6. Publish Result" --> E
    A -- "7. GET /result/:jobId" --> B
    B -- "8. Fetch Result" --> E
    E -- "9. Return Result" --> B
    B -- "10. Return CSS/Error" --> A

The core of the system is the Go worker. It’s responsible for pulling jobs from the queue, safely executing the external PostCSS command, and reporting the outcome. This separation of the API server from the workers is critical for scalability and resilience. We can scale the number of workers independently of the API layer based on job queue depth.

Job Definition and Communication Protocol

First, we must define the structure of our job payload. This will be serialized to JSON and sent over NATS. A common mistake is to create an anemic data model. In a real-world project, you need to account for metadata, idempotency, and configuration options.

pkg/job/job.go:

package job

import "time"

// PostCSSPluginConfig defines the configuration for a single PostCSS plugin.
// The key is the plugin name (e.g., 'autoprefixer'), and the value
// is its configuration object. This structure is flexible.
type PostCSSPluginConfig map[string]interface{}

// CompileRequest is the payload sent by a client to request a CSS compilation.
// It is published to the NATS subject for workers to consume.
type CompileRequest struct {
	JobID       string              `json:"job_id"`
	SourceCSS   string              `json:"source_css"` // Base64 encoded CSS content
	Plugins     PostCSSPluginConfig `json:"plugins"`
	SubmittedAt time.Time           `json:"submitted_at"`
}

// CompileResult is the payload published by a worker after processing a job.
// It contains the outcome of the compilation.
type CompileResult struct {
	JobID        string    `json:"job_id"`
	Status       string    `json:"status"` // "SUCCESS" or "FAILURE"
	OutputCSS    string    `json:"output_css,omitempty"`
	ErrorMessage string    `json:"error_message,omitempty"`
	Logs         string    `json:"logs,omitempty"` // Captured stderr
	ProcessedAt  time.Time `json:"processed_at"`
	WorkerID     string    `json:"worker_id"`
}

We chose to Base64 encode the SourceCSS. While this adds a small overhead, it prevents issues with unescaped characters in JSON and makes transport over text-based protocols safer. The JobID is crucial for tracking and idempotency.
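As a concrete illustration, a small client-side helper (hypothetical; not part of the service itself) only needs to Base64-encode the source and marshal the body that the API server's processHandler below expects:

package client

import (
	"encoding/base64"
	"encoding/json"

	"github.com/your-org/css-service/pkg/job"
)

// NewRequestBody builds the JSON body for POST /process. The server assigns
// the JobID, so the client supplies only the source CSS and the plugin config.
func NewRequestBody(css []byte, plugins job.PostCSSPluginConfig) ([]byte, error) {
	return json.Marshal(map[string]interface{}{
		"source_css": base64.StdEncoding.EncodeToString(css),
		"plugins":    plugins,
	})
}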

The API Server: Job Ingestion

The API server is a straightforward Go HTTP service. Its sole responsibilities are to validate incoming requests, generate a unique job ID, publish the job to NATS, and provide an endpoint for clients to poll for results. We’ll use the standard net/http library to keep dependencies minimal.

cmd/api/main.go:

package main

import (
	"encoding/base64"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/your-org/css-service/pkg/job"
)

const (
	defaultNATSURL = "nats://localhost:4222"
	requestSubject = "css.jobs"
	listenAddr     = ":8080"
)

type APIServer struct {
	nc *nats.Conn
	// In a production system, this would be a persistent store like Redis or PostgreSQL.
	// For this example, we use a simple in-memory map. It is written by the NATS
	// callback and read by HTTP handlers, so access must be guarded by a mutex.
	mu          sync.RWMutex
	resultStore map[string]*job.CompileResult
}

func main() {
	// The docker-compose file later in this article injects NATS_URL;
	// fall back to localhost for local development.
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = defaultNATSURL
	}

	nc, err := nats.Connect(natsURL)
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	server := &APIServer{
		nc:          nc,
		resultStore: make(map[string]*job.CompileResult),
	}
	
	// Subscriber to listen for results from workers
	// The subject "css.results.*" allows us to listen for all results
	// while workers publish to a specific subject like "css.results.worker123"
	_, err = nc.Subscribe("css.results.*", server.resultHandler)
	if err != nil {
		log.Fatalf("Failed to subscribe to result subject: %v", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/process", server.processHandler)
	mux.HandleFunc("/result/", server.getResultHandler)

	log.Printf("API Server listening on %s", listenAddr)
	if err := http.ListenAndServe(listenAddr, mux); err != nil {
		log.Fatalf("HTTP server failed: %v", err)
	}
}

// processHandler accepts new CSS compilation jobs.
func (s *APIServer) processHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	var req struct {
		SourceCSS string `json:"source_css"`
		Plugins   job.PostCSSPluginConfig `json:"plugins"`
	}

	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
		return
	}

	// Basic validation. The client must send Base64-encoded CSS, so reject
	// payloads that fail to decode instead of queueing a job that is
	// guaranteed to fail in the worker.
	if req.SourceCSS == "" {
		http.Error(w, "source_css cannot be empty", http.StatusBadRequest)
		return
	}
	if _, err := base64.StdEncoding.DecodeString(req.SourceCSS); err != nil {
		http.Error(w, "source_css must be valid base64", http.StatusBadRequest)
		return
	}

	jobID := uuid.New().String()
	compileReq := job.CompileRequest{
		JobID:       jobID,
		SourceCSS:   req.SourceCSS, // Already base64; decoded by the worker
		Plugins:     req.Plugins,
		SubmittedAt: time.Now().UTC(),
	}

	payload, err := json.Marshal(compileReq)
	if err != nil {
		http.Error(w, "Failed to serialize job", http.StatusInternalServerError)
		return
	}
	
	// Publish to the NATS queue subject. The "css.jobs" subject will be
	// listened to by a queue group of workers, ensuring only one worker gets the message.
	if err := s.nc.Publish(requestSubject, payload); err != nil {
		log.Printf("Error publishing job %s to NATS: %v", jobID, err)
		http.Error(w, "Failed to submit job", http.StatusInternalServerError)
		return
	}
	
	log.Printf("Submitted job %s", jobID)

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"job_id": jobID})
}

// getResultHandler allows clients to poll for job results.
func (s *APIServer) getResultHandler(w http.ResponseWriter, r *http.Request) {
	jobID := r.URL.Path[len("/result/"):]

	s.mu.RLock()
	result, found := s.resultStore[jobID]
	s.mu.RUnlock()

	if !found {
		// Not found could mean the job is still processing or the ID is invalid.
		// Returning 202 Accepted indicates the server is aware of the request
		// but the result is not ready.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		json.NewEncoder(w).Encode(map[string]string{"status": "PROCESSING"})
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}

// resultHandler is the callback for NATS messages on the results subject.
func (s *APIServer) resultHandler(msg *nats.Msg) {
	var result job.CompileResult
	if err := json.Unmarshal(msg.Data, &result); err != nil {
		log.Printf("Error unmarshalling result: %v", err)
		return
	}
	log.Printf("Received result for job %s from worker %s", result.JobID, result.WorkerID)
	s.resultStore[result.JobID] = &result
}

This API server is simple but demonstrates the core flow. In production, the in-memory resultStore is a liability: it loses all results on a restart and cannot be shared between API replicas. It would be replaced by Redis, which is well-suited to this kind of short-lived state.
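A sketch of that replacement, assuming the go-redis client and an arbitrary one-hour TTL (both illustrative choices, not requirements):

package store

import (
	"context"
	"encoding/json"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/your-org/css-service/pkg/job"
)

// ResultStore persists compile results in Redis with a TTL, so results
// survive API restarts and are visible to every API replica.
type ResultStore struct {
	rdb *redis.Client
	ttl time.Duration
}

func NewResultStore(addr string) *ResultStore {
	return &ResultStore{
		rdb: redis.NewClient(&redis.Options{Addr: addr}),
		ttl: time.Hour,
	}
}

func (s *ResultStore) Set(ctx context.Context, res *job.CompileResult) error {
	data, err := json.Marshal(res)
	if err != nil {
		return err
	}
	return s.rdb.Set(ctx, "css:result:"+res.JobID, data, s.ttl).Err()
}

// Get returns (nil, nil) when the result is not ready yet, mirroring the
// "PROCESSING" branch of getResultHandler.
func (s *ResultStore) Get(ctx context.Context, jobID string) (*job.CompileResult, error) {
	data, err := s.rdb.Get(ctx, "css:result:"+jobID).Bytes()
	if err == redis.Nil {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	var res job.CompileResult
	if err := json.Unmarshal(data, &res); err != nil {
		return nil, err
	}
	return &res, nil
}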

The Go Worker: The Core Engine

This is where the main logic resides. The worker needs to connect to NATS, subscribe to the job queue, and for each job, manage the entire lifecycle of executing an external command. The pitfall here is mishandling os/exec. It’s easy to leak resources, miss errors, or fail to capture output correctly.

cmd/worker/main.go:

package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/your-org/css-service/pkg/job"
)

const (
	defaultNATSURL = "nats://localhost:4222"
	requestSubject = "css.jobs"
	resultSubject  = "css.results"
	queueGroup     = "css-worker-group"
	workerIDPrefix = "worker-" // Prefix for this instance's unique ID
	maxConcurrency = 4         // Max concurrent jobs this worker instance will process
	jobTimeout     = 30 * time.Second
)

type Worker struct {
	nc       *nats.Conn
	id       string
	jobQueue chan *nats.Msg
}

func main() {
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = defaultNATSURL
	}

	nc, err := nats.Connect(natsURL, nats.Timeout(10*time.Second))
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	worker := &Worker{
		nc:       nc,
		id:       workerIDPrefix + uuid.New().String(),
		jobQueue: make(chan *nats.Msg, maxConcurrency),
	}

	log.Printf("Worker %s starting...", worker.id)

	// Subscribe to the queue group. This ensures messages from 'css.jobs'
	// are distributed among all workers in the 'css-worker-group'.
	_, err = nc.QueueSubscribe(requestSubject, queueGroup, func(msg *nats.Msg) {
		worker.jobQueue <- msg
	})
	if err != nil {
		log.Fatalf("Failed to subscribe to queue: %v", err)
	}

	log.Printf("Subscribed to subject '%s' with queue group '%s'", requestSubject, queueGroup)

	var wg sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func(processorID int) {
			defer wg.Done()
			log.Printf("Processor #%d started", processorID)
			for msg := range worker.jobQueue {
				worker.processJob(msg, processorID)
			}
		}(i)
	}

	// jobQueue is never closed, so the processors never return and wg.Wait
	// blocks until the process is terminated. See the shutdown sketch later
	// in this article for a cleaner exit path.
	wg.Wait()
}

func (w *Worker) processJob(msg *nats.Msg, processorID int) {
	var req job.CompileRequest
	if err := json.Unmarshal(msg.Data, &req); err != nil {
		log.Printf("[Processor %d] Error unmarshalling job: %v", processorID, err)
		// We can't process this, so we don't send a result; the client's poll
		// will simply never complete. In a more robust system, we'd route the
		// message to a dead-letter queue.
		return
	}
	
	log.Printf("[Processor %d] Received job %s", processorID, req.JobID)

	// Create a context with timeout for the external command. This is crucial
	// to prevent runaway processes from hogging resources.
	ctx, cancel := context.WithTimeout(context.Background(), jobTimeout)
	defer cancel()

	result, err := w.runPostCSS(ctx, &req)
	if err != nil {
		result = &job.CompileResult{
			JobID:        req.JobID,
			Status:       "FAILURE",
			ErrorMessage: err.Error(),
		}
	}

	// Stamp the fields common to both the success and failure paths.
	result.WorkerID = w.id
	result.ProcessedAt = time.Now().UTC()

	payload, err := json.Marshal(result)
	if err != nil {
		log.Printf("[Processor %d] Job %s: Failed to serialize result: %v", processorID, req.JobID, err)
		return
	}

	// Publish result to a subject specific to this worker, which the API server can listen on.
	if err := w.nc.Publish(fmt.Sprintf("%s.%s", resultSubject, w.id), payload); err != nil {
		log.Printf("[Processor %d] Job %s: Failed to publish result: %v", processorID, req.JobID, err)
	}
	
	log.Printf("[Processor %d] Finished processing job %s with status %s", processorID, req.JobID, result.Status)
}

// runPostCSS handles the core logic of file system interaction and command execution.
func (w *Worker) runPostCSS(ctx context.Context, req *job.CompileRequest) (*job.CompileResult, error) {
	// 1. Create a temporary, isolated directory for this job.
	tempDir, err := os.MkdirTemp("", "postcss-job-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp dir: %w", err)
	}
	defer os.RemoveAll(tempDir) // Crucial cleanup step.

	// 2. Decode source CSS and write to input file.
	sourceBytes, err := base64.StdEncoding.DecodeString(req.SourceCSS)
	if err != nil {
		return nil, fmt.Errorf("failed to decode base64 source css: %w", err)
	}
	inputFile := filepath.Join(tempDir, "input.css")
	if err := os.WriteFile(inputFile, sourceBytes, 0644); err != nil {
		return nil, fmt.Errorf("failed to write input css: %w", err)
	}

	// 3. Generate PostCSS configuration file from job plugins.
	postcssConfig := map[string]interface{}{
		"plugins": req.Plugins,
	}
	configBytes, err := json.Marshal(postcssConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal postcss config: %w", err)
	}
	// Note: postcss-load-config does not look for "postcss.config.json";
	// ".postcssrc.json" is one of its recognized file names.
	configFile := filepath.Join(tempDir, ".postcssrc.json")
	if err := os.WriteFile(configFile, configBytes, 0644); err != nil {
		return nil, fmt.Errorf("failed to write postcss config: %w", err)
	}
	
	outputFile := filepath.Join(tempDir, "output.css")

	// 4. Construct and execute the command.
	// Using 'npx' assumes postcss-cli and the plugins are available in the PATH.
	// This is typically handled by setting up the execution environment (e.g., a Docker container).
	// '--config' points at the directory containing .postcssrc.json.
	cmd := exec.CommandContext(ctx, "npx", "postcss", inputFile, "-o", outputFile, "--config", tempDir)
	cmd.Dir = tempDir // Run command from the temp directory.

	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	if err := cmd.Run(); err != nil {
		// Distinguish a timeout (the context killed the process) from a
		// genuine PostCSS failure.
		if ctx.Err() == context.DeadlineExceeded {
			return nil, fmt.Errorf("postcss timed out after %s", jobTimeout)
		}
		// exec.ExitError contains more details if the process failed.
		if exitErr, ok := err.(*exec.ExitError); ok {
			return &job.CompileResult{
				JobID:        req.JobID,
				Status:       "FAILURE",
				ErrorMessage: fmt.Sprintf("PostCSS process exited with code %d", exitErr.ExitCode()),
				Logs:         stderr.String(),
			}, nil
		}
		return nil, fmt.Errorf("command execution failed: %w. Stderr: %s", err, stderr.String())
	}

	// 5. Read the output file and return the result.
	outputBytes, err := os.ReadFile(outputFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read output css: %w. Stderr: %s", err, stderr.String())
	}

	return &job.CompileResult{
		JobID:     req.JobID,
		Status:    "SUCCESS",
		OutputCSS: string(outputBytes),
		Logs:      stderr.String(),
	}, nil
}

This worker implementation addresses several production concerns:

  • Concurrency Limiting: The buffered channel jobQueue acts as a semaphore, naturally limiting the number of concurrently executing jobs to maxConcurrency. This prevents a single worker instance from being overwhelmed.
  • Resource Isolation: Each job is executed in its own temporary directory, preventing file system collisions. The defer os.RemoveAll is critical for cleanup.
  • Timeout Handling: context.WithTimeout ensures that a stuck PostCSS process (e.g., due to a buggy plugin) will be killed, preventing it from consuming resources indefinitely.
  • Detailed Error Reporting: We distinguish between worker errors (like file system issues) and PostCSS process errors. The stderr from the PostCSS process is captured and returned, which is invaluable for debugging configuration or syntax errors in the CSS.
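
One gap the list above does not cover is shutdown: as written, main blocks forever and a SIGTERM kills in-flight jobs mid-compile. A sketch of a drain sequence, assuming a ClosedHandler was registered at connect time so we can observe when NATS finishes draining:

// waitAndDrain is a hypothetical replacement for the bare wg.Wait() in main().
// It requires registering a handler when connecting, e.g.:
//
//	drained := make(chan struct{})
//	nats.Connect(natsURL, nats.ClosedHandler(func(*nats.Conn) { close(drained) }))
//
// (Also requires the os/signal and syscall imports.)
func waitAndDrain(nc *nats.Conn, drained <-chan struct{}, jobQueue chan *nats.Msg, wg *sync.WaitGroup) {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig // block until the orchestrator asks us to stop

	// Drain stops new message deliveries, lets pending handler callbacks
	// finish, then closes the connection, which fires the ClosedHandler.
	if err := nc.Drain(); err != nil {
		log.Printf("drain failed: %v", err)
	}
	<-drained

	// No subscription callback can send into jobQueue anymore, so closing it
	// is safe; the processor goroutines exit after finishing queued jobs.
	close(jobQueue)
	wg.Wait()
}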

Packaging and Deployment

A common mistake is to overlook the runtime environment. The Go worker binary has no dependencies, but the os/exec call to npx postcss absolutely does. It requires Node.js, npm/npx, and the necessary PostCSS packages to be installed. The most reliable way to manage this is with a Docker container.

Dockerfile:

# Stage 1: Build the Go application
FROM golang:1.19-alpine AS builder

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY . .

# Build a static worker binary. CGO_ENABLED=0 is what makes it self-contained;
# the legacy '-a -installsuffix cgo' flags are unnecessary on modern toolchains.
RUN CGO_ENABLED=0 GOOS=linux go build -o css-worker ./cmd/worker

# Stage 2: Create the final production image
FROM node:16-alpine

# Install PostCSS and common plugins globally.
# In a real system, you would pin versions and might have a private npm registry.
RUN npm install -g postcss-cli autoprefixer tailwindcss

# Copy the Go worker binary from the builder stage
COPY --from=builder /app/css-worker /usr/local/bin/css-worker

# Set the entrypoint
CMD ["css-worker"]

This multi-stage Dockerfile creates a lean image containing our Go binary and the pre-warmed Node.js environment. This image can now be deployed and scaled easily in an environment like Kubernetes.
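
For example, a minimal Kubernetes Deployment for the worker could look like the following sketch (the image reference, NATS address, and resource limits are placeholders for your environment):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: css-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: css-worker
  template:
    metadata:
      labels:
        app: css-worker
    spec:
      containers:
        - name: css-worker
          image: your-registry/css-worker:latest # placeholder image reference
          env:
            - name: NATS_URL
              value: nats://nats.default.svc.cluster.local:4222 # assumes a 'nats' Service
          resources:
            limits:
              cpu: "1"
              memory: 512Mi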

System Test

To verify the end-to-end flow, we can use a docker-compose.yml to spin up the entire stack.

docker-compose.yml:

version: '3.8'

services:
  nats:
    image: nats:2.9-alpine
    ports:
      - "4222:4222"

  api:
    build:
      context: .
      dockerfile: Dockerfile.api # A separate Dockerfile for the API server
    ports:
      - "8080:8080"
    depends_on:
      - nats
    environment:
      - NATS_URL=nats://nats:4222

  worker:
    build:
      context: .
      dockerfile: Dockerfile # The worker Dockerfile from above
    deploy:
      replicas: 3 # Honored by "docker compose up" (Compose v2); with legacy docker-compose, use "--scale worker=3"
    depends_on:
      - nats
    environment:
      - NATS_URL=nats://nats:4222

With the system running, we can submit a job with curl:

# 1. Submit a job to process TailwindCSS directives
export CSS_CONTENT=$(cat <<'EOF'
@tailwind base;
@tailwind components;
@tailwind utilities;

.my-button {
  @apply bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded;
}
EOF
)

# Base64 encode it for the JSON payload. GNU base64 wraps output at 76
# characters, which would inject newlines into the JSON, so strip them.
export B64_CSS_CONTENT=$(echo -n "$CSS_CONTENT" | base64 | tr -d '\n')

JOB_ID=$(curl -s -X POST http://localhost:8080/process \
-H "Content-Type: application/json" \
-d '{
  "source_css": "'$B64_CSS_CONTENT'",
  "plugins": {
    "tailwindcss": {},
    "autoprefixer": {}
  }
}' | jq -r .job_id)

echo "Job submitted with ID: $JOB_ID"

# 2. Poll for the result
sleep 2
curl -s http://localhost:8080/result/$JOB_ID | jq

The output will be the fully processed CSS, proving that the API, NATS queue, and Go worker are all communicating and executing correctly.

Limitations and Future Iterations

This implementation provides a solid foundation but is not without its limitations in a large-scale production setting. The security model around os/exec is a primary concern: plugin names from the request configuration are ultimately resolved with require() by postcss-load-config, so a malicious request could cause arbitrary code to load and execute on the worker. A more secure architecture would involve running each job inside a short-lived, sandboxed container (e.g., using Firecracker or gVisor) for true isolation, though this adds significant complexity and latency.
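
Even a coarse intermediate step illustrates the shape of that change: shelling out to docker run per job instead of invoking npx directly. This is a sketch only; the runner image name is hypothetical, and a Docker container is a much weaker sandbox than Firecracker or gVisor:

package sandbox

import (
	"context"
	"os/exec"
)

// CommandInContainer builds a variant of the worker's exec step that runs
// PostCSS inside a throwaway container with no network and capped resources.
// Only the job's temp directory is mounted into the container.
func CommandInContainer(ctx context.Context, tempDir string) *exec.Cmd {
	return exec.CommandContext(ctx, "docker", "run", "--rm",
		"--network", "none", // the job gets no network access
		"--memory", "256m", "--cpus", "0.5", // hard resource caps
		"-v", tempDir+":/job", // expose only this job's scratch dir
		"your-registry/postcss-runner:latest", // hypothetical pre-built tool image
		"postcss", "/job/input.css", "-o", "/job/output.css", "--config", "/job")
}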

The result storage is ephemeral. For guaranteed delivery and auditing, job states and results should be persisted in a durable database like PostgreSQL, with the in-memory map or Redis serving as a hot cache.

Finally, there’s no intelligent caching. If 100 CI jobs submit the exact same source CSS and config, we process it 100 times. A content-addressable cache (e.g., hashing the job request payload) could be implemented at the API layer. If a hash exists in the cache (Redis or S3), the cached result is returned immediately, skipping the entire NATS and worker flow. This would drastically reduce redundant computation and improve overall system throughput.
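
A sketch of deriving such a key (our construction; any stable hash over the output-affecting fields works):

package cache

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"

	"github.com/your-org/css-service/pkg/job"
)

// CacheKey derives a content-addressable key from the fields that actually
// affect the output: the source CSS and the plugin config. JobID and
// SubmittedAt are deliberately excluded so identical work hashes identically.
// Go's json.Marshal emits map keys in sorted order, which keeps the encoding
// stable for a given request.
func CacheKey(req *job.CompileRequest) (string, error) {
	canonical, err := json.Marshal(struct {
		Source  string                  `json:"source"`
		Plugins job.PostCSSPluginConfig `json:"plugins"`
	}{req.SourceCSS, req.Plugins})
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(canonical)
	return "css:cache:" + hex.EncodeToString(sum[:]), nil
}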

