Our data ingestion pipeline was becoming a bottleneck, not in terms of throughput, but in operational agility. The core service, written in Go, ingests schemaless JSON blobs from hundreds of clients. Each client’s data requires unique transformation logic before being routed to one of two destinations: time-series metrics to InfluxDB, and structured event logs to Elasticsearch. The problem was that this transformation logic was hardcoded. Every new client or change in a data format necessitated a code change, a new build, and a full redeployment cycle for the entire ingestion fleet. This process was slow, risky, and completely untenable as we scaled.
The initial concept was to externalize this transformation logic. We needed a plugin system. A simple approach using Go's native `plugin` package was dismissed early. Its reliance on shared object files (`.so`) makes it platform-dependent and notoriously brittle: the host and every plugin must be built with the same Go toolchain version and identical versions of any shared packages, or the plugin fails to load. A microservice-based approach was also considered, in which the main service would make an RPC call to a dedicated transformation service. This would work, but it introduces network latency and the operational overhead of deploying and managing a separate fleet of services for logic that is fundamentally stateless.
This led to the selection of WebAssembly (WASM). A WASM runtime provides a secure, sandboxed environment to execute code at near-native speed. Crucially, WASM modules are platform-agnostic binaries. We could compile our transformation logic, written in Go (using TinyGo for smaller binaries), into a `.wasm` file. The main Go ingestion service could then load and execute these modules on the fly, without a restart. For the runtime, `wazero` was chosen because it has zero dependencies (it is written in pure Go and needs no CGO) and compiles modules to native code on amd64 and arm64. This architecture promised to decouple the stable, high-performance I/O layer from the volatile, client-specific business logic.
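The core of this architecture is the contract, or Application Binary Interface (ABI), between the Go host and the WASM guest module. WASM functions can exchange only numeric values such as `i32`, `i64`, `f32` and `f64`. There is no native string or struct type. Exchanging complex data such as JSON strings therefore requires careful management of the WASM module's linear memory. One side writes the bytes into that memory, and the two sides pass the buffer's address and length as plain integers.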
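Because only integers cross the boundary, a string travels as an address plus a byte count. The ABI in this article passes these as two separate `i32` arguments. Some projects use a different convention, and it is an assumption here rather than part of this article's ABI: they pack both values into a single 64-bit value. This is convenient when a guest function needs to return a pointer/length pair as one result. The helper names `packPtrLen` and `unpackPtrLen` below are hypothetical:

```go
package main

import "fmt"

// packPtrLen combines a 32-bit guest address and a 32-bit length into one
// 64-bit value: the address in the high half, the length in the low half.
// Hypothetical convention; the article's ABI passes ptr and len separately.
func packPtrLen(ptr, size uint32) uint64 {
	return uint64(ptr)<<32 | uint64(size)
}

// unpackPtrLen reverses packPtrLen.
func unpackPtrLen(v uint64) (ptr, size uint32) {
	return uint32(v >> 32), uint32(v)
}

func main() {
	packed := packPtrLen(1024, 17)
	ptr, size := unpackPtrLen(packed)
	fmt.Println(packed, ptr, size) // prints: 4398046511121 1024 17
}
```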
```mermaid
sequenceDiagram
    participant Host as Host (Go)
    participant Runtime as Wazero Runtime
    participant Guest as Guest (WASM)
    participant InfluxDB
    Host->>Runtime: Load 'processor.wasm' and instantiate
    Runtime-->>Host: Module instance ready
    Note over Host,Guest: Guest memory is isolated
    Host->>Host: 1. Allocate buffer in Host memory (with input data)
    Host->>Runtime: 2. Allocate buffer in Guest memory
    Runtime-->>Host: Pointer to Guest buffer
    Host->>Runtime: 3. Write input data into Guest buffer
    Host->>Runtime: 4. Call exported Guest function 'process_data(ptr, len)'
    Runtime->>Guest: Execute 'process_data'
    Guest->>Guest: Parse data from its memory
    Note over Guest: Decides to route to InfluxDB
    Guest->>Guest: 5. Prepare InfluxDB line protocol string
    Guest->>Guest: 6. Allocate new buffer in its own memory
    Guest->>Guest: 7. Write line protocol to new buffer
    Guest->>Runtime: 8. Call imported Host function 'host_send_to_influx(ptr, len)'
    Runtime->>Host: Execute 'host_send_to_influx'
    Host->>Host: 9. Read data from Guest memory at given ptr/len
    Host->>InfluxDB: Send data via InfluxDB client
    Host-->>Runtime: Return status code (0 for success)
    Runtime-->>Guest: Return status code
    Guest-->>Runtime: 'process_data' completes
    Runtime-->>Host: 'process_data' returns
```
This sequence highlights the critical steps: memory allocation and data copying. The host is the orchestrator, controlling the lifecycle of the WASM module and providing the I/O capabilities (like sending data to Elasticsearch) that the sandboxed module cannot perform itself.
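The following sketch makes the allocate-then-copy pattern concrete without a WASM runtime. It simulates a guest's linear memory as a plain byte slice with a bump allocator. The names `linearMemory`, `Allocate`, `Write` and `Read` are hypothetical stand-ins. `Read` mimics the bounds-checked `(bytes, ok)` shape of wazero's `api.Memory.Read`, but none of this is wazero code:

```go
package main

import "fmt"

// linearMemory is a toy stand-in for a WASM module's linear memory:
// one contiguous byte array addressed by 32-bit offsets.
type linearMemory struct {
	buf  []byte
	next uint32 // bump-allocator cursor
}

func newLinearMemory(size uint32) *linearMemory {
	// Start at 8 so that 0 can act as a "null" address.
	return &linearMemory{buf: make([]byte, size), next: 8}
}

// Allocate reserves n bytes and returns their offset, or ok=false if full.
func (m *linearMemory) Allocate(n uint32) (uint32, bool) {
	if uint64(m.next)+uint64(n) > uint64(len(m.buf)) {
		return 0, false
	}
	ptr := m.next
	m.next += n
	return ptr, true
}

// Write copies data into memory at ptr (the host-to-guest copy).
func (m *linearMemory) Write(ptr uint32, data []byte) bool {
	if uint64(ptr)+uint64(len(data)) > uint64(len(m.buf)) {
		return false
	}
	copy(m.buf[ptr:], data)
	return true
}

// Read returns a bounds-checked view of size bytes starting at ptr.
func (m *linearMemory) Read(ptr, size uint32) ([]byte, bool) {
	end := uint64(ptr) + uint64(size)
	if end > uint64(len(m.buf)) {
		return nil, false
	}
	return m.buf[uint64(ptr):end], true
}

func main() {
	mem := newLinearMemory(64)
	input := []byte(`{"type":"metric"}`)

	// Steps 2-3: allocate guest space, then copy the input in.
	ptr, ok := mem.Allocate(uint32(len(input)))
	if !ok || !mem.Write(ptr, input) {
		panic("out of guest memory")
	}

	// Step 9: the reader needs nothing but (ptr, len).
	got, ok := mem.Read(ptr, uint32(len(input)))
	fmt.Println(ok, string(got))

	// An out-of-range read is rejected instead of touching other memory.
	_, ok = mem.Read(60, 10)
	fmt.Println(ok)
}
```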
The Go Host Implementation
The host application is responsible for setting up the `wazero` runtime, defining the functions that will be exposed to the WASM guest, and managing the clients for InfluxDB and Elasticsearch.
Here’s the core structure of the host application. It initializes the runtime, links the host functions, and then loads and runs the WASM module.
`cmd/host/main.go`:
```go
package main

import (
	"context"
	"log"
	"os"
	"sync"

	"github.com/tetratelabs/wazero"
	"github.com/tetratelabs/wazero/api"
	"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
)

// In a real-world project, these would come from config files or env vars.
const (
	wasmModulePath    = "build/processor.wasm"
	influxDBURL       = "http://localhost:8086"
	influxDBToken     = "my-super-secret-token"
	influxDBOrg       = "my-org"
	influxDBBucket    = "my-bucket"
	elasticsearchAddr = "http://localhost:9200"
)

// hostState holds the resources the WASM module will interact with.
// We pass a pointer to this struct to our host function closures.
// The embedded mutex serializes calls into the module instance, which
// must not be used from several goroutines at once.
type hostState struct {
	sync.Mutex
	influxClient  *InfluxClient
	elasticClient *ElasticsearchClient
	currentModule api.Module
	memory        api.Memory
}

func main() {
	ctx := context.Background()

	// Initialize external service clients.
	influxClient, err := NewInfluxClient(influxDBURL, influxDBToken, influxDBOrg, influxDBBucket)
	if err != nil {
		log.Panicf("failed to create influxdb client: %v", err)
	}
	defer influxClient.Close()

	esClient, err := NewElasticsearchClient([]string{elasticsearchAddr})
	if err != nil {
		log.Panicf("failed to create elasticsearch client: %v", err)
	}

	state := &hostState{
		influxClient:  influxClient,
		elasticClient: esClient,
	}

	// Create a new wazero runtime.
	r := wazero.NewRuntime(ctx)
	defer r.Close(ctx)

	// TinyGo's wasi target imports WASI functions, so they must be
	// instantiated before the guest module.
	wasi_snapshot_preview1.MustInstantiate(ctx, r)

	// Compile the WASM module. This is a one-time cost per module.
	// In a long-running service, you'd cache this compiled module.
	compiledModule, err := r.CompileModule(ctx, MustReadWasmFile(wasmModulePath))
	if err != nil {
		log.Panicf("failed to compile wasm module: %v", err)
	}
	defer compiledModule.Close(ctx)

	// Create a module builder to define our host functions.
	hostModuleBuilder := r.NewHostModuleBuilder("env")

	// Define the functions exported by the host to the WASM guest.
	// These functions use closures to access the hostState.
	//
	// They deliberately do NOT lock state: they only run synchronously
	// inside process(), which already holds the lock. sync.Mutex is not
	// reentrant, so locking again here would deadlock.
	hostModuleBuilder.NewFunctionBuilder().
		WithFunc(func(ctx context.Context, module api.Module, ptr, size uint32) {
			// A simple logging function for debugging from WASM.
			data, ok := module.Memory().Read(ptr, size)
			if !ok {
				log.Printf("host_log: failed to read memory at ptr=%d, size=%d", ptr, size)
				return
			}
			log.Printf("[WASM] %s", string(data))
		}).Export("host_log")

	hostModuleBuilder.NewFunctionBuilder().
		WithFunc(func(ctx context.Context, module api.Module, ptr, size uint32) int32 {
			// Reads line protocol from guest memory and sends it to InfluxDB.
			lineProtocol, ok := module.Memory().Read(ptr, size)
			if !ok {
				log.Printf("host_send_to_influx: memory read failed")
				return -1
			}
			if err := state.influxClient.Write(ctx, string(lineProtocol)); err != nil {
				log.Printf("host_send_to_influx: failed to write: %v", err)
				return -1
			}
			return 0 // Success
		}).Export("host_send_to_influx")

	hostModuleBuilder.NewFunctionBuilder().
		WithFunc(func(ctx context.Context, module api.Module, indexNamePtr, indexNameSize, docPtr, docSize uint32) int32 {
			// Reads an index name and a JSON document from guest memory
			// and indexes the document in Elasticsearch.
			indexNameBytes, ok := module.Memory().Read(indexNamePtr, indexNameSize)
			if !ok {
				log.Printf("host_send_to_es: index name read failed")
				return -1
			}
			docBytes, ok := module.Memory().Read(docPtr, docSize)
			if !ok {
				log.Printf("host_send_to_es: document read failed")
				return -1
			}
			if err := state.elasticClient.Index(ctx, string(indexNameBytes), docBytes); err != nil {
				log.Printf("host_send_to_es: failed to index document: %v", err)
				return -1
			}
			return 0 // Success
		}).Export("host_send_to_es")

	// Instantiate the host module. This makes the functions available for linking.
	if _, err := hostModuleBuilder.Instantiate(ctx); err != nil {
		log.Panicf("failed to instantiate host module: %v", err)
	}

	// Now instantiate the guest module, linking it with our host functions.
	mod, err := r.InstantiateModule(ctx, compiledModule, wazero.NewModuleConfig().
		WithStdout(os.Stdout).
		WithStderr(os.Stderr))
	if err != nil {
		log.Panicf("failed to instantiate guest module: %v", err)
	}
	state.currentModule = mod
	state.memory = mod.Memory()

	// Get references to the functions exported by the WASM module.
	// ExportedFunction returns nil if the guest does not export that name.
	processData := mod.ExportedFunction("process_data")
	allocate := mod.ExportedFunction("allocate")
	deallocate := mod.ExportedFunction("deallocate")
	if processData == nil || allocate == nil || deallocate == nil {
		log.Panic("guest module is missing a required export (process_data, allocate, deallocate)")
	}

	// --- Simulate receiving data and processing it ---

	// Example 1: A metric for InfluxDB
	inputJSON := `{"type": "metric", "source": "sensor-A1", "value": 42.5, "unit": "celsius", "timestamp": 1678886400}`
	log.Printf("\n--- Processing metric data ---\nInput: %s", inputJSON)
	process(ctx, state, processData, allocate, deallocate, []byte(inputJSON))

	// Example 2: A log event for Elasticsearch
	inputJSON = `{"type": "log", "level": "error", "service": "auth-service", "message": "user authentication failed", "user_id": "user-123"}`
	log.Printf("\n--- Processing log data ---\nInput: %s", inputJSON)
	process(ctx, state, processData, allocate, deallocate, []byte(inputJSON))

	// Example 3: Unrecognized data
	inputJSON = `{"type": "unknown", "payload": "..."}`
	log.Printf("\n--- Processing unknown data ---\nInput: %s", inputJSON)
	process(ctx, state, processData, allocate, deallocate, []byte(inputJSON))
}

// process is the core orchestration logic for a single data processing request.
// It holds state's lock for the whole call, so any host function the guest
// invokes runs inside this critical section.
func process(ctx context.Context, state *hostState, processData, allocate, deallocate api.Function, data []byte) {
	state.Lock()
	defer state.Unlock()

	// 1. Allocate memory in the WASM guest for the input data.
	results, err := allocate.Call(ctx, uint64(len(data)))
	if err != nil {
		log.Panicf("allocate failed: %v", err)
	}
	ptr := uint32(results[0])

	defer func() {
		// 4. Release the guest buffer once processing is done.
		if _, err := deallocate.Call(ctx, uint64(ptr), uint64(len(data))); err != nil {
			log.Printf("deallocate failed: %v", err)
		}
	}()

	// 2. Write the input data to the allocated memory.
	if !state.memory.Write(ptr, data) {
		log.Panicf("memory write failed at ptr=%d, size=%d", ptr, len(data))
	}

	// 3. Call the WASM function to process the data.
	if _, err := processData.Call(ctx, uint64(ptr), uint64(len(data))); err != nil {
		log.Panicf("process_data failed: %v", err)
	}
	log.Println("Processing finished successfully.")
}

// MustReadWasmFile reads a compiled module from disk or panics.
func MustReadWasmFile(path string) []byte {
	bytes, err := os.ReadFile(path)
	if err != nil {
		log.Panicf("failed to read wasm file %s: %v", path, err)
	}
	return bytes
}
```
The service clients for InfluxDB and Elasticsearch are encapsulated for clarity and robustness. A real-world implementation would use more sophisticated configuration, connection pooling, and retry logic.
`cmd/host/clients.go`:
```go
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
)

// --- InfluxDB Client ---

// InfluxClient wraps a blocking write API bound to one org and bucket.
type InfluxClient struct {
	client   influxdb2.Client
	writeAPI api.WriteAPIBlocking
}

func NewInfluxClient(url, token, org, bucket string) (*InfluxClient, error) {
	client := influxdb2.NewClient(url, token)
	// Simple health check on startup.
	if _, err := client.Health(context.Background()); err != nil {
		client.Close()
		return nil, fmt.Errorf("influxdb health check failed: %w", err)
	}
	writeAPI := client.WriteAPIBlocking(org, bucket)
	return &InfluxClient{client: client, writeAPI: writeAPI}, nil
}

// Write sends one line protocol record and waits for the server's response.
func (c *InfluxClient) Write(ctx context.Context, lineProtocol string) error {
	log.Printf("HOST -> InfluxDB: Writing line protocol: %s", lineProtocol)
	return c.writeAPI.WriteRecord(ctx, lineProtocol)
}

func (c *InfluxClient) Close() {
	c.client.Close()
}

// --- Elasticsearch Client ---

type ElasticsearchClient struct {
	client *elasticsearch.Client
}

func NewElasticsearchClient(addresses []string) (*ElasticsearchClient, error) {
	cfg := elasticsearch.Config{
		Addresses: addresses,
	}
	client, err := elasticsearch.NewClient(cfg)
	if err != nil {
		return nil, fmt.Errorf("error creating the elasticsearch client: %w", err)
	}
	// Simple health check.
	res, err := client.Info()
	if err != nil {
		return nil, fmt.Errorf("error getting elasticsearch response: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return nil, fmt.Errorf("elasticsearch info request returned an error: %s", res.String())
	}
	return &ElasticsearchClient{client: client}, nil
}

// Index stores document in indexName.
func (c *ElasticsearchClient) Index(ctx context.Context, indexName string, document []byte) error {
	log.Printf("HOST -> Elasticsearch: Indexing document in '%s'", indexName)
	req := esapi.IndexRequest{
		Index: indexName,
		// string(document) copies the bytes, which may alias guest memory.
		Body:    strings.NewReader(string(document)),
		Refresh: "true", // For immediate visibility in this example
	}
	res, err := req.Do(ctx, c.client)
	if err != nil {
		return fmt.Errorf("error getting response: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("error indexing document: %s", res.String())
	}
	return nil
}
```
The WASM Guest Plugin in Go/TinyGo
The guest module contains the actual business logic. It relies on TinyGo's `//export` directive, so it must be compiled with TinyGo, which also produces a much smaller WASM file than the standard Go toolchain. It needs to export functions the host can call (`allocate`, `deallocate`, `process_data`) and import functions the host provides (`host_log`, `host_send_to_influx`, `host_send_to_es`).
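The key challenge here is manual memory management. We export `allocate` and `deallocate` functions so the host can manage memory within our module's linear memory space. The host holds only a numeric address. For that reason, `allocate` must also keep each buffer reachable until `deallocate` is called. Otherwise, TinyGo's garbage collector could reclaim the buffer while the host is still using it.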
`cmd/guest/main.go`:
```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"unsafe"
)

// main is required for the 'wasi' target, but we don't use it.
func main() {}

// --- Host Imports ---
// These functions are provided by the Go host application.
// The go:wasmimport directive links each declaration to the imported WASM function.

//go:wasmimport env host_log
func hostLog(ptr, size uint32)

//go:wasmimport env host_send_to_influx
func hostSendToInflux(ptr, size uint32) int32

//go:wasmimport env host_send_to_es
func hostSendToES(indexNamePtr, indexNameSize, docPtr, docSize uint32) int32

// --- Guest Exports ---
// These functions are exported to and callable by the Go host application.

// allocations pins buffers handed out to the host. The host only holds a
// uint32 address, which the garbage collector does not treat as a reference,
// so without this map a buffer could be reclaimed while still in use.
var allocations = map[uint32][]byte{}

//export allocate
func allocate(size uint32) uint32 {
	if size == 0 {
		return 0
	}
	// Allocate a buffer of the given size, pin it, and return its address.
	buf := make([]byte, size)
	ptr := uint32(uintptr(unsafe.Pointer(&buf[0])))
	allocations[ptr] = buf
	return ptr
}

//export deallocate
func deallocate(ptr, size uint32) {
	// Unpin the buffer so TinyGo's garbage collector can reclaim it.
	// In languages like Rust or C, you would free the memory here.
	delete(allocations, ptr)
}

//export process_data
func processData(ptr, size uint32) {
	// Read the input data from memory.
	inputBytes := readBytesFromMemory(ptr, size)

	var data map[string]interface{}
	if err := json.Unmarshal(inputBytes, &data); err != nil {
		log(fmt.Sprintf("error unmarshaling json: %v", err))
		return
	}

	dataType, ok := data["type"].(string)
	if !ok {
		log("error: 'type' field is missing or not a string")
		return
	}

	switch dataType {
	case "metric":
		processMetric(data)
	case "log":
		processLog(data)
	default:
		log(fmt.Sprintf("unrecognized data type: %s", dataType))
	}
}

// --- Internal Logic ---

func processMetric(data map[string]interface{}) {
	source, _ := data["source"].(string)
	unit, _ := data["unit"].(string)
	value, hasValue := data["value"].(float64)

	// A common pitfall in real-world projects is assuming fields exist.
	// Production-grade code requires robust validation.
	if source == "" || unit == "" || !hasValue {
		log("metric data is missing 'source', 'unit', or a numeric 'value' field")
		return
	}

	// Transform the data into InfluxDB line protocol format:
	//   measurement,tagKey=tagValue fieldKey=fieldValue [timestamp]
	// Example: "env_metrics,source=sensor-A1,unit=celsius value=42.5"
	// The timestamp is optional and omitted here, so InfluxDB uses the
	// server's time. Tag values are not escaped here.
	var sb strings.Builder
	sb.WriteString("env_metrics")
	fmt.Fprintf(&sb, ",source=%s,unit=%s", source, unit)
	sb.WriteString(" value=")
	sb.WriteString(strconv.FormatFloat(value, 'f', -1, 64))
	lineProtocol := sb.String()

	// Write the line protocol string to memory and call the host function.
	ptr, size := writeStringToMemory(lineProtocol)
	if ret := hostSendToInflux(ptr, size); ret != 0 {
		log(fmt.Sprintf("host_send_to_influx failed with code %d", ret))
	}
}

func processLog(data map[string]interface{}) {
	service, _ := data["service"].(string)
	if service == "" {
		log("log data is missing 'service' field")
		return
	}

	// For logs, we can often forward the document largely as-is
	// (re-marshaled here). The index name is derived from the service name
	// and lowercased, because Elasticsearch index names must be lowercase.
	indexName := fmt.Sprintf("%s-logs-daily", strings.ToLower(service))
	docBytes, err := json.Marshal(data)
	if err != nil {
		log(fmt.Sprintf("failed to re-marshal log data: %v", err))
		return
	}

	indexPtr, indexSize := writeStringToMemory(indexName)
	docPtr, docSize := writeBytesToMemory(docBytes)
	if ret := hostSendToES(indexPtr, indexSize, docPtr, docSize); ret != 0 {
		log(fmt.Sprintf("host_send_to_es failed with code %d", ret))
	}
}

// --- Memory Utilities ---

// log forwards a message to the host's logger.
func log(message string) {
	ptr, size := writeStringToMemory(message)
	hostLog(ptr, size)
}

// readBytesFromMemory returns a slice aliasing guest memory at ptr (no copy).
func readBytesFromMemory(ptr, size uint32) []byte {
	return unsafe.Slice((*byte)(unsafe.Pointer(uintptr(ptr))), size)
}

// writeStringToMemory copies s into a new buffer and returns its address and
// length. Nothing pins that buffer, so pass the address straight to a host
// call without allocating in between.
func writeStringToMemory(s string) (uint32, uint32) {
	if len(s) == 0 {
		return 0, 0
	}
	buf := []byte(s)
	return uint32(uintptr(unsafe.Pointer(&buf[0]))), uint32(len(buf))
}

// writeBytesToMemory returns the address and length of b without copying.
func writeBytesToMemory(b []byte) (uint32, uint32) {
	if len(b) == 0 {
		return 0, 0
	}
	return uint32(uintptr(unsafe.Pointer(&b[0]))), uint32(len(b))
}
```
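`processMetric` interpolates `source` and `unit` straight into the tag set. In InfluxDB line protocol, commas, equals signs and spaces are delimiters inside tag keys and tag values, so they must be backslash-escaped. Without escaping, a source named `sensor A1` would corrupt the line. Below is a minimal escaping helper built on the standard library; the name `escapeTag` is ours:

```go
package main

import (
	"fmt"
	"strings"
)

// tagEscaper backslash-escapes the characters that line protocol treats as
// delimiters inside tag keys and tag values.
var tagEscaper = strings.NewReplacer(",", `\,`, "=", `\=`, " ", `\ `)

// escapeTag makes s safe to use as a line protocol tag key or value.
func escapeTag(s string) string {
	return tagEscaper.Replace(s)
}

func main() {
	line := fmt.Sprintf("env_metrics,source=%s,unit=%s value=%s",
		escapeTag("sensor A1"), escapeTag("celsius"), "42.5")
	fmt.Println(line)
	// prints: env_metrics,source=sensor\ A1,unit=celsius value=42.5
}
```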
To compile this guest plugin, use the following TinyGo command:
`tinygo build -o build/processor.wasm -target wasi cmd/guest/main.go`
This command produces a `processor.wasm` file in the `build` directory, which the host application loads at runtime. The architecture is now complete. We can modify `cmd/guest/main.go`, recompile it, and replace the `.wasm` file. A host application designed to watch for file changes could then hot-reload the new module and adopt the new processing logic without downtime.
The primary limitation of this implementation is the performance overhead associated with data transfer across the WASM boundary. For every request, data is copied from the host's memory into the guest's linear memory. The guest then processes it and potentially copies result data back for the host to read. While `wazero` is highly optimized, this copying is not free. In scenarios requiring sub-millisecond latencies for extremely high volumes of small messages, this overhead could become a bottleneck. Furthermore, the manual pointer management in the ABI is complex and error-prone. A more mature system would likely use a higher-level abstraction layer or code generation to create safer and more ergonomic bindings between the host and guest. Finally, the current plugins are stateless. Supporting stateful operations would require the host to manage and persist state on behalf of WASM module instances, adding significant complexity to the host's design.
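For the stateful case, one shape the host side could take is a namespaced key-value store. Hypothetical host functions would delegate to it, so that each plugin sees only its own keys. Examples would be `host_state_get` and `host_state_set`, neither of which exists in the code above. The in-memory `stateStore` below is a sketch only. It leaves out persistence and eviction, which carry most of the complexity the paragraph above describes:

```go
package main

import (
	"fmt"
	"sync"
)

// stateStore keeps per-plugin state on the host. Keys are namespaced by
// plugin ID so one module cannot read or overwrite another's entries.
type stateStore struct {
	mu   sync.RWMutex
	data map[string]map[string][]byte
}

func newStateStore() *stateStore {
	return &stateStore{data: make(map[string]map[string][]byte)}
}

// Set stores a copy of value: bytes read from guest memory are a view that
// the guest may overwrite later.
func (s *stateStore) Set(plugin, key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	ns, ok := s.data[plugin]
	if !ok {
		ns = make(map[string][]byte)
		s.data[plugin] = ns
	}
	ns[key] = append([]byte(nil), value...)
}

// Get returns a copy of the stored value and whether it existed.
func (s *stateStore) Get(plugin, key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[plugin][key] // indexing a nil inner map is safe
	if !ok {
		return nil, false
	}
	return append([]byte(nil), v...), true
}

func main() {
	store := newStateStore()
	store.Set("client-a", "last_value", []byte("42.5"))
	v, ok := store.Get("client-a", "last_value")
	fmt.Println(string(v), ok) // prints: 42.5 true
	_, ok = store.Get("client-b", "last_value")
	fmt.Println(ok) // prints: false
}
```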