Implementing a Fault-Tolerant Go CDC Pipeline for Meilisearch Using a Monorepo and Consul for State Management


The search index was perpetually out of sync. Our team’s initial approach for keeping our Meilisearch instance updated was a nightly batch job that re-indexed critical tables from a primary PostgreSQL database. This worked, but “nightly” is unacceptable for a system that needs to reflect user changes within seconds. The intermittent load spikes on the database during these re-indexing runs were also becoming a production stability concern. The clear technical pain point was the lack of a real-time, low-impact data synchronization mechanism between our source of truth (Postgres) and our search system (Meilisearch).

The conceptual pivot was to stop thinking in batches and start thinking in streams. Instead of pulling snapshots of data, we needed to react to individual changes as they happened. This led directly to Change Data Capture (CDC). The plan was to build a dedicated Go service that would tap into PostgreSQL’s logical replication stream, process data change events (INSERT, UPDATE, DELETE), and push them to Meilisearch in near real-time.

This immediately raised a critical architectural question: how do we make this pipeline resilient? If the CDC service crashes, how does it know where to resume streaming without losing data or re-processing everything? Storing the last processed Log Sequence Number (LSN) in a local file is fragile. Storing it back in the primary database creates a messy circular dependency. The solution required a distributed, highly-available state store, completely decoupled from the service itself. This is where Consul entered the picture. It would act as the durable memory for our CDC pipeline’s progress.

Our entire backend is written in Go and structured as a monorepo. This new CDC service, which we’ll call cdc-indexer, would naturally live there, sharing data models and database configurations with the main application service, simplifying maintenance and deployment.

The final architecture is a straightforward, robust data flow:

graph TD
    A[PostgreSQL] -- Logical Replication (WAL) --> B(Go CDC Indexer Service);
    B -- Read/Write Last LSN --> C[Consul KV Store];
    B -- Batch Index Updates --> D[Meilisearch];
    subgraph Monorepo
        E[Main Go Application]
        B
    end

Part 1: Monorepo and Database Preparation

A monorepo simplifies sharing code. Our structure is pragmatic:

/my-project
|-- go.mod
|-- go.work
|-- /cmd
|   |-- /api_server         # Main application service
|   |   `-- main.go
|   `-- /cdc_indexer        # Our new CDC service
|       `-- main.go
|-- /internal
|   |-- /config
|   |-- /logging
|-- /pkg
|   |-- /models             # Shared data structs (e.g., Product)
|   |   `-- product.go
|   `-- /store              # Database access logic

The go.work file ties the workspace together for local development. Each directory listed in a use directive is its own Go module with its own go.mod, and the workspace lets those modules resolve one another without publishing or replace directives.

// go.work
go 1.21

use (
    ./cmd/api_server
    ./cmd/cdc_indexer
    ./pkg/models
    ./pkg/store
)

Before writing any Go code, PostgreSQL must be configured for logical replication. This involves setting parameters in postgresql.conf:

# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10

A restart is required after these changes. Next, we need a specific PUBLICATION for the table we want to watch. Let’s assume a simple products table.

-- Connect to your database as a superuser
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price INT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- This tells Postgres to publish all changes for the products table.
CREATE PUBLICATION meili_publication FOR TABLE products;

This setup ensures that any Data Manipulation Language (DML) operations on the products table will be sent to the logical replication stream for consumers to read.

Part 2: The Core CDC Listener in Go

The heart of the cdc-indexer service is the connection to the PostgreSQL replication stream. We’ll use the jackc/pglogrepl library on top of pgx/v5’s pgconn, which together provide excellent support for the logical replication protocol.

The main challenge is handling the raw Write-Ahead Log (WAL) messages. They are not plain SQL. We need to receive them, parse them, and act on them. The core logic involves creating a replication connection and entering an infinite loop to process messages.

Here is the initial structure of our replicator service.

// cmd/cdc_indexer/replicator/replicator.go
package replicator

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgproto3"
	"github.com/jackc/pgx/v5/pgtype"
)

// A simplified product model for demonstration.
// In a real project, this would be in the /pkg/models directory.
type Product struct {
	ID          int32     `json:"id"`
	Name        string    `json:"name"`
	Description string    `json:"description"`
	Price       int       `json:"price"`
	UpdatedAt   time.Time `json:"updated_at"`
}

type Replicator struct {
	conn *pgconn.PgConn
	// ... other dependencies like Meilisearch client, Consul client
}

// pglogrepl parses the pgoutput stream into typed messages such as
// pglogrepl.InsertMessage, UpdateMessage and DeleteMessage, which we
// switch on in processLogicalMessage below.

// New opens a replication connection. The connection string must include
// "replication=database" so Postgres starts a logical walsender session,
// e.g. postgres://user:pass@host/mydb?replication=database
func New(ctx context.Context, replicationConnStr string) (*Replicator, error) {
	conn, err := pgconn.Connect(ctx, replicationConnStr)
	if err != nil {
		return nil, fmt.Errorf("failed to create replication connection: %w", err)
	}
	return &Replicator{conn: conn}, nil
}

func (r *Replicator) Start(ctx context.Context, slotName string, publicationName string, startLSN pglogrepl.LSN) {
	// Tell the pgoutput plugin which protocol version to speak and which
	// publication(s) to stream. PluginArgs is a slice of strings.
	pluginArgs := []string{
		"proto_version '1'",
		fmt.Sprintf("publication_names '%s'", publicationName),
	}

	// Start the replication stream from the given LSN.
	err := pglogrepl.StartReplication(ctx, r.conn, slotName, startLSN, pglogrepl.StartReplicationOptions{PluginArgs: pluginArgs})
	if err != nil {
		slog.Error("Failed to start replication", "error", err)
		return
	}
	slog.Info("Replication started", "slot", slotName, "start_lsn", startLSN.String())

	standbyMessageTimeout := time.Second * 10
	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

	for {
		select {
		case <-ctx.Done():
			slog.Info("Context cancelled, stopping replicator.")
			return
		default:
			if time.Now().After(nextStandbyMessageDeadline) {
				// Send a standby status message so Postgres knows how far we have
				// processed; without it, the server terminates the connection.
				err = pglogrepl.SendStandbyStatusUpdate(ctx, r.conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: startLSN})
				if err != nil {
					slog.Error("Failed to send standby status", "error", err)
					// In a real scenario, you'd have reconnection logic here.
					return
				}
				nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
				slog.Debug("Sent standby status update")
			}

			// Receive the next message, but never block past the standby deadline;
			// the timeout lets the loop come back around to send a status update
			// even when the stream is idle.
			recvCtx, cancelRecv := context.WithDeadline(ctx, nextStandbyMessageDeadline)
			rawMsg, err := r.conn.ReceiveMessage(recvCtx)
			cancelRecv()
			if err != nil {
				if pgconn.Timeout(err) {
					continue
				}
				slog.Error("ReceiveMessage failed", "error", err)
				return
			}

			// We are only interested in CopyData messages which contain the logical replication data.
			if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
				slog.Error("Received Postgres error", "message", errMsg.Message)
				continue
			}

			msg, ok := rawMsg.(*pgproto3.CopyData)
			if !ok {
				slog.Warn("Received unexpected message type", "type", fmt.Sprintf("%T", rawMsg))
				continue
			}

			switch msg.Data[0] {
			case pglogrepl.PrimaryKeepaliveMessageByteID:
				// This is a keepalive from Postgres. We need to respond.
				pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
				if err != nil {
					slog.Error("Failed to parse primary keepalive", "error", err)
					continue
				}

				if pkm.ReplyRequested {
					nextStandbyMessageDeadline = time.Time{} // Respond immediately
				}

			case pglogrepl.XLogDataByteID:
				// This is the core message containing our data changes.
				xld, err := pglogrepl.ParseXLogData(msg.Data[1:])
				if err != nil {
					slog.Error("Failed to parse XLogData", "error", err)
					continue
				}

				// The WALData contains the actual changes from the pgoutput plugin.
				logicalMsg, err := pglogrepl.Parse(xld.WALData)
				if err != nil {
					slog.Error("Failed to parse logical replication message", "error", err)
					continue
				}
				
				// Process the message and update the LSN for the next state commit.
				if err := r.processLogicalMessage(logicalMsg); err != nil {
				    slog.Error("Failed to process logical message", "error", err)
				}
				
				startLSN = xld.WALStart + pglogrepl.LSN(len(xld.WALData))
			}
		}
	}
}

// processLogicalMessage is a placeholder for now.
// This is where we will parse the actual row data and send it to Meilisearch.
func (r *Replicator) processLogicalMessage(msg pglogrepl.Message) error {
	slog.Info("Processing logical message", "type", msg.Type())
	// ... parsing and batching logic will go here ...
	return nil
}

This code establishes the connection and handles the replication protocol’s handshake and keepalive messages. The real work happens when we receive an XLogData message: its payload comes from the pgoutput plugin and must be parsed to extract the row-level changes. A common pitfall is blocking in ReceiveMessage past the standby deadline; if PostgreSQL doesn’t receive a status update in time it terminates the replication connection, which is why the receive above is bounded by a deadline context.

Part 3: Parsing Changes and Batching for Meilisearch

Indexing documents one by one is horribly inefficient. The network overhead would kill performance. The correct approach is to batch changes. We’ll collect changes in memory for a short period (e.g., 500ms) or until a certain batch size (e.g., 1000 documents) is reached, and then send them to Meilisearch in a single API call.

Let’s expand processLogicalMessage to parse the pgoutput messages and build these batches.

// Extended replicator.go with batching and parsing logic.

// ... (previous code) ...

// Batch represents a collection of changes to be sent to Meilisearch.
type Batch struct {
	Upserts []Product
	Deletes []int32 // Just the IDs
}

type Replicator struct {
	conn         *pgconn.PgConn
	meiliClient  *meilisearch.Client  // assuming the meilisearch-go client
	stateManager *state.ConsulManager // Consul-backed LSN store (see Part 4)
	batch        *Batch
	batchMaxSize int
	batchMaxAge  time.Duration // flush even a partial batch after this long
	lastFlush    time.Time
	typeMap      *pgtype.Map // created with pgtype.NewMap()
	relations    map[uint32]pglogrepl.RelationMessage
}

func (r *Replicator) processLogicalMessage(msg pglogrepl.Message) error {
	switch m := msg.(type) {
	case *pglogrepl.RelationMessage:
		// Postgres sends this message to describe the schema of a table being replicated.
		// We must store it in a map to decode subsequent row data.
		r.relations[m.RelationID] = *m
		
	case *pglogrepl.InsertMessage:
		relation, ok := r.relations[m.RelationID]
		if !ok {
			slog.Warn("Could not find relation for insert", "relation_id", m.RelationID)
			return nil
		}
		
		product, err := r.decodeTuple(relation, m.Tuple)
		if err != nil {
			return fmt.Errorf("could not decode insert tuple: %w", err)
		}
		r.batch.Upserts = append(r.batch.Upserts, product)
		
	case *pglogrepl.UpdateMessage:
	    // Similar to insert, but you get OldTuple and NewTuple. We only care about the new one for Meilisearch.
		relation, ok := r.relations[m.RelationID]
		if !ok {
			slog.Warn("Could not find relation for update", "relation_id", m.RelationID)
			return nil
		}
		product, err := r.decodeTuple(relation, m.NewTuple)
		if err != nil {
			return fmt.Errorf("could not decode update tuple: %w", err)
		}
		r.batch.Upserts = append(r.batch.Upserts, product)

	case *pglogrepl.DeleteMessage:
		// For deletes, pgoutput only sends the old key columns (the primary key
		// by default), so the decoded Product carries little more than its ID.
		relation, ok := r.relations[m.RelationID]
		if !ok {
			slog.Warn("Could not find relation for delete", "relation_id", m.RelationID)
			return nil
		}
		product, err := r.decodeTuple(relation, m.OldTuple)
		if err != nil {
			return fmt.Errorf("could not decode delete tuple: %w", err)
		}
		r.batch.Deletes = append(r.batch.Deletes, product.ID)

	case *pglogrepl.CommitMessage:
		// A commit marks the end of a transaction and is a safe point to flush:
		// everything up to m.CommitLSN is now consistent in the source database.
		// Flush when the batch is large enough or old enough; the time bound
		// covers quiet periods, matching the size/age policy described above.
		pending := len(r.batch.Upserts) + len(r.batch.Deletes)
		if pending > 0 && (pending >= r.batchMaxSize || time.Since(r.lastFlush) >= r.batchMaxAge) {
			r.flushBatch(m.CommitLSN) // flushBatch also commits the LSN (see Part 4)
			r.lastFlush = time.Now()
		}
	}
	
	return nil
}

// decodeTuple converts the raw tuple data into our Product struct.
func (r *Replicator) decodeTuple(relation pglogrepl.RelationMessage, tuple *pglogrepl.TupleData) (Product, error) {
	var p Product
	values := map[string]interface{}{}

	for i, col := range tuple.Columns {
		colName := relation.Columns[i].Name
		// Each column carries a one-byte kind: null, unchanged TOAST, or text data.
		switch col.DataType {
		case 'n': // null
			values[colName] = nil
		case 'u': // unchanged TOAST value: not included in the message, so skip it
			continue
		case 't': // text-formatted data: decode via the registered pgtype codecs
			dt, ok := r.typeMap.TypeForOID(relation.Columns[i].DataType)
			if !ok {
				// Unknown OID: fall back to the raw string representation.
				values[colName] = string(col.Data)
				continue
			}
			decoded, err := dt.Codec.DecodeValue(r.typeMap, relation.Columns[i].DataType, pgtype.TextFormatCode, col.Data)
			if err != nil {
				return Product{}, fmt.Errorf("failed to decode column %s: %w", colName, err)
			}
			values[colName] = decoded
		}
	}

	// This is a crude mapping. In production, a library like `mapstructure` would be better.
	if id, ok := values["id"].(int32); ok { p.ID = id }
	if name, ok := values["name"].(string); ok { p.Name = name }
	if desc, ok := values["description"].(string); ok { p.Description = desc }
	if price, ok := values["price"].(int32); ok { p.Price = int(price) } // int4 arrives as int32
	if ts, ok := values["updated_at"].(time.Time); ok { p.UpdatedAt = ts }

	return p, nil
}

// flushBatch sends the collected changes to Meilisearch.
func (r *Replicator) flushBatch(commitLSN pglogrepl.LSN) {
	// ... Meilisearch client logic goes here ...
	// ... After success, we MUST commit the LSN to Consul ...
}

The decodeTuple function is where many subtle bugs can hide. It relies on a pgtype.Map to interpret the text-encoded column data sent by the pgoutput plugin, so the Go types in your struct must line up with the PostgreSQL column types; a mismatch surfaces as decoding errors or as silently dropped fields in the type assertions above. The relations map is equally critical: without the RelationMessage describing the table’s schema, the column data cannot be interpreted at all.

Part 4: The Resilience Layer - Consul State Management

This is the most important part of the architecture. The entire fault-tolerance model rests on reliably persisting the last processed LSN.

Our service will follow this startup logic:

  1. Connect to Consul.
  2. Attempt to read the LSN from a predefined key (e.g., cdc-indexer/products/lsn).
  3. If the key exists, parse the LSN and start replication from that point.
  4. If the key doesn’t exist (first run), start replication from the current LSN of the database.

And this processing logic:

  1. Receive and batch WAL messages.
  2. When a batch is ready to be flushed, send it to Meilisearch.
  3. If the Meilisearch update succeeds, immediately write the latest LSN from the batch to the Consul key.

This ensures “at-least-once” delivery. If the service crashes after the Meilisearch update but before the Consul write, it will re-process those messages upon restart. This is acceptable because adding documents to Meilisearch is idempotent: the add-or-update documents operation simply overwrites a document that already exists under the same primary key.

Here’s the Go implementation for the Consul state manager and its integration into the replicator.

// cmd/cdc_indexer/state/consul.go
package state

import (
	"context"
	"fmt"

	consul "github.com/hashicorp/consul/api"
	"github.com/jackc/pgx/v5/pglogrepl"
)

type ConsulManager struct {
	client *consul.Client
	key    string
}

func NewConsulManager(addr, key string) (*ConsulManager, error) {
	config := consul.DefaultConfig()
	config.Address = addr
	client, err := consul.NewClient(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create consul client: %w", err)
	}

	return &ConsulManager{client: client, key: key}, nil
}

func (m *ConsulManager) GetLSN(ctx context.Context) (pglogrepl.LSN, error) {
	kvPair, _, err := m.client.KV().Get(m.key, (&consul.QueryOptions{}).WithContext(ctx))
	if err != nil {
		return 0, fmt.Errorf("failed to get LSN from consul: %w", err)
	}

	if kvPair == nil {
		// Key does not exist, indicates first run.
		return 0, nil
	}

	lsn, err := pglogrepl.ParseLSN(string(kvPair.Value))
	if err != nil {
		return 0, fmt.Errorf("failed to parse LSN from consul value '%s': %w", string(kvPair.Value), err)
	}

	return lsn, nil
}

func (m *ConsulManager) SetLSN(ctx context.Context, lsn pglogrepl.LSN) error {
	p := &consul.KVPair{Key: m.key, Value: []byte(lsn.String())}
	_, err := m.client.KV().Put(p, (&consul.WriteOptions{}).WithContext(ctx))
	if err != nil {
		return fmt.Errorf("failed to set LSN in consul: %w", err)
	}
	return nil
}

Now, we integrate this into our main.go and the flushBatch method.

// cmd/cdc_indexer/main.go
func main() {
    // ... config loading for postgres, meilisearch, consul addrs ...

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 1. Initialize State Manager
    consulKey := "cdc-indexer/products/lsn"
    stateManager, err := state.NewConsulManager(cfg.ConsulAddr, consulKey)
    // ... handle error ...

    // 2. Get the starting LSN
    startLSN, err := stateManager.GetLSN(ctx)
    // ... handle error ...
    
    if startLSN == 0 {
        slog.Info("No previous LSN found in Consul, starting from the current WAL position.")
        // A real implementation would create the replication slot and ask the
        // server for its current WAL position; see the bootstrap sketch below.
    }

    // 3. Initialize Replicator with all dependencies.
    // (New has been extended from Part 2 to also accept the Meilisearch client
    // and the Consul state manager.)
    meiliClient := meilisearch.NewClient(...)
    replicator, err := replicator.New(ctx, cfg.ReplicationConnStr, meiliClient, stateManager)
    // ... handle error ...
    
    slog.Info("Starting CDC Indexer...")
    replicator.Start(ctx, "meili_slot", "meili_publication", startLSN)

    // ... graceful shutdown logic ...
}
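
For the very first run, when Consul holds no LSN, the indexer needs two one-time bootstrap steps on the replication connection: create the logical replication slot (the meili_slot referenced above) and ask the server for its current WAL position. Here is a minimal sketch using pglogrepl’s CreateReplicationSlot and IdentifySystem; the bootstrapLSN helper and its file are our own naming, and error handling is simplified.

// cmd/cdc_indexer/bootstrap.go (hypothetical helper)
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5/pgconn"
)

// bootstrapLSN creates the replication slot if it does not exist yet and
// returns the server's current WAL position as the starting LSN.
func bootstrapLSN(ctx context.Context, conn *pgconn.PgConn, slotName string) (pglogrepl.LSN, error) {
	// Create the slot with the pgoutput plugin. If it already exists, Postgres
	// returns SQLSTATE 42710 (duplicate_object), which we can safely ignore.
	_, err := pglogrepl.CreateReplicationSlot(ctx, conn, slotName, "pgoutput", pglogrepl.CreateReplicationSlotOptions{})
	if err != nil {
		var pgErr *pgconn.PgError
		if !errors.As(err, &pgErr) || pgErr.Code != "42710" {
			return 0, fmt.Errorf("failed to create replication slot: %w", err)
		}
	}

	// IdentifySystem reports, among other things, the current WAL position.
	sysident, err := pglogrepl.IdentifySystem(ctx, conn)
	if err != nil {
		return 0, fmt.Errorf("identify system failed: %w", err)
	}
	return sysident.XLogPos, nil
}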


// In replicator.go, the flushBatch method becomes critical.
func (r *Replicator) flushBatch(commitLSN pglogrepl.LSN) {
    // A real implementation needs retries and backoff around the Meilisearch
    // calls; a sketch of the requests themselves follows below.
    if len(r.batch.Upserts) > 0 {
        // _, err := r.meiliClient.Index("products").UpdateDocuments(r.batch.Upserts)
        // if err != nil { log and crash/retry }
    }

    if len(r.batch.Deletes) > 0 {
        // _, err := r.meiliClient.Index("products").DeleteDocuments(ids) // IDs converted to []string
        // if err != nil { log and crash/retry }
    }

    slog.Info("Successfully flushed batch to Meilisearch", "upserts", len(r.batch.Upserts), "deletes", len(r.batch.Deletes))

    // THE MOST IMPORTANT STEP: commit the LSN to Consul only AFTER Meilisearch
    // has accepted the batch.
    err := r.stateManager.SetLSN(context.Background(), commitLSN)
    if err != nil {
        // This is a critical failure. If we can't save our state, we must stop.
        // A robust system might have a retry loop or enter a degraded state.
        slog.Error("CRITICAL: Failed to commit LSN to Consul. Shutting down to prevent data loss.", "error", err)
        // os.Exit(1) or trigger a graceful shutdown
        return
    }

    slog.Debug("Committed LSN to Consul", "lsn", commitLSN.String())

    // Reset the batch for the next window of transactions.
    r.batch = &Batch{}
}
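
The Meilisearch calls elided above are simple, but two details matter: the add-or-update operation is exposed as UpdateDocuments in the Go SDK, and DeleteDocuments expects document IDs as strings. Below is a minimal sketch of a helper that flushBatch could call before committing the LSN; the pushToMeilisearch name is ours, and the calls assume the meilisearch-go client used in the Replicator struct (strconv must be added to the imports).

// pushToMeilisearch sends the pending batch. Both SDK calls are asynchronous:
// they enqueue tasks that Meilisearch processes in order.
func (r *Replicator) pushToMeilisearch() error {
	index := r.meiliClient.Index("products")

	if len(r.batch.Upserts) > 0 {
		// UpdateDocuments maps to the "add or update documents" endpoint,
		// which is idempotent with respect to the primary key.
		if _, err := index.UpdateDocuments(r.batch.Upserts); err != nil {
			return fmt.Errorf("meilisearch upsert failed: %w", err)
		}
	}

	if len(r.batch.Deletes) > 0 {
		// DeleteDocuments takes document IDs as strings.
		ids := make([]string, 0, len(r.batch.Deletes))
		for _, id := range r.batch.Deletes {
			ids = append(ids, strconv.Itoa(int(id)))
		}
		if _, err := index.DeleteDocuments(ids); err != nil {
			return fmt.Errorf("meilisearch delete failed: %w", err)
		}
	}
	return nil
}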

The system is now fault-tolerant. We can terminate the cdc-indexer process at any point. Upon restart, it will query Consul for the last successfully processed LSN and resume exactly from that point, guaranteeing that no data changes are permanently lost.
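
The graceful shutdown elided in main.go is part of the same story: cancelling the replicator’s context on SIGINT/SIGTERM lets the loop exit after the message it is currently processing rather than mid-batch. A minimal sketch using signal.NotifyContext (it assumes Start returns once its context is cancelled, as the loop above does; imports of os, os/signal and syscall are omitted):

// In main.go, instead of context.WithCancel:
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

// Start blocks until the context is cancelled or a fatal error occurs.
replicator.Start(ctx, "meili_slot", "meili_publication", startLSN)
slog.Info("CDC Indexer stopped")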

Limitations and Future Iterations

This implementation provides a solid foundation for real-time search indexing, but it is not without its limitations in a large-scale production environment.

First, the “at-least-once” delivery guarantee, while generally acceptable for idempotent search indexing, could cause a higher-than-necessary number of writes to Meilisearch during frequent crash/recovery cycles. Achieving “exactly-once” semantics would require a much more complex transactional outbox pattern, which might be overkill.

Second, the current model uses a single replication slot and a single indexer process, creating a potential performance bottleneck if the source database has an extremely high write throughput. A more advanced architecture might involve partitioning tables across different publications and slots, with multiple cdc-indexer instances running in parallel. This would require a leader election mechanism, for which Consul is also well-suited, to ensure only one instance claims a particular slot at a time.
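
Consul’s session-based locks are a natural fit for that leader election. Below is a sketch of how an instance could claim a slot before starting replication; the key name is illustrative, consulClient and cancel come from the surrounding main, and error handling is trimmed:

// Acquire a per-slot leadership lock; Lock blocks until we become the leader.
lock, err := consulClient.LockKey("cdc-indexer/products/leader")
if err != nil {
	slog.Error("failed to create lock", "error", err)
	return
}
lostCh, err := lock.Lock(nil) // nil: no external stop channel
if err != nil {
	slog.Error("failed to acquire leadership", "error", err)
	return
}
defer lock.Unlock()

// We hold leadership for this slot; replicate, but stop if the Consul
// session is lost (e.g. a network partition from the Consul cluster).
go func() {
	<-lostCh
	slog.Warn("Lost Consul leadership, shutting down replicator")
	cancel() // cancel the replicator's context
}()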

Finally, the pipeline has no explicit handling for Data Definition Language (DDL) changes. If a column is added to the products table, the decodeTuple function will fail, crashing the service. A production-grade system would need a mechanism to pause replication, alert operators, and handle schema evolution gracefully, possibly by integrating with a schema registry.
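
A cheap first line of defence is to detect drift rather than handle it: whenever a RelationMessage arrives, compare its columns against the schema the indexer was built for and halt loudly on any mismatch. A minimal sketch; the checkSchema helper and expectedColumns list are our own additions:

// checkSchema verifies that the replicated table still looks the way the
// Product struct expects; any drift should stop the pipeline for operator review.
func checkSchema(rel *pglogrepl.RelationMessage, expectedColumns []string) error {
	if len(rel.Columns) != len(expectedColumns) {
		return fmt.Errorf("schema drift on %s.%s: expected %d columns, got %d",
			rel.Namespace, rel.RelationName, len(expectedColumns), len(rel.Columns))
	}
	for i, col := range rel.Columns {
		if col.Name != expectedColumns[i] {
			return fmt.Errorf("schema drift: column %d is %q, expected %q", i, col.Name, expectedColumns[i])
		}
	}
	return nil
}

The RelationMessage case in processLogicalMessage would call this before caching the relation, turning a confusing decode failure into an explicit, actionable error.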

