The service dashboard was a sea of red. A core analytics API, responsible for dynamically slicing and dicing a moderately sized dataset for a real-time front end, was buckling. P99 latencies were measured in seconds, and the Kubernetes pod was constantly being OOMKilled and restarted. The culprit was a Python service built with FastAPI and Pandas. On paper, it was the right toolset: fast to develop and expressive. In production, under a load of a few hundred concurrent requests, it was a disaster. The Global Interpreter Lock (GIL) serialized the CPU-bound aggregation work across threads, and Pandas' memory model, with its tendency to create data copies during filtering and grouping operations, consumed gigabytes of memory for a dataset that was only a few hundred megabytes on disk.
The initial quick fix—throwing more memory and CPU at the pods—was a costly band-aid. The core problem remained: the architecture was not suited for high-concurrency, read-heavy analytical workloads on a shared, in-memory dataset. The decision was made to rewrite the service in Go. The appeal was obvious: true parallelism with goroutines, a strict type system, and direct control over memory layout and allocation. The challenge, however, was significant. Go lacks a direct equivalent to the rich, high-level API of Pandas. We would need to build a specialized, in-memory data aggregation engine from the ground up, designed specifically for our use case.
This is the build log of that effort—moving from a failing Python/Pandas implementation to a high-performance Go service capable of handling thousands of concurrent aggregation queries with predictable, low-latency performance.
Phase 1: Defining the Data Structures
The first step was to model the core data structure. A Pandas DataFrame is essentially a collection of named, typed columns of equal length. Replicating this in Go requires defining structures that can hold this columnar data efficiently. A common mistake here would be to use `[]interface{}` to handle different data types, but that introduces the overhead of type assertions and boxing, which kills performance. Instead, we opted for a strongly typed approach built on concrete, slice-backed series.
We defined a `Series` interface and concrete implementations for the types we needed: `Float64Series`, `StringSeries`, and `Int64Series`. The `DataFrame` itself would hold a map of column names to these `Series` interfaces.
package dataframe
import (
"fmt"
"sync"
)
// Series represents a column of data.
type Series interface {
// Len returns the number of elements in the series.
Len() int
// Type returns a string representation of the series type.
Type() string
}
// Float64Series holds float64 data.
type Float64Series struct {
Name string
Data []float64
}
func (s *Float64Series) Len() int { return len(s.Data) }
func (s *Float64Series) Type() string { return "float64" }
// StringSeries holds string data.
type StringSeries struct {
Name string
Data []string
}
func (s *StringSeries) Len() int { return len(s.Data) }
func (s *StringSeries) Type() string { return "string" }
// Int64Series holds int64 data.
type Int64Series struct {
Name string
Data []int64
}
func (s *Int64Series) Len() int { return len(s.Data) }
func (s *Int64Series) Type() string { return "int64" }
// DataFrame is the core structure for holding columnar data.
// It is designed for concurrent reads but requires external locking for writes.
type DataFrame struct {
mu sync.RWMutex
columns map[string]Series
colOrder []string
length int
}
// NewDataFrame creates an empty DataFrame.
func NewDataFrame() *DataFrame {
return &DataFrame{
columns: make(map[string]Series),
colOrder: []string{},
}
}
// AddColumn adds a new series to the DataFrame.
// This operation is not thread-safe and should be done during initialization.
func (df *DataFrame) AddColumn(name string, s Series) error {
if df.length == 0 && len(df.colOrder) == 0 {
df.length = s.Len()
} else if s.Len() != df.length {
return fmt.Errorf("column %s has length %d, expected %d", name, s.Len(), df.length)
}
if _, exists := df.columns[name]; exists {
return fmt.Errorf("column %s already exists", name)
}
df.columns[name] = s
df.colOrder = append(df.colOrder, name)
return nil
}
// MustAddColumn is a helper that panics on error.
func (df *DataFrame) MustAddColumn(name string, s Series) {
if err := df.AddColumn(name, s); err != nil {
panic(err)
}
}
// GetStringSeries retrieves a StringSeries by name.
func (df *DataFrame) GetStringSeries(name string) (*StringSeries, bool) {
s, ok := df.columns[name]
if !ok {
return nil, false
}
ss, ok := s.(*StringSeries)
return ss, ok
}
// GetFloat64Series retrieves a Float64Series by name.
func (df *DataFrame) GetFloat64Series(name string) (*Float64Series, bool) {
s, ok := df.columns[name]
if !ok {
return nil, false
}
fs, ok := s.(*Float64Series)
return fs, ok
}
// Len returns the number of rows in the DataFrame.
func (df *DataFrame) Len() int {
return df.length
}
This design is simple but effective. Using concrete slice types like `[]float64` ensures data is stored contiguously in memory, which is critical for cache performance during aggregations. The `sync.RWMutex` is embedded directly in the `DataFrame` to manage concurrent access; we'll see its use later in the service layer. In a real-world project, writes (like `AddColumn`) are typically done once during service startup when loading data, while reads happen concurrently throughout the service's lifetime.
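As a quick illustration of that startup contract, here is a minimal, hypothetical snippet (the column names and values are invented, and the import path mirrors the placeholder used in the service code later) showing a `DataFrame` being assembled once and the length check in `AddColumn` rejecting a ragged column:
package main

import (
    "fmt"

    "your_module/dataframe" // hypothetical module path, matching the service code below
)

func main() {
    df := dataframe.NewDataFrame()
    df.MustAddColumn("region", &dataframe.StringSeries{Name: "region", Data: []string{"NA", "EU", "NA"}})
    df.MustAddColumn("sales", &dataframe.Float64Series{Name: "sales", Data: []float64{100.5, 250.0, 80.2}})

    // AddColumn rejects a column whose length does not match the existing rows.
    err := df.AddColumn("units", &dataframe.Int64Series{Name: "units", Data: []int64{1, 2}})
    fmt.Println(err)      // column units has length 2, expected 3
    fmt.Println(df.Len()) // 3
}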
Phase 2: Implementing the GroupBy and Aggregation Logic
This is where the real work begins. The core requirement was to replicate Pandas' `df.groupby(['col_a', 'col_b'])['col_c'].sum()`. This operation first identifies unique combinations of values in `col_a` and `col_b`, partitions the row indices based on those groups, and then applies the `sum` function to `col_c` for each partition.
A naive implementation might involve creating new, smaller DataFrames for each group, but that is exactly the kind of memory copying we wanted to avoid. The performant approach is to work with indices: the `GroupBy` operation should produce a data structure that maps each group key to a slice of row indices belonging to that group.
package dataframe
import (
"fmt"
"strings"
)
// GroupedDataFrame holds the result of a GroupBy operation.
// It doesn't hold data, only indices into the original DataFrame.
type GroupedDataFrame struct {
df *DataFrame
groups map[string][]int // Map from group key to row indices
}
// GroupBy performs a grouping operation on the DataFrame.
func (df *DataFrame) GroupBy(colNames ...string) (*GroupedDataFrame, error) {
df.mu.RLock()
defer df.mu.RUnlock()
keyCols := make([]*StringSeries, len(colNames))
for i, name := range colNames {
s, ok := df.GetStringSeries(name)
if !ok {
return nil, fmt.Errorf("grouping column '%s' not found or not a string series", name)
}
keyCols[i] = s
}
groups := make(map[string][]int)
// A strings.Builder reused across rows to assemble composite group keys.
var keyBuilder strings.Builder
for i := 0; i < df.Len(); i++ {
keyBuilder.Reset()
for j, col := range keyCols {
if j > 0 {
keyBuilder.WriteRune('|') // Use a separator for composite keys
}
keyBuilder.WriteString(col.Data[i])
}
key := keyBuilder.String()
groups[key] = append(groups[key], i)
}
return &GroupedDataFrame{df: df, groups: groups}, nil
}
// AggregationFunc is a function that performs an aggregation on a set of indices.
type AggregationFunc func(indices []int) float64
// AggregationResult holds the result of one aggregation.
type AggregationResult struct {
GroupKey string
Value float64
}
// Aggregate applies an aggregation function to a specified column for each group.
// Note that the target column is captured in aggFunc's closure; aggColName is
// informational only.
func (gdf *GroupedDataFrame) Aggregate(aggColName string, aggFunc AggregationFunc) ([]AggregationResult, error) {
gdf.df.mu.RLock()
defer gdf.df.mu.RUnlock()
results := make([]AggregationResult, 0, len(gdf.groups))
for key, indices := range gdf.groups {
value := aggFunc(indices)
results = append(results, AggregationResult{GroupKey: key, Value: value})
}
return results, nil
}
// Sum creates an aggregation function to sum a float64 column.
func Sum(df *DataFrame, colName string) (AggregationFunc, error) {
s, ok := df.GetFloat64Series(colName)
if !ok {
return nil, fmt.Errorf("aggregation column '%s' not found or not a float64 series", colName)
}
// The returned function captures the series `s` in its closure.
// This is a key pattern: prepare the function once, then apply it.
return func(indices []int) float64 {
var total float64
for _, idx := range indices {
total += s.Data[idx]
}
return total
}, nil
}
// Mean creates an aggregation function to calculate the mean of a float64 column.
func Mean(df *DataFrame, colName string) (AggregationFunc, error) {
s, ok := df.GetFloat64Series(colName)
if !ok {
return nil, fmt.Errorf("aggregation column '%s' not found or not a float64 series", colName)
}
return func(indices []int) float64 {
if len(indices) == 0 {
return 0
}
var total float64
for _, idx := range indices {
total += s.Data[idx]
}
return total / float64(len(indices))
}, nil
}
Several critical design choices were made here:
- Zero data copying: The `GroupBy` function does not copy any of the underlying `DataFrame`'s series data. It only builds a `map[string][]int`, which is memory-efficient. The keys are the concatenated values from the grouping columns, and the values are slices of integer row indices.
- `strings.Builder` for performance: When creating composite keys for multi-column grouping, naively concatenating strings with `+` in a loop can lead to excessive memory allocations. Reusing a `strings.Builder` and resetting it for each row is a standard Go pattern that keeps those allocations in check.
- Functional approach for aggregations: The `Aggregate` method accepts an `AggregationFunc`. This makes the system extensible: we can add new aggregation types (like `Count` or `StdDev`) simply by writing new functions that match the signature, as the sketch after this list shows. The `Sum` and `Mean` functions are factory functions; they capture the column to be aggregated over in a closure, which is a clean and efficient pattern.
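As a sketch of that extensibility, here are two hypothetical aggregation factories that are not part of the code above. `Count` needs no column data at all, and `StdDev` follows the same closure pattern as `Sum`; dropping them into the `dataframe` package and adding two `case` arms to the handler's `switch` would be the whole change.
// aggregations_extra.go — hypothetical additions to the dataframe package,
// illustrating how new aggregations slot into the AggregationFunc signature.
package dataframe

import (
    "fmt"
    "math"
)

// Count returns an aggregation function that reports the number of rows in each
// group. It ignores column data entirely, so it is not a factory over a DataFrame.
func Count() AggregationFunc {
    return func(indices []int) float64 {
        return float64(len(indices))
    }
}

// StdDev creates an aggregation function for the population standard deviation
// of a float64 column, using the same factory-plus-closure pattern as Sum.
func StdDev(df *DataFrame, colName string) (AggregationFunc, error) {
    s, ok := df.GetFloat64Series(colName)
    if !ok {
        return nil, fmt.Errorf("aggregation column '%s' not found or not a float64 series", colName)
    }
    return func(indices []int) float64 {
        if len(indices) == 0 {
            return 0
        }
        var sum float64
        for _, idx := range indices {
            sum += s.Data[idx]
        }
        mean := sum / float64(len(indices))
        var sqDiff float64
        for _, idx := range indices {
            d := s.Data[idx] - mean
            sqDiff += d * d
        }
        return math.Sqrt(sqDiff / float64(len(indices)))
    }, nil
}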
Phase 3: The API Service Layer
With the core data engine in place, the next step was to expose it via a web API. We used the standard `net/http` package for simplicity and performance. The service needs to hold the `DataFrame` in memory, handle incoming requests, parse query parameters to determine the grouping and aggregation logic, and execute the query.
package main
import (
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"strings"
"time"
"your_module/dataframe" // Replace with your module path
)
// AggregationService encapsulates the DataFrame and request handling logic.
type AggregationService struct {
df *dataframe.DataFrame
logger *slog.Logger
}
// NewAggregationService creates and initializes the service, including loading data.
func NewAggregationService(logger *slog.Logger, csvPath string) (*AggregationService, error) {
df, err := loadDataFromCSV(csvPath)
if err != nil {
return nil, fmt.Errorf("failed to load data: %w", err)
}
logger.Info("data loaded successfully", "rows", df.Len())
return &AggregationService{
df: df,
logger: logger,
}, nil
}
// loadDataFromCSV is a helper to load data from a CSV into our DataFrame.
// In a real application, this would be more robust.
func loadDataFromCSV(path string) (*dataframe.DataFrame, error) {
// ... Implementation for loading CSV data into the DataFrame ...
// This part is crucial but standard: read the file with encoding/csv, parse each
// row (strconv for the numeric columns), and append values to pre-allocated
// slices for each column. For brevity, the full implementation is omitted here;
// a sketch follows the service code below. Assume it populates a DataFrame
// like this for demonstration:
df := dataframe.NewDataFrame()
df.MustAddColumn("region", &dataframe.StringSeries{Data: []string{"NA", "EU", "NA", "APAC", "EU"}})
df.MustAddColumn("product", &dataframe.StringSeries{Data: []string{"A", "A", "B", "B", "A"}})
df.MustAddColumn("sales", &dataframe.Float64Series{Data: []float64{100.5, 250.0, 80.2, 120.0, 300.1}})
return df, nil
}
// handleAggregation is the HTTP handler for our API endpoint.
// Example Request: GET /aggregate?group_by=region,product&agg_col=sales&agg_func=sum
func (s *AggregationService) handleAggregation(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 1. Parse and validate query parameters
queryParams := r.URL.Query()
groupByCols := strings.Split(queryParams.Get("group_by"), ",")
aggCol := queryParams.Get("agg_col")
aggFuncName := queryParams.Get("agg_func")
if len(groupByCols) == 0 || groupByCols[0] == "" || aggCol == "" || aggFuncName == "" {
http.Error(w, "missing required query parameters: group_by, agg_col, agg_func", http.StatusBadRequest)
return
}
// 2. Perform the GroupBy operation
grouped, err := s.df.GroupBy(groupByCols...)
if err != nil {
s.logger.Error("grouping failed", "error", err, "columns", groupByCols)
http.Error(w, fmt.Sprintf("grouping error: %v", err), http.StatusBadRequest)
return
}
// 3. Select the aggregation function
var aggFunc dataframe.AggregationFunc
switch aggFuncName {
case "sum":
aggFunc, err = dataframe.Sum(s.df, aggCol)
case "mean":
aggFunc, err = dataframe.Mean(s.df, aggCol)
default:
http.Error(w, fmt.Sprintf("unsupported aggregation function: %s", aggFuncName), http.StatusBadRequest)
return
}
if err != nil {
s.logger.Error("failed to create aggregation function", "error", err, "column", aggCol)
http.Error(w, fmt.Sprintf("aggregation setup error: %v", err), http.StatusBadRequest)
return
}
// 4. Execute the aggregation
results, err := grouped.Aggregate(aggCol, aggFunc)
if err != nil { // This error is less likely here but good practice
s.logger.Error("aggregation execution failed", "error", err)
http.Error(w, "internal server error during aggregation", http.StatusInternalServerError)
return
}
duration := time.Since(start)
s.logger.Info("aggregation complete",
"duration_µs", duration.Microseconds(),
"groups_found", len(results),
)
// 5. Respond with JSON
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(results); err != nil {
s.logger.Error("failed to encode json response", "error", err)
}
}
func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// Configuration
csvPath := os.Getenv("DATA_PATH")
if csvPath == "" {
csvPath = "data.csv" // Default path
}
listenAddr := os.Getenv("LISTEN_ADDR")
if listenAddr == "" {
listenAddr = ":8080"
}
service, err := NewAggregationService(logger, csvPath)
if err != nil {
logger.Error("failed to start service", "error", err)
os.Exit(1)
}
mux := http.NewServeMux()
mux.HandleFunc("/aggregate", service.handleAggregation)
server := &http.Server{
Addr: listenAddr,
Handler: mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}
logger.Info("starting server", "address", listenAddr)
if err := server.ListenAndServe(); err != nil {
logger.Error("server failed", "error", err)
os.Exit(1)
}
}
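The only elided piece above is the CSV parsing itself. As a sketch of how `loadDataFromCSV` could be fleshed out for this three-column layout (a real version would also import `encoding/csv` and `strconv`, which the trimmed import block above omits, and the header and column assumptions here are illustrative):
// A hypothetical implementation of loadDataFromCSV. It assumes a header row
// followed by rows of the form region,product,sales and builds each column as a
// pre-allocated slice before handing the data to the DataFrame.
func loadDataFromCSV(path string) (*dataframe.DataFrame, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    rows, err := csv.NewReader(f).ReadAll()
    if err != nil {
        return nil, err
    }
    if len(rows) < 2 {
        return nil, fmt.Errorf("csv %s contains no data rows", path)
    }

    n := len(rows) - 1 // skip the header row
    regions := make([]string, 0, n)
    products := make([]string, 0, n)
    sales := make([]float64, 0, n)
    for i, row := range rows[1:] {
        if len(row) != 3 {
            return nil, fmt.Errorf("row %d: expected 3 fields, got %d", i+2, len(row))
        }
        v, err := strconv.ParseFloat(row[2], 64)
        if err != nil {
            return nil, fmt.Errorf("row %d: bad sales value %q: %w", i+2, row[2], err)
        }
        regions = append(regions, row[0])
        products = append(products, row[1])
        sales = append(sales, v)
    }

    df := dataframe.NewDataFrame()
    df.MustAddColumn("region", &dataframe.StringSeries{Name: "region", Data: regions})
    df.MustAddColumn("product", &dataframe.StringSeries{Name: "product", Data: products})
    df.MustAddColumn("sales", &dataframe.Float64Series{Name: "sales", Data: sales})
    return df, nil
}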
The service layer is intentionally straightforward. It uses structured logging (`slog`) for observability, pulls configuration from environment variables, and sets up a robust `http.Server`. The handler logic is a clear pipeline: parse, group, select aggregator, execute, and respond. The key point is that the `DataFrame` is read-only within the handler. The `df.mu.RLock()` in the `GroupBy` method ensures that multiple goroutines (one per incoming HTTP request) can safely read from the `DataFrame` concurrently without data races.
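The next phase covers the main test suite, but the race-freedom claim is worth verifying on its own. Here is a sketch of a race-detector test (hypothetical, run with `go test -race`) that hammers the read path from many goroutines against one shared `DataFrame`:
// concurrency_test.go — a hypothetical test for the dataframe package.
package dataframe

import (
    "sync"
    "testing"
)

func TestConcurrentGroupByAndAggregate(t *testing.T) {
    df := NewDataFrame()
    df.MustAddColumn("region", &StringSeries{Data: []string{"NA", "EU", "NA", "APAC"}})
    df.MustAddColumn("sales", &Float64Series{Data: []float64{1, 2, 3, 4}})

    sumFunc, err := Sum(df, "sales")
    if err != nil {
        t.Fatalf("Sum func creation failed: %v", err)
    }

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            grouped, err := df.GroupBy("region")
            if err != nil {
                t.Errorf("GroupBy failed: %v", err)
                return
            }
            if _, err := grouped.Aggregate("sales", sumFunc); err != nil {
                t.Errorf("Aggregate failed: %v", err)
            }
        }()
    }
    wg.Wait()
}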
Phase 4: Verification via Testing and Benchmarking
A pragmatic solution is an unverified liability. Unit tests are essential to ensure the aggregation logic is correct. More importantly, benchmarks are needed to prove that we solved the original performance problem.
// dataframe_test.go
package dataframe
import (
"math"
"testing"
)
// setupTestDF creates a standard DataFrame for testing.
func setupTestDF(tb testing.TB) *DataFrame {
tb.Helper()
df := NewDataFrame()
df.MustAddColumn("region", &StringSeries{Data: []string{"NA", "EU", "NA", "APAC", "EU", "NA"}})
df.MustAddColumn("product", &StringSeries{Data: []string{"A", "A", "B", "B", "A", "A"}})
df.MustAddColumn("sales", &Float64Series{Data: []float64{100.0, 250.0, 80.0, 120.0, 300.0, 50.0}})
return df
}
func TestSumAggregation(t *testing.T) {
df := setupTestDF(t)
grouped, err := df.GroupBy("region")
if err != nil {
t.Fatalf("GroupBy failed: %v", err)
}
sumFunc, err := Sum(df, "sales")
if err != nil {
t.Fatalf("Sum func creation failed: %v", err)
}
results, _ := grouped.Aggregate("sales", sumFunc)
expected := map[string]float64{
"NA": 100.0 + 80.0 + 50.0, // 230.0
"EU": 250.0 + 300.0, // 550.0
"APAC": 120.0,
}
if len(results) != len(expected) {
t.Fatalf("Expected %d groups, got %d", len(expected), len(results))
}
for _, res := range results {
if math.Abs(res.Value - expected[res.GroupKey]) > 1e-9 {
t.Errorf("For group %s, expected sum %f, got %f", res.GroupKey, expected[res.GroupKey], res.Value)
}
}
}
// A benchmark is crucial to validate performance claims.
func BenchmarkGroupByAndSum(b *testing.B) {
// Use a larger, more realistic dataset for benchmarking.
numRows := 1_000_000
regions := []string{"NA", "EU", "APAC", "LATAM", "MEA"}
products := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
regionCol := make([]string, numRows)
productCol := make([]string, numRows)
salesCol := make([]float64, numRows)
for i := 0; i < numRows; i++ {
regionCol[i] = regions[i%len(regions)]
productCol[i] = products[i%len(products)]
salesCol[i] = float64(i)
}
df := NewDataFrame()
df.MustAddColumn("region", &StringSeries{Data: regionCol})
df.MustAddColumn("product", &StringSeries{Data: productCol})
df.MustAddColumn("sales", &Float64Series{Data: salesCol})
sumFunc, _ := Sum(df, "sales")
b.ResetTimer()
for i := 0; i < b.N; i++ {
// The operation we are benchmarking
grouped, _ := df.GroupBy("region", "product")
_, _ = grouped.Aggregate("sales", sumFunc)
}
}
Running the benchmark on a standard developer machine against the 1-million-row dataset produced results in the low milliseconds per operation, a stark contrast to the Python service, where a single aggregation could take hundreds of milliseconds. The Go version was orders of magnitude faster, primarily due to the absence of interpreter overhead, the contiguous memory layout, and the zero-copy indexing strategy. Under load from a tool like k6, the Go service sustained thousands of requests per second on a single pod with minimal CPU and memory usage, completely resolving the initial production crisis.
The final system is not a full-fledged Pandas replacement. It lacks the vast functionality for data cleaning, transformation, and complex indexing that Pandas provides. However, it was never intended to be. It is a purpose-built, highly-optimized engine for a specific, performance-critical task. The pitfall of many rewrites is trying to replicate every feature of the old system. The pragmatic approach is to identify the 20% of functionality that serves 80% of the critical workload and implement that with ruthless efficiency.
This solution's applicability is bounded. It is ideal for read-heavy, low-latency analytical queries on datasets that fit comfortably in memory. It would not be suitable for scenarios requiring frequent data updates, as a full data reload is the only write mechanism. Future iterations could explore zero-downtime data reloads by swapping a pointer to a new `DataFrame` under a write lock, or integrating with columnar formats like Apache Arrow for even greater memory efficiency and potential interoperability with other data systems.
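A minimal sketch of that reload idea, assuming a hypothetical `ReloadableStore` wrapper rather than any change to the existing types: handlers fetch the current snapshot at the start of each request, while a background loader builds a replacement `DataFrame` off to the side and swaps the pointer under the write lock, so in-flight queries keep reading the old snapshot.
// store.go — a hypothetical wrapper enabling zero-downtime reloads.
package store

import (
    "sync"

    "your_module/dataframe" // hypothetical module path, as above
)

type ReloadableStore struct {
    mu sync.RWMutex
    df *dataframe.DataFrame
}

func NewReloadableStore(df *dataframe.DataFrame) *ReloadableStore {
    return &ReloadableStore{df: df}
}

// Current returns the active snapshot; safe to call from any number of handler goroutines.
func (s *ReloadableStore) Current() *dataframe.DataFrame {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.df
}

// Swap installs a fully built replacement under the write lock. Readers that
// already hold a *DataFrame keep using the old, immutable snapshot.
func (s *ReloadableStore) Swap(next *dataframe.DataFrame) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.df = next
}
The handler would then call `Current()` at the top of each request instead of holding a `*DataFrame` field directly; `sync/atomic`'s `atomic.Pointer` would work equally well in place of the mutex.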