Managing state for a real-time collaborative application is a path fraught with peril. The initial, naive approach of using centralized state with locks and transactions quickly collapses under the weight of latency, race conditions, and poor offline support. Every keystroke becomes a round-trip to a central authority, creating a bottleneck and a fragile user experience. After grappling with a particularly nasty implementation involving document locking and operational transforms (OT), which resulted in an unmaintainable maze of state machines and conflict resolution logic, a fundamental shift in approach became necessary. The objective was no longer to prevent conflicts but to embrace them and design a system where the state would always converge to a correct, shared view.
This led to exploring Conflict-free Replicated Data Types (CRDTs). The core principle is that operations are commutative and can be applied in any order, guaranteeing eventual consistency without a central coordinator. Our goal was to build a reusable middleware layer that could handle this complex synchronization, plugging seamlessly into our frontend state management and backed by a persistent, auditable log on the server.
The Architectural Blueprint
The technology selection was driven by a need for simplicity on the client, flexibility in the middleware, and powerful data modeling on the backend.
Zustand (Frontend): We needed a state manager that was minimal, unopinionated, and, most importantly, had a robust middleware system. Zustand fits perfectly. Its hook-based API is simple for components to consume, while its middleware capability allows us to intercept every state change and inject our CRDT logic transparently.
Custom WebSocket Middleware (Backend): A dedicated Node.js service, written in TypeScript, would serve as the real-time communication hub. It’s responsible for receiving CRDT operations from clients, validating them, persisting them, and broadcasting them to other participants in a session. This is the “distributed middleware” component of our stack.
ArangoDB (Persistence): This is a deliberate and somewhat unconventional choice. While a simple append-only log in a key-value store would work, ArangoDB’s multi-model nature offers unique advantages. We can store the CRDT operations as documents but also model the relationships between them as a graph. Each operation can be a vertex linked to the user who created it, the document it belongs to, and its causal predecessors. This structure enables powerful queries for auditing, visualizing revision history, and even implementing complex permission systems on sub-sections of a document—something that’s notoriously difficult with a flat log.
TypeScript (Full-Stack): In a system where data structures are this complex and precise, TypeScript is non-negotiable. Sharing type definitions for CRDT operations between the client, the middleware, and the persistence layer eliminates an entire class of integration bugs.
The overall data flow looks like this:
sequenceDiagram participant ClientA as Client A (Zustand) participant Middleware as WebSocket Middleware participant ArangoDB participant ClientB as Client B (Zustand) ClientA->>+Middleware: User types 'H'. Local CRDT op generated & sent. Middleware->>+ArangoDB: Persist 'Insert("H")' operation. ArangoDB-->>-Middleware: Acknowledge persistence. Middleware->>ClientB: Broadcast 'Insert("H")' operation. Middleware-->>-ClientA: Acknowledge receipt. ClientB->>+Middleware: User types 'i'. Local CRDT op generated & sent. Middleware->>+ArangoDB: Persist 'Insert("i")' operation. ArangoDB-->>-Middleware: Acknowledge persistence. Middleware->>ClientA: Broadcast 'Insert("i")' operation. Middleware-->>-ClientB: Acknowledge receipt. Note over ClientA, ClientB: Both clients apply received operations. Note over ClientA, ClientB: Due to CRDT properties, state converges to "Hi".
Implementing the Core CRDT in TypeScript
For our text collaboration scenario, we’ll implement a simplified version of a Replicated Growable Array (RGA), a common sequence CRDT. In an RGA, each character is an object with a unique ID and pointers to its predecessor and successor. When a character is deleted, it’s marked with a “tombstone” rather than being removed, which is crucial for resolving insertion conflicts correctly.
First, let’s define the core types. These will be shared across the entire stack.
// src/common/crdt-types.ts
/**
* A unique identifier for each character in the sequence.
* Composed of a timestamp and a site ID to ensure global uniqueness.
*/
export interface CharId {
timestamp: number;
siteId: string;
}
/**
* Represents a single character in our distributed sequence.
* It contains the character itself, its unique ID, and the ID of the
* character it was inserted after (its causal predecessor).
* A tombstone flag marks it as deleted.
*/
export interface RGAChar {
id: CharId;
value: string;
leftId: CharId | null; // null for the beginning of the document
isTombstone: boolean;
}
/**
* Represents an insertion operation. This is the message format
* that will be sent over the wire and stored in the database.
*/
export interface InsertOp {
type: 'insert';
char: RGAChar;
}
/**
* Represents a deletion operation. We only need the ID of the
* character to be marked as a tombstone.
*/
export interface DeleteOp {
type: 'delete';
charId: CharId;
}
export type CRDTOperation = InsertOp | DeleteOp;
// Helper function to compare CharIds. Necessary for sorting.
export const compareCharIds = (id1: CharId | null, id2: CharId | null): number => {
if (id1 === null && id2 === null) return 0;
if (id1 === null) return -1;
if (id2 === null) return 1;
if (id1.timestamp < id2.timestamp) return -1;
if (id1.timestamp > id2.timestamp) return 1;
// Timestamps are equal, use siteId as a tie-breaker
if (id1.siteId < id2.siteId) return -1;
if (id1.siteId > id2.siteId) return 1;
return 0;
};
This compareCharIds
function is critical. In a distributed system, clocks are not perfectly synchronized. If two users insert a character at the same position at roughly the same time, their timestamps might be identical. The siteId
(a unique identifier for each client session) acts as a deterministic tie-breaker, ensuring that all clients will order these concurrent insertions identically, leading to convergence.
Now, let’s create a class to manage the CRDT state. This class will live on the client and will be used by our Zustand middleware.
// src/client/crdt-document.ts
import { CharId, RGAChar, InsertOp, DeleteOp, compareCharIds } from '../common/crdt-types';
export class CRDTDocument {
private siteId: string;
// A map is more efficient for lookups than an array
private chars: Map<string, RGAChar> = new Map();
constructor(siteId: string) {
if (!siteId) {
throw new Error("CRDTDocument requires a unique siteId.");
}
this.siteId = siteId;
}
// A utility to serialize CharId to a string for map keys
private static charIdToString(id: CharId): string {
return `${id.timestamp}-${id.siteId}`;
}
/**
* Generates a new unique CharId for an operation.
* In a real-world project, this should use a Lamport timestamp or a
* similar logical clock, but for simplicity, we use Date.now().
* The risk with Date.now() is clock skew, but the siteId tie-breaker mitigates it.
*/
private generateCharId(): CharId {
return {
timestamp: Date.now(),
siteId: this.siteId,
};
}
/**
* Processes a local insertion (e.g., user typing).
* It finds the character at the target index, generates a new character
* with a unique ID, and returns the corresponding InsertOp to be broadcast.
* @param index The linear index where the character is inserted.
* @param value The character to insert.
* @returns The generated InsertOp.
*/
public localInsert(index: number, value: string): InsertOp {
const visibleChars = this.getVisibleChars();
const leftChar = index > 0 ? visibleChars[index - 1] : null;
const leftId = leftChar ? leftChar.id : null;
const newChar: RGAChar = {
id: this.generateCharId(),
value,
leftId,
isTombstone: false,
};
// Apply the change locally immediately for responsiveness
this.applyInsert(newChar);
return { type: 'insert', char: newChar };
}
/**
* Processes a local deletion.
* It finds the character at the target index and returns the DeleteOp.
* @param index The linear index of the character to delete.
* @returns The generated DeleteOp or null if invalid index.
*/
public localDelete(index: number): DeleteOp | null {
const visibleChars = this.getVisibleChars();
if (index < 0 || index >= visibleChars.length) {
return null;
}
const charToDelete = visibleChars[index];
this.applyDelete(charToDelete.id);
return { type: 'delete', charId: charToDelete.id };
}
/**
* Applies an insertion operation. This method is called both for
* local changes and for remote operations received from the server.
* It's idempotent; applying the same operation multiple times has no effect.
*/
public applyInsert(char: RGAChar): void {
const key = CRDTDocument.charIdToString(char.id);
if (!this.chars.has(key)) {
this.chars.set(key, char);
}
}
/**
* Applies a deletion operation by marking the character's tombstone flag.
*/
public applyDelete(charId: CharId): void {
const key = CRDTDocument.charIdToString(charId);
const char = this.chars.get(key);
if (char) {
char.isTombstone = true;
}
}
/**
* Reconstructs the visible document string from the CRDT data structure.
* This is the core of the RGA algorithm: sorting the characters based
* on their IDs to handle concurrent edits correctly.
*/
public toString(): string {
return this.getVisibleChars().map(c => c.value).join('');
}
/**
* Gets the ordered list of non-tombstoned characters.
* This can be computationally expensive on large documents and is a
* key area for optimization in production systems.
*/
private getVisibleChars(): RGAChar[] {
// This is a naive and inefficient implementation of sequence reconstruction.
// A production-grade implementation would use a more optimized data structure
// (like a skip list or a balanced binary tree) to avoid re-sorting on every render.
const allChars = Array.from(this.chars.values());
const visible = allChars.filter(c => !c.isTombstone);
// The sort is crucial. It ensures that all clients render the same sequence.
visible.sort((a, b) => compareCharIds(a.id, b.id));
// The above sort is WRONG for RGA. RGA requires a topological sort based on `leftId`.
// A correct implementation is much more complex. For this article, we will
// simulate it with a simple but inefficient approach.
const charMap = new Map<string | null, RGAChar[]>();
for (const char of visible) {
const leftKey = char.leftId ? CRDTDocument.charIdToString(char.leftId) : null;
if (!charMap.has(leftKey)) {
charMap.set(leftKey, []);
}
charMap.get(leftKey)!.push(char);
}
// Sort concurrent inserts at the same position
charMap.forEach(children => children.sort((a, b) => compareCharIds(a.id, b.id)));
const result: RGAChar[] = [];
let currentKey: string | null = null;
const queue: RGAChar[] = [...(charMap.get(null) || [])];
while(queue.length > 0) {
const char = queue.shift()!;
result.push(char);
const children = charMap.get(CRDTDocument.charIdToString(char.id)) || [];
queue.unshift(...children.reverse()); // Prepend children to process them next
}
return result;
}
}
A critical note here: the getVisibleChars
method is a simplified placeholder. A correct and performant RGA implementation requires a topological sort of the character graph, which is computationally intensive. Real-world libraries use optimized data structures to maintain the linear order without full resorting on every change. However, this simplified version demonstrates the core concepts.
The Zustand Middleware for CRDT Synchronization
With the CRDT logic encapsulated, we can create our Zustand middleware. This middleware will act as the bridge between the UI components, the local CRDT state, and the WebSocket connection to the server.
// src/client/crdt-middleware.ts
import { StateCreator, StoreMutatorIdentifier } from 'zustand';
import { CRDTDocument } from './crdt-document';
import { CRDTOperation } from '../common/crdt-types';
// Define the shape of our state
interface CRDTState {
doc: CRDTDocument;
text: string; // The materialized string for the UI to consume
isConnected: boolean;
// Actions that components can call
localInsert: (index: number, value: string) => void;
localDelete: (index: number) => void;
// Internal action for server updates
_applyRemoteOp: (op: CRDTOperation) => void;
_setConnected: (status: boolean) => void;
}
type CRDTMiddleware = <
T extends CRDTState,
Mps extends [StoreMutatorIdentifier, unknown][] = [],
Mcs extends [StoreMutatorIdentifier, unknown][] = []
>(
f: StateCreator<T, Mps, Mcs>,
// Middleware options
options: {
siteId: string;
// The function to send an operation to the server
sendOperation: (op: CRDTOperation) => void;
}
) => StateCreator<T, Mps, Mcs>;
// Define the middleware implementation
const crdtMiddlewareImpl: CRDTMiddleware = (f, options) => (set, get, api) => {
const { siteId, sendOperation } = options;
const initialDoc = new CRDTDocument(siteId);
// Helper to update state and text representation
const updateState = (doc: CRDTDocument) => {
set({ doc, text: doc.toString() } as Partial<CRDTState>);
};
const enhancedStateCreator: StateCreator<CRDTState, [], []> = (set, get) => ({
doc: initialDoc,
text: '',
isConnected: false,
localInsert: (index, value) => {
const doc = get().doc;
const op = doc.localInsert(index, value);
// Immediately update local state for a snappy UI
updateState(doc);
// Send the operation to the server for broadcast
sendOperation(op);
},
localDelete: (index) => {
const doc = get().doc;
const op = doc.localDelete(index);
if (op) {
updateState(doc);
sendOperation(op);
}
},
_applyRemoteOp: (op) => {
// This is a critical section. A common mistake is to create a new
// doc instance here, losing the state. We must mutate the existing one.
const doc = get().doc;
if (op.type === 'insert') {
doc.applyInsert(op.char);
} else if (op.type === 'delete') {
doc.applyDelete(op.charId);
}
// Re-render the text with the new, converged state
updateState(doc);
},
_setConnected: (status: boolean) => {
set({ isConnected: status });
}
});
return f(enhancedStateCreator as any, set as any, api) as any;
};
// Export as a named middleware for Zustand
export const crdt = crdtMiddlewareImpl as unknown as CRDTMiddleware;
To use this, we create our store. The WebSocket logic is kept separate for clarity.
// src/client/store.ts
import create from 'zustand';
import { crdt } from './crdt-middleware';
import { CRDTOperation } from '../common/crdt-types';
import { v4 as uuidv4 } from 'uuid'; // For generating a unique siteId
const siteId = uuidv4();
let socket: WebSocket;
// The store definition is clean. The complexity is hidden in the middleware.
export const useStore = create<CRDTState>()(
crdt(
(set, get) => ({
/* Initial state and actions are defined in the middleware */
}),
{
siteId,
sendOperation: (op: CRDTOperation) => {
// A real-world implementation should handle connection state,
// queuing operations if offline.
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(op));
} else {
console.error("Socket not connected. Operation lost.", op);
}
},
}
)
);
// Initialize WebSocket connection and wire it up to the store
export function connectToServer(docId: string) {
socket = new WebSocket(`ws://localhost:8080/doc/${docId}`);
socket.onopen = () => {
console.log("Connected to server.");
useStore.getState()._setConnected(true);
};
socket.onmessage = (event) => {
try {
const op: CRDTOperation = JSON.parse(event.data);
// Feed incoming remote operations into the store
useStore.getState()._applyRemoteOp(op);
} catch (e) {
console.error("Failed to parse incoming operation:", e);
}
};
socket.onclose = () => {
console.log("Disconnected from server.");
useStore.getState()._setConnected(false);
};
socket.onerror = (error) => {
console.error("WebSocket error:", error);
useStore.getState()._setConnected(false);
};
}
The beauty of this pattern is that UI components remain completely unaware of CRDTs or WebSockets. They just call useStore().localInsert(index, char)
and the state updates automatically.
The Backend Service and ArangoDB Persistence
The backend acts as a simple broadcaster and a persistence layer. We’ll use the ws
library for WebSockets and the arangojs
driver for database interaction.
First, the server setup and WebSocket handling:
// src/server/index.ts
import { WebSocketServer, WebSocket } from 'ws';
import { ArangoDBService } from './arangodb-service';
import { CRDTOperation } from '../common/crdt-types';
const PORT = 8080;
// This map holds the set of connected clients for each document room.
const docRooms = new Map<string, Set<WebSocket>>();
async function main() {
const dbService = new ArangoDBService();
await dbService.initialize();
const wss = new WebSocketServer({ port: PORT });
wss.on('connection', (ws, req) => {
const docId = req.url?.split('/').pop();
if (!docId) {
console.error("Connection attempt without docId. Closing.");
ws.close();
return;
}
// Add client to the document room
if (!docRooms.has(docId)) {
docRooms.set(docId, new Set());
}
docRooms.get(docId)!.add(ws);
console.log(`Client connected to doc ${docId}. Total clients: ${docRooms.get(docId)!.size}`);
ws.on('message', async (message) => {
try {
const op: CRDTOperation = JSON.parse(message.toString());
// Persist the operation before broadcasting. This is a critical step
// for data durability. If the server crashes after broadcasting but
// before persisting, the operation is lost forever.
await dbService.saveOperation(docId, op);
// Broadcast the operation to all other clients in the same room
const room = docRooms.get(docId);
if (room) {
for (const client of room) {
// A common mistake is broadcasting back to the sender.
// This can cause loops or duplicate state updates if the client
// isn't designed to handle it. Here, we broadcast to everyone,
// assuming the client's CRDT logic is idempotent.
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(op));
}
}
}
} catch (e) {
console.error("Failed to process message:", e);
}
});
ws.on('close', () => {
// Remove client from the room
const room = docRooms.get(docId);
if (room) {
room.delete(ws);
console.log(`Client disconnected from doc ${docId}. Remaining: ${room.size}`);
if (room.size === 0) {
docRooms.delete(docId);
}
}
});
ws.on('error', (err) => {
console.error(`WebSocket error for client in doc ${docId}:`, err);
});
});
console.log(`WebSocket server started on ws://localhost:${PORT}`);
}
main().catch(console.error);
Now for the interesting part: the ArangoDB service. We’ll set up collections for documents
, operations
, and an edge collection hasOperation
to link them.
// src/server/arangodb-service.ts
import { Database } from 'arangojs';
import { aql } from 'arangojs/aql';
import { CRDTOperation } from '../common/crdt-types';
export class ArangoDBService {
private db: Database;
constructor() {
// Configuration should come from environment variables in a real project
this.db = new Database({
url: process.env.ARANGO_URL || "http://localhost:8529",
databaseName: process.env.ARANGO_DB || "crdt_collab",
auth: { username: process.env.ARANGO_USER || "root", password: process.env.ARANGO_PASSWORD || "open-sesame" },
});
}
public async initialize(): Promise<void> {
try {
const dbExists = await this.db.exists();
if(!dbExists) {
await this.db.createDatabase('crdt_collab');
}
const opCollection = this.db.collection('operations');
if (!(await opCollection.exists())) {
console.log("Creating 'operations' collection...");
await opCollection.create();
// Add a hash index on the operation ID for efficient lookups.
// This is crucial to prevent duplicate operations from being inserted.
await opCollection.ensureIndex({ type: 'hash', fields: ['opId'] });
}
const docCollection = this.db.collection('documents');
if (!(await docCollection.exists())) {
console.log("Creating 'documents' collection...");
await docCollection.create();
}
const edgeCollection = this.db.edgeCollection('hasOperation');
if (!(await edgeCollection.exists())) {
console.log("Creating 'hasOperation' edge collection...");
await edgeCollection.create();
}
console.log("ArangoDB initialized successfully.");
} catch (e) {
console.error("Failed to initialize ArangoDB:", e);
throw e;
}
}
/**
* Saves a CRDT operation and links it to its document.
* The graph model allows us to easily query the entire history of a document.
*/
public async saveOperation(docId: string, op: CRDTOperation): Promise<void> {
// Generate a unique, deterministic ID for the operation. This is key
// for idempotency. If we receive the same operation twice, we won't store it twice.
const opId = this.createOperationId(op);
const operations = this.db.collection('operations');
const hasOperation = this.db.edgeCollection('hasOperation');
// In ArangoDB, we can use an AQL query in a transaction to insert the
// operation and the edge conditionally, ensuring atomicity.
const query = aql`
LET opKey = ${opId}
LET opVertex = FIRST(
UPSERT { _key: opKey }
INSERT { _key: opKey, opId: opKey, operation: ${op} }
UPDATE {} IN ${operations}
RETURN NEW
)
LET docVertex = FIRST(
UPSERT { _key: ${docId} }
INSERT { _key: ${docId}, createdAt: DATE_NOW() }
UPDATE {} IN documents
RETURN NEW
)
UPSERT { _from: docVertex._id, _to: opVertex._id }
INSERT { _from: docVertex._id, _to: opVertex._id }
UPDATE {} IN ${hasOperation}
RETURN opVertex
`;
try {
await this.db.query(query);
} catch (err: any) {
// Handle potential unique key violations gracefully
if (err.isArangoError && err.errorNum === 1210) {
console.warn(`Attempted to insert duplicate operation: ${opId}`);
} else {
console.error(`Failed to save operation for doc ${docId}:`, err);
throw err;
}
}
}
private createOperationId(op: CRDTOperation): string {
// A simple but effective way to create a deterministic ID.
// In production, a more robust hashing algorithm like SHA-256 would be better.
if(op.type === 'insert') {
const { timestamp, siteId } = op.char.id;
return `${timestamp}-${siteId}`;
} else {
const { timestamp, siteId } = op.charId;
return `del-${timestamp}-${siteId}`;
}
}
}
The AQL query above is doing several things atomically:
-
UPSERT
the operation into theoperations
collection using our deterministic key. If it already exists, it does nothing (UPDATE {}
). -
UPSERT
the document vertex. If it’s the first operation for this document, it creates the document vertex. -
UPSERT
the edge connecting the document to the operation.
This transactional approach guarantees that our operation log is consistent. The pitfall here is performance. Running a complex AQL query for every single keystroke could become a bottleneck. In a high-throughput system, we would likely batch operations and write them in bulk.
Limitations and Future Trajectories
This implementation provides a solid foundation for a real-time collaborative system, but in a production environment, several challenges remain.
First, the CRDT implementation is naive. The getVisibleChars
method’s performance degrades with document size and edit history. Production-grade CRDTs like Y.js or Automerge use highly optimized data structures (like skip lists, ropes, and compressed run-length encoding for tombstones) to maintain high performance. Garbage collecting tombstones without breaking the CRDT’s convergence guarantees is also a non-trivial problem that we haven’t addressed.
Second, the WebSocket middleware is a single server and thus a single point of failure and a scalability bottleneck. A robust architecture would replace the in-memory docRooms
map with a distributed pub/sub system like Redis Streams or NATS. This would allow us to run multiple stateless middleware instances behind a load balancer.
Third, the ArangoDB model, while powerful for querying, is not optimized for document reconstruction. On loading a document, a client would need to fetch all operations and replay them, which is slow for large documents. A common optimization is to periodically create snapshots of the document’s materialized state and store them alongside the op-log. A new client would fetch the latest snapshot and only the operations that occurred after it was created.
Finally, we haven’t touched upon user presence (cursors, selection highlights), which adds another layer of real-time state to manage. This is typically handled outside the core CRDT document model, often through ephemeral messages broadcast via the same WebSocket infrastructure but not persisted in the database.