The proliferation of front-end microservices within our monorepo created a significant tooling bottleneck. Each service maintained its own package.json, leading to dependency drift and inconsistent builds between local developer environments and the CI pipeline. A developer running npm install on Monday might get a slightly different version of a PostCSS plugin than the CI server, causing subtle but maddening styling bugs. Build times were also compounding, as each CI job for each microservice had to bootstrap a Node.js environment from scratch. We needed to centralize this logic into a consistent, fast, and scalable internal service.
The initial concept was a “CSS-as-a-Service” component for our internal developer platform. A developer or a CI script would submit raw CSS content and a declarative configuration via a REST API. A backend system would then perform the PostCSS transformation in a controlled environment and return the processed CSS. This abstracts away the need for node_modules in individual projects and guarantees that all builds use the exact same tooling versions and configurations.
Our choice of Go for the worker implementation was deliberate. For a system processing potentially thousands of small, independent jobs, Go’s concurrency model with goroutines is a natural fit. A single worker instance can handle hundreds of concurrent transformations without the heavy overhead of OS threads. Furthermore, Go’s static binaries simplify deployment immensely; we can package our worker into a minimal container image without a complex runtime.
For the messaging backbone, we chose NATS. While Kafka is powerful, its complexity and operational overhead were overkill for this use case. We needed a simple, high-performance message bus to act as a job queue. NATS Core provides a lightweight publish-subscribe and queueing mechanism that is performant and easy to manage.
Here is the high-level architecture we settled on.
graph TD
    subgraph "CI/CD Pipeline or Developer"
        A[Client]
    end
    subgraph "CSS Service"
        B(API Server - Go)
        C{NATS Message Bus}
        D1(Worker 1 - Go)
        D2(Worker 2 - Go)
        Dn(Worker N - Go)
        E{Result Store - Redis/Cache}
    end
    subgraph "Execution Environment"
        F[PostCSS CLI]
    end
    A -- "1. POST /process (source, config)" --> B
    B -- "2. Publish Job" --> C
    C -- "3. Queue Subscription" --> D1
    C -- "3. Queue Subscription" --> D2
    C -- "3. Queue Subscription" --> Dn
    D1 -- "4. Execute" --> F
    F -- "5. Return stdout/stderr" --> D1
    D1 -- "6. Publish Result" --> E
    A -- "7. GET /result/:jobId" --> B
    B -- "8. Fetch Result" --> E
    E -- "9. Return Result" --> B
    B -- "10. Return CSS/Error" --> A
The core of the system is the Go worker. It’s responsible for pulling jobs from the queue, safely executing the external PostCSS command, and reporting the outcome. This separation of the API server from the workers is critical for scalability and resilience. We can scale the number of workers independently of the API layer based on job queue depth.
Job Definition and Communication Protocol
First, we must define the structure of our job payload. This will be serialized to JSON and sent over NATS. A common mistake is to create an anemic data model. In a real-world project, you need to account for metadata, idempotency, and configuration options.
pkg/job/job.go:
package job

import "time"

// PostCSSPluginConfig defines the configuration for a single PostCSS plugin.
// The key is the plugin name (e.g., 'autoprefixer'), and the value
// is its configuration object. This structure is flexible.
type PostCSSPluginConfig map[string]interface{}

// CompileRequest is the payload sent by a client to request a CSS compilation.
// It is published to the NATS subject for workers to consume.
type CompileRequest struct {
	JobID       string              `json:"job_id"`
	SourceCSS   string              `json:"source_css"` // Base64 encoded CSS content
	Plugins     PostCSSPluginConfig `json:"plugins"`
	SubmittedAt time.Time           `json:"submitted_at"`
}

// CompileResult is the payload published by a worker after processing a job.
// It contains the outcome of the compilation.
type CompileResult struct {
	JobID        string    `json:"job_id"`
	Status       string    `json:"status"` // "SUCCESS" or "FAILURE"
	OutputCSS    string    `json:"output_css,omitempty"`
	ErrorMessage string    `json:"error_message,omitempty"`
	Logs         string    `json:"logs,omitempty"` // Captured stderr
	ProcessedAt  time.Time `json:"processed_at"`
	WorkerID     string    `json:"worker_id"`
}
We chose to Base64 encode the SourceCSS. While this adds a small overhead, it prevents issues with unescaped characters in JSON and makes transport over text-based protocols safer. The JobID is crucial for tracking and idempotency.
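For illustration, a small helper in pkg/job that clients could use to build a well-formed request might look like the following sketch. NewCompileRequest is hypothetical, not part of the service as described; it simply applies the encoding and stamping rules above.

package job

import (
	"encoding/base64"
	"time"

	"github.com/google/uuid"
)

// NewCompileRequest is a hypothetical client-side helper: it base64-encodes
// the raw CSS and stamps the request with a fresh JobID and submission time.
func NewCompileRequest(rawCSS []byte, plugins PostCSSPluginConfig) CompileRequest {
	return CompileRequest{
		JobID:       uuid.New().String(),
		SourceCSS:   base64.StdEncoding.EncodeToString(rawCSS),
		Plugins:     plugins,
		SubmittedAt: time.Now().UTC(),
	}
}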
The API Server: Job Ingestion
The API server is a straightforward Go HTTP service. Its sole responsibilities are to validate incoming requests, generate a unique job ID, publish the job to NATS, and provide an endpoint for clients to poll for results. We’ll use the standard net/http library to keep dependencies minimal.
cmd/api/main.go:
package main

import (
	"encoding/base64"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/your-org/css-service/pkg/job"
)

const (
	defaultNATSURL = "nats://localhost:4222"
	requestSubject = "css.jobs"
	listenAddr     = ":8080"
)

type APIServer struct {
	nc *nats.Conn
	// In a production system, this would be a persistent store like Redis or PostgreSQL.
	// For this example, we use a simple in-memory map, guarded by a mutex because it
	// is written by the NATS callback goroutine and read by HTTP handler goroutines.
	mu          sync.RWMutex
	resultStore map[string]*job.CompileResult
}

func main() {
	// Allow the NATS URL to be overridden via the environment (e.g., in docker-compose).
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = defaultNATSURL
	}
	nc, err := nats.Connect(natsURL)
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	server := &APIServer{
		nc:          nc,
		resultStore: make(map[string]*job.CompileResult),
	}

	// Subscriber to listen for results from workers.
	// The subject "css.results.*" allows us to listen for all results
	// while workers publish to a specific subject like "css.results.worker123".
	_, err = nc.Subscribe("css.results.*", server.resultHandler)
	if err != nil {
		log.Fatalf("Failed to subscribe to result subject: %v", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/process", server.processHandler)
	mux.HandleFunc("/result/", server.getResultHandler)

	log.Printf("API Server listening on %s", listenAddr)
	if err := http.ListenAndServe(listenAddr, mux); err != nil {
		log.Fatalf("HTTP server failed: %v", err)
	}
}

// processHandler accepts new CSS compilation jobs.
func (s *APIServer) processHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	var req struct {
		SourceCSS string                  `json:"source_css"`
		Plugins   job.PostCSSPluginConfig `json:"plugins"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
		return
	}

	// Basic validation.
	if req.SourceCSS == "" {
		http.Error(w, "source_css cannot be empty", http.StatusBadRequest)
		return
	}
	// Clients are expected to send base64-encoded CSS; reject anything else early.
	if _, err := base64.StdEncoding.DecodeString(req.SourceCSS); err != nil {
		http.Error(w, "source_css must be valid base64", http.StatusBadRequest)
		return
	}

	jobID := uuid.New().String()
	compileReq := job.CompileRequest{
		JobID:       jobID,
		SourceCSS:   req.SourceCSS, // Base64-encoded; validated above
		Plugins:     req.Plugins,
		SubmittedAt: time.Now().UTC(),
	}
	payload, err := json.Marshal(compileReq)
	if err != nil {
		http.Error(w, "Failed to serialize job", http.StatusInternalServerError)
		return
	}

	// Publish to the NATS queue subject. The "css.jobs" subject will be
	// listened to by a queue group of workers, ensuring only one worker gets the message.
	if err := s.nc.Publish(requestSubject, payload); err != nil {
		log.Printf("Error publishing job %s to NATS: %v", jobID, err)
		http.Error(w, "Failed to submit job", http.StatusInternalServerError)
		return
	}

	log.Printf("Submitted job %s", jobID)
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"job_id": jobID})
}

// getResultHandler allows clients to poll for job results.
func (s *APIServer) getResultHandler(w http.ResponseWriter, r *http.Request) {
	jobID := r.URL.Path[len("/result/"):]
	s.mu.RLock()
	result, found := s.resultStore[jobID]
	s.mu.RUnlock()
	if !found {
		// Not found could mean it's still processing or the ID is invalid.
		// Returning 202 Accepted indicates the server is aware but the result is not ready.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		json.NewEncoder(w).Encode(map[string]string{"status": "PROCESSING"})
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}

// resultHandler is the callback for NATS messages on the results subject.
func (s *APIServer) resultHandler(msg *nats.Msg) {
	var result job.CompileResult
	if err := json.Unmarshal(msg.Data, &result); err != nil {
		log.Printf("Error unmarshalling result: %v", err)
		return
	}
	log.Printf("Received result for job %s from worker %s", result.JobID, result.WorkerID)
	s.mu.Lock()
	s.resultStore[result.JobID] = &result
	s.mu.Unlock()
}
This API server is simple but demonstrates the core flow. In production, the in-memory resultStore is a critical single point of failure and would be replaced by Redis, which is well-suited for this kind of temporary state storage.
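As a sketch of that replacement, a Redis-backed store could look roughly like this, assuming the github.com/redis/go-redis/v9 client; the key prefix and TTL are illustrative choices, not part of the service as built.

package store

import (
	"context"
	"encoding/json"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/your-org/css-service/pkg/job"
)

type RedisResultStore struct {
	client *redis.Client
	ttl    time.Duration
}

func NewRedisResultStore(addr string) *RedisResultStore {
	return &RedisResultStore{
		client: redis.NewClient(&redis.Options{Addr: addr}),
		ttl:    15 * time.Minute, // results are short-lived; clients poll soon after submitting
	}
}

// Set serializes the result and writes it with a TTL, so abandoned results expire on their own.
func (s *RedisResultStore) Set(ctx context.Context, result *job.CompileResult) error {
	data, err := json.Marshal(result)
	if err != nil {
		return err
	}
	return s.client.Set(ctx, "css:result:"+result.JobID, data, s.ttl).Err()
}

// Get returns the result, a found flag (false means still processing or unknown ID), and any error.
func (s *RedisResultStore) Get(ctx context.Context, jobID string) (*job.CompileResult, bool, error) {
	data, err := s.client.Get(ctx, "css:result:"+jobID).Bytes()
	if err == redis.Nil {
		return nil, false, nil
	}
	if err != nil {
		return nil, false, err
	}
	var result job.CompileResult
	if err := json.Unmarshal(data, &result); err != nil {
		return nil, false, err
	}
	return &result, true, nil
}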
The Go Worker: The Core Engine
This is where the main logic resides. The worker needs to connect to NATS, subscribe to the job queue, and for each job, manage the entire lifecycle of executing an external command. The pitfall here is mishandling os/exec. It’s easy to leak resources, miss errors, or fail to capture output correctly.
cmd/worker/main.go:
package main

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/your-org/css-service/pkg/job"
)

const (
	defaultNATSURL = "nats://localhost:4222"
	requestSubject = "css.jobs"
	resultSubject  = "css.results"
	queueGroup     = "css-worker-group"
	workerIDPrefix = "worker-" // Prefix for a unique per-instance ID
	maxConcurrency = 4         // Max concurrent jobs this worker instance will process
	jobTimeout     = 30 * time.Second
)

type Worker struct {
	nc       *nats.Conn
	id       string
	jobQueue chan *nats.Msg
}

func main() {
	// Allow the NATS URL to be overridden via the environment (e.g., in docker-compose).
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = defaultNATSURL
	}
	nc, err := nats.Connect(natsURL, nats.Timeout(10*time.Second))
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()

	worker := &Worker{
		nc:       nc,
		id:       workerIDPrefix + uuid.New().String(),
		jobQueue: make(chan *nats.Msg, maxConcurrency),
	}
	log.Printf("Worker %s starting...", worker.id)

	// Subscribe to the queue group. This ensures messages from 'css.jobs'
	// are distributed among all workers in the 'css-worker-group'.
	_, err = nc.QueueSubscribe(requestSubject, queueGroup, func(msg *nats.Msg) {
		worker.jobQueue <- msg
	})
	if err != nil {
		log.Fatalf("Failed to subscribe to queue: %v", err)
	}
	log.Printf("Subscribed to subject '%s' with queue group '%s'", requestSubject, queueGroup)

	var wg sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func(processorID int) {
			defer wg.Done()
			log.Printf("Processor #%d started", processorID)
			for msg := range worker.jobQueue {
				worker.processJob(msg, processorID)
			}
		}(i)
	}

	// jobQueue is never closed in this version, so wg.Wait() blocks for the
	// lifetime of the process (see the shutdown sketch further down).
	wg.Wait()
}

func (w *Worker) processJob(msg *nats.Msg, processorID int) {
	var req job.CompileRequest
	if err := json.Unmarshal(msg.Data, &req); err != nil {
		log.Printf("[Processor %d] Error unmarshalling job: %v", processorID, err)
		// We can't process this, so we don't send a result. The job will eventually
		// time out on the client side. In a more robust system, we'd send it to a
		// dead-letter queue.
		return
	}
	log.Printf("[Processor %d] Received job %s", processorID, req.JobID)

	// Create a context with timeout for the external command. This is crucial
	// to prevent runaway processes from hogging resources.
	ctx, cancel := context.WithTimeout(context.Background(), jobTimeout)
	defer cancel()

	result, err := w.runPostCSS(ctx, &req)
	if err != nil {
		result = &job.CompileResult{
			JobID:        req.JobID,
			Status:       "FAILURE",
			ErrorMessage: err.Error(),
		}
	}
	result.WorkerID = w.id
	result.ProcessedAt = time.Now().UTC()

	payload, err := json.Marshal(result)
	if err != nil {
		log.Printf("[Processor %d] Job %s: Failed to serialize result: %v", processorID, req.JobID, err)
		return
	}

	// Publish the result to a subject specific to this worker, which the API server listens on.
	if err := w.nc.Publish(fmt.Sprintf("%s.%s", resultSubject, w.id), payload); err != nil {
		log.Printf("[Processor %d] Job %s: Failed to publish result: %v", processorID, req.JobID, err)
	}
	log.Printf("[Processor %d] Finished processing job %s with status %s", processorID, req.JobID, result.Status)
}

// runPostCSS handles the core logic of file system interaction and command execution.
func (w *Worker) runPostCSS(ctx context.Context, req *job.CompileRequest) (*job.CompileResult, error) {
	// 1. Create a temporary, isolated directory for this job.
	tempDir, err := os.MkdirTemp("", "postcss-job-*")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp dir: %w", err)
	}
	defer os.RemoveAll(tempDir) // Crucial cleanup step.

	// 2. Decode source CSS and write to input file.
	sourceBytes, err := base64.StdEncoding.DecodeString(req.SourceCSS)
	if err != nil {
		return nil, fmt.Errorf("failed to decode base64 source css: %w", err)
	}
	inputFile := filepath.Join(tempDir, "input.css")
	if err := os.WriteFile(inputFile, sourceBytes, 0644); err != nil {
		return nil, fmt.Errorf("failed to write input css: %w", err)
	}

	// 3. Generate PostCSS configuration file from job plugins.
	postcssConfig := map[string]interface{}{
		"plugins": req.Plugins,
	}
	configBytes, err := json.Marshal(postcssConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal postcss config: %w", err)
	}
	configFile := filepath.Join(tempDir, "postcss.config.json")
	if err := os.WriteFile(configFile, configBytes, 0644); err != nil {
		return nil, fmt.Errorf("failed to write postcss config: %w", err)
	}

	outputFile := filepath.Join(tempDir, "output.css")

	// 4. Construct and execute the command.
	// Using 'npx' assumes postcss and plugins are available in the PATH.
	// This is typically handled by setting up the execution environment (e.g., a Docker container).
	cmd := exec.CommandContext(ctx, "npx", "postcss", inputFile, "-o", outputFile, "--config", configFile)
	cmd.Dir = tempDir // Run command from the temp directory.
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	if err := cmd.Run(); err != nil {
		// exec.ExitError contains more details if the process failed.
		if exitErr, ok := err.(*exec.ExitError); ok {
			return &job.CompileResult{
				JobID:        req.JobID,
				Status:       "FAILURE",
				ErrorMessage: fmt.Sprintf("PostCSS process exited with code %d", exitErr.ExitCode()),
				Logs:         stderr.String(),
			}, nil
		}
		return nil, fmt.Errorf("command execution failed: %w. Stderr: %s", err, stderr.String())
	}

	// 5. Read the output file and return the result.
	outputBytes, err := os.ReadFile(outputFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read output css: %w. Stderr: %s", err, stderr.String())
	}

	return &job.CompileResult{
		JobID:     req.JobID,
		Status:    "SUCCESS",
		OutputCSS: string(outputBytes),
		Logs:      stderr.String(),
	}, nil
}
This worker implementation addresses several production concerns:
- Concurrency Limiting: The buffered channel jobQueue acts as a semaphore, naturally limiting the number of concurrently executing jobs to maxConcurrency. This prevents a single worker instance from being overwhelmed.
- Resource Isolation: Each job is executed in its own temporary directory, preventing file system collisions. The defer os.RemoveAll is critical for cleanup.
- Timeout Handling: context.WithTimeout ensures that a stuck PostCSS process (e.g., due to a buggy plugin) will be killed, preventing it from consuming resources indefinitely.
- Detailed Error Reporting: We distinguish between worker errors (like file system issues) and PostCSS process errors. The stderr from the PostCSS process is captured and returned, which is invaluable for debugging configuration or syntax errors in the CSS.
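One gap the list above doesn't cover is shutdown: as written, main() blocks on wg.Wait() forever because jobQueue is never closed. A minimal graceful-shutdown sketch, assuming we keep the *nats.Subscription returned by QueueSubscribe as sub instead of discarding it, and add the os/signal and syscall imports, could replace the tail of main():

// Sketch: signal-aware shutdown instead of blocking forever.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

<-ctx.Done() // block until SIGINT or SIGTERM

// Drain removes interest in the subject but lets buffered messages finish dispatching.
if err := sub.Drain(); err != nil {
	log.Printf("error draining subscription: %v", err)
}
for sub.IsValid() {
	time.Sleep(100 * time.Millisecond) // crude wait for the drain to complete
}
close(worker.jobQueue) // processors exit once the channel empties
wg.Wait()              // wait for in-flight jobs to finish

This is intentionally rough; the point is the ordering: stop deliveries first, then close the channel, then wait for the processors.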
Packaging and Deployment
A common mistake is to overlook the runtime environment. The Go worker binary has no dependencies, but the os/exec call to npx postcss absolutely does. It requires Node.js, npm/npx, and the necessary PostCSS packages to be installed. The most reliable way to manage this is with a Docker container.
Dockerfile:
# Stage 1: Build the Go application
FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
# Build the worker binary
RUN CGO_ENABLED=0 GOOS=linux go build -o css-worker ./cmd/worker

# Stage 2: Create the final production image
FROM node:16-alpine
# Install PostCSS and common plugins globally.
# In a real system, you would pin versions and might have a private npm registry.
RUN npm install -g postcss postcss-cli autoprefixer tailwindcss
# Let Node resolve the globally installed plugins when the job's config requires them.
ENV NODE_PATH=/usr/local/lib/node_modules
# Copy the Go worker binary from the builder stage
COPY --from=builder /app/css-worker /usr/local/bin/css-worker
# Set the entrypoint
CMD ["css-worker"]
This multi-stage Dockerfile creates a lean image containing our Go binary and the pre-warmed Node.js environment. This image can now be deployed and scaled easily in an environment like Kubernetes.
System Test
To verify the end-to-end flow, we can use a docker-compose.yml to spin up the entire stack.
docker-compose.yml:
version: '3.8'
services:
  nats:
    image: nats:2.9-alpine
    ports:
      - "4222:4222"
  api:
    build:
      context: .
      dockerfile: Dockerfile.api # A separate Dockerfile for the API server
    ports:
      - "8080:8080"
    depends_on:
      - nats
    environment:
      - NATS_URL=nats://nats:4222
  worker:
    build:
      context: .
      dockerfile: Dockerfile # The worker Dockerfile from above
    deploy:
      replicas: 3 # Scale to 3 worker instances
    depends_on:
      - nats
    environment:
      - NATS_URL=nats://nats:4222
With the system running, we can submit a job with curl:
# 1. Submit a job to process TailwindCSS directives
export CSS_CONTENT=$(cat <<'EOF'
@tailwind base;
@tailwind components;
@tailwind utilities;
.my-button {
@apply bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded;
}
EOF
)
# Base64 encode it for the JSON payload (strip the line wraps GNU base64 inserts)
export B64_CSS_CONTENT=$(echo -n "$CSS_CONTENT" | base64 | tr -d '\n')
JOB_ID=$(curl -s -X POST http://localhost:8080/process \
-H "Content-Type: application/json" \
-d '{
"source_css": "'$B64_CSS_CONTENT'",
"plugins": {
"tailwindcss": {},
"autoprefixer": {}
}
}' | jq -r .job_id)
echo "Job submitted with ID: $JOB_ID"
# 2. Poll for the result
sleep 2
curl -s http://localhost:8080/result/$JOB_ID | jq
The output will be the fully processed CSS, proving that the API, NATS queue, and Go worker are all communicating and executing correctly.
Limitations and Future Iterations
This implementation provides a solid foundation but is not without its limitations in a large-scale production setting. The security model of os/exec is a primary concern; a malicious postcss.config.json could potentially execute arbitrary commands. A more secure architecture would involve running each job inside a short-lived, sandboxed container (e.g., using Firecracker or gVisor) for true isolation, though this adds significant complexity and latency.
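Short of microVMs, even wrapping each job in a throwaway Docker container narrows the blast radius. A rough sketch of what the exec call in runPostCSS might become, assuming Docker is available to the worker; the image name and resource limits are illustrative:

// Sketch: execute PostCSS inside a short-lived, locked-down container rather
// than directly on the worker host. "css-tools:pinned" is a hypothetical image
// with Node.js and the PostCSS toolchain baked in.
cmd := exec.CommandContext(ctx, "docker", "run", "--rm",
	"--network", "none", // no network access for untrusted configs
	"--memory", "256m", // cap memory...
	"--cpus", "0.5", // ...and CPU
	"-v", tempDir+":/work", // expose only this job's temp directory
	"-w", "/work",
	"css-tools:pinned",
	"npx", "postcss", "input.css", "-o", "output.css", "--config", "postcss.config.json")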
The result storage is ephemeral. For guaranteed delivery and auditing, job states and results should be persisted in a durable database like PostgreSQL, with the in-memory map or Redis serving as a hot cache.
Finally, there’s no intelligent caching. If 100 CI jobs submit the exact same source CSS and config, we process it 100 times. A content-addressable cache (e.g., hashing the job request payload) could be implemented at the API layer. If a hash exists in the cache (Redis or S3), the cached result is returned immediately, skipping the entire NATS and worker flow. This would drastically reduce redundant computation and improve overall system throughput.
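A sketch of deriving such a cache key in Go; the key prefix is an illustrative choice:

package cache

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"

	"github.com/your-org/css-service/pkg/job"
)

// CacheKey derives a content-addressable key from the parts of a request that
// affect the output: the source CSS and the plugin configuration. JobID and
// SubmittedAt are deliberately excluded. Go's encoding/json sorts map keys,
// so the serialized plugin config (and therefore the hash) is deterministic.
func CacheKey(req *job.CompileRequest) (string, error) {
	payload, err := json.Marshal(struct {
		Source  string                  `json:"source"`
		Plugins job.PostCSSPluginConfig `json:"plugins"`
	}{req.SourceCSS, req.Plugins})
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(payload)
	return "css:cache:" + hex.EncodeToString(sum[:]), nil
}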